Building a Type-Safe Data Processing Pipeline in TypeScript

This is a companion post to the Railway-Oriented TypeScript series — a bonus deep-dive showing the pipeline library in an ETL context, with no React or UI. If you haven’t read the main series, start here. The pipeline operators are covered in Part 2; this part covers what’s new for batch processing: combine, combineAll, partition, reusable sub-pipelines, and structured error reporting.

You have messy data from an external source. Some records are malformed, some fail business rules, some need enrichment from an async service. You need clean results, structured error reports, and graceful recovery — and you need all of this to work across thousands of records without hand-rolling an error-tracking loop in every pipeline.

Here’s how the Result model handles it.

Open in StackBlitz — run the full pipeline in your browser, no setup required.

Schema for a Single Record

Records arrive as JSON with string values — common when pulling from CSVs or external APIs. The schema handles type coercion:

import {
  validate,
  object,
  required,
  chain,
  string,
  nonEmpty,
  email,
  parseNumber,
  min,
  parseDate,
  refine,
  formatErrors,
  type InferSchemaType,
  type ValidationError,
} from "@railway-ts/pipelines/schema";

const transactionSchema = object({
  id: required(chain(string(), nonEmpty())),
  customerEmail: required(chain(string(), nonEmpty(), email())),
  amount: required(chain(parseNumber(), min(0.01, "Amount must be positive"))),
  currency: required(
    chain(
      string(),
      refine((s) => ["USD", "EUR", "GBP"].includes(s), "Unsupported currency"),
    ),
  ),
  date: required(parseDate()),
});

type Transaction = InferSchemaType<typeof transactionSchema>;

parseNumber() converts "42.50" → 42.50. parseDate() converts "2025-03-15" → a Date. refine() applies a custom predicate. All errors accumulate — you get every problem in a record at once, not just the first.

validate returns Result<Transaction, ValidationError[]> directly — no SafeParseReturnType, no conversion.
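To see accumulation in action, call validate directly, outside any pipeline. A minimal sketch (the precise ValidationError shape comes from the library; formatErrors flattens it for display):

const parsed = validate(
  { id: "", customerEmail: "nope", amount: "-5", currency: "BTC", date: "x" },
  transactionSchema,
);
// parsed is Err carrying a ValidationError[]: one entry per failing field,
// all five problems reported together rather than stopping at the first.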

Processing Pipeline

The operators here are covered in Part 2. What’s worth noting is how they compose in a data processing context — particularly how mapWith(normalize) and flatMapWith(enrichWithCustomerData) build a typed chain where each step’s output becomes the next step’s input:

import { flowAsync } from "@railway-ts/pipelines/composition";
import {
  ok,
  err,
  mapWith,
  flatMapWith,
  filterWith,
  tapWith,
  tapErrWith,
  orElseWith,
  mapErrWith,
} from "@railway-ts/pipelines/result";

const USD_RATES: Record<string, number> = { USD: 1, EUR: 1.08, GBP: 1.27 };

const normalize = (tx: Transaction) => ({
  ...tx,
  amountUSD: +(tx.amount * (USD_RATES[tx.currency] ?? 1)).toFixed(2),
  dateFormatted: tx.date.toISOString().split("T")[0],
});

type NormalizedTransaction = ReturnType<typeof normalize>;

const enrichWithCustomerData = async (tx: NormalizedTransaction) => {
  const res = await fetch(
    `/api/customers?email=${encodeURIComponent(tx.customerEmail)}`,
  );
  if (!res.ok) return err(`Customer lookup failed for ${tx.customerEmail}`);
  const customer = await res.json();
  return ok({ ...tx, customerName: customer.name, tier: customer.tier });
};

type EnrichedTransaction = NormalizedTransaction & {
  customerName: string;
  tier: string;
};

const MINIMUM_AMOUNT_USD = 10;

const processTransaction = flowAsync(
  (raw: unknown) => validate(raw, transactionSchema),
  mapErrWith((errors: ValidationError[]) =>
    Object.entries(formatErrors(errors))
      .map(([field, msg]) => `${field}: ${msg}`)
      .join("; "),
  ),
  mapWith(normalize),
  flatMapWith(enrichWithCustomerData),
  filterWith(
    (tx: EnrichedTransaction) => tx.amountUSD >= MINIMUM_AMOUNT_USD,
    `Transaction below minimum ($${MINIMUM_AMOUNT_USD})`,
  ),
  tapWith((tx: EnrichedTransaction) =>
    console.log(`Processed: ${tx.id} — $${tx.amountUSD} (${tx.customerName})`),
  ),
  tapErrWith((error: string) => console.error(`[pipeline error] ${error}`)),
  orElseWith((error: string) => {
    if (error.startsWith("Customer lookup failed")) {
      return ok({
        partial: true,
        message: "Processed without customer data",
        error,
      });
    }
    return err(error);
  }),
);

If any step returns Err, all subsequent steps are skipped. Recovery in orElseWith is localized — changing it doesn’t touch anything else in the pipeline.

await processTransaction({
  id: "tx-001",
  customerEmail: "alice@example.com",
  amount: "150.00",
  currency: "EUR",
  date: "2025-03-15",
});
// → Ok({ id: "tx-001", amountUSD: 162.00, customerName: "Alice Smith", ... })

await processTransaction({
  id: "",
  customerEmail: "not-an-email",
  amount: "-5",
  currency: "BTC",
  date: "not-a-date",
});
// → Err("id: String must not be empty; customerEmail: Invalid email format; ...")

await processTransaction({
  id: "tx-002",
  customerEmail: "bob@example.com",
  amount: "5.00",
  currency: "USD",
  date: "2025-03-15",
});
// → Err("Transaction below minimum ($10)")

Batch Processing

This is where the Result model earns its keep over try/catch. Process all records first, then decide what to do with the results:

import {
  combine,
  combineAll,
  partition,
  match,
} from "@railway-ts/pipelines/result";

// rawRecords: unknown[], the raw batch pulled from the source
const results = await Promise.all(rawRecords.map(processTransaction));

Three batch semantics, one call each:

combine — all-or-nothing, fail fast. If any record fails, the whole batch is an Err with the first failure. Use this when the batch must entirely succeed or the whole job is invalid.

const batchResult = combine(results);
match(batchResult, {
  ok: (transactions) => console.log(`All ${transactions.length} processed`),
  err: (error) => console.error(`Batch failed: ${error}`),
});

combineAll — all-or-nothing, all errors. If any record fails, you get every failure at once. Useful for import validation where you want to show users all problems before they fix anything.

const batchResult = combineAll(results);
match(batchResult, {
  ok: (transactions) => console.log(`All ${transactions.length} processed`),
  err: (errors) =>
    errors.forEach((e, i) => console.error(`  Error ${i + 1}: ${e}`)),
});

partition — keep both sides. Every record gets processed; successes and failures are separated at the end. This is what ETL workloads typically need.

const { successes, failures } = partition(results);
console.log(`Processed: ${successes.length}, Failed: ${failures.length}`);

With try/catch, each of these semantics is a custom accumulation loop you write from scratch. With Result, they’re one-liners on top of results you already have.
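For a sense of scale, here is roughly the loop that partition collapses into one call, sketched with the match helper from above (a sketch, not the library's internals):

const successes: unknown[] = [];
const failures: string[] = [];
for (const result of results) {
  match(result, {
    ok: (value) => successes.push(value),
    err: (error) => failures.push(error),
  });
}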

Reusable Sub-Pipelines

flow composes steps into a named, reusable function — meaning sub-pipelines compose into larger pipelines without any special wiring:

import { flow } from "@railway-ts/pipelines/composition";

const validateAndNormalize = flow(
  (raw: unknown) => validate(raw, transactionSchema),
  mapErrWith((errors: ValidationError[]) =>
    Object.entries(formatErrors(errors))
      .map(([field, msg]) => `${field}: ${msg}`)
      .join("; "),
  ),
  mapWith(normalize),
);

const applyBusinessRules = flow(
  filterWith(
    (tx: NormalizedTransaction) => tx.amountUSD >= MINIMUM_AMOUNT_USD,
    `Below minimum ($${MINIMUM_AMOUNT_USD})`,
  ),
  filterWith(
    (tx: NormalizedTransaction) => tx.date >= new Date("2025-01-01"),
    "Transaction too old",
  ),
);

// Compose into the full pipeline
const processTransaction = flowAsync(
  validateAndNormalize,
  applyBusinessRules,
  flatMapWith(enrichWithCustomerData),
  tapWith((tx: EnrichedTransaction) => console.log(`Done: ${tx.id}`)),
);

// Or compose for a dry-run that skips side effects
const validateOnly = flow(validateAndNormalize, applyBusinessRules);
const dryRunResult = validateOnly(rawRecord);

Two pipelines that differ only in their enrichment step share validateAndNormalize and applyBusinessRules without any coordination overhead — they’re plain functions. validateOnly is useful for preview validation: check whether records will pass before committing to the async enrichment step.
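The same pieces give you a batch-level dry run. A sketch, reusing partition and the rawRecords batch from earlier:

// Validate the whole batch up front; commit to async enrichment only if clean.
const previews = rawRecords.map(validateOnly);
const { failures: previewFailures } = partition(previews);

if (previewFailures.length === 0) {
  const results = await Promise.all(rawRecords.map(processTransaction));
  // ...hand the successes off to the loader
}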

Structured Error Reporting

ETL jobs need row-level context in their error reports:

const processAndReport = async (
  records: Array<{ row: number; data: unknown }>,
) => {
  const results = await Promise.all(
    records.map(async ({ row, data }) => ({
      row,
      result: await processTransaction(data),
    })),
  );

  const report = {
    total: results.length,
    succeeded: 0,
    failed: 0,
    errors: [] as Array<{ row: number; error: string }>,
  };

  for (const { row, result } of results) {
    match(result, {
      ok: () => {
        report.succeeded++;
      },
      err: (error) => {
        report.failed++;
        report.errors.push({ row, error });
      },
    });
  }

  return report;
};

const report = await processAndReport([
  {
    row: 1,
    data: {
      id: "tx-001",
      customerEmail: "alice@example.com",
      amount: "150",
      currency: "EUR",
      date: "2025-03-15",
    },
  },
  {
    row: 2,
    data: {
      id: "",
      customerEmail: "bad",
      amount: "-5",
      currency: "BTC",
      date: "nope",
    },
  },
  {
    row: 3,
    data: {
      id: "tx-003",
      customerEmail: "carol@example.com",
      amount: "75",
      currency: "USD",
      date: "2025-03-16",
    },
  },
]);

// { total: 3, succeeded: 2, failed: 1, errors: [{ row: 2, error: "id: String must not be empty; ..." }] }

Compared to try/catch

For reference, the same pipeline logic written imperatively:

const processTransactionImperative = async (raw: unknown) => {
  let transaction: Transaction;
  try {
    transaction = validateTransaction(raw); // throws on failure
  } catch (e) {
    console.error(`[pipeline error] Validation failed: ${e}`);
    return { success: false, error: `Validation failed: ${e}` };
  }

  const normalized = {
    ...transaction,
    amountUSD: +(
      transaction.amount * (USD_RATES[transaction.currency] ?? 1)
    ).toFixed(2),
    dateFormatted: transaction.date.toISOString().split("T")[0],
  };

  if (normalized.amountUSD < MINIMUM_AMOUNT_USD) {
    const error = `Transaction below minimum ($${MINIMUM_AMOUNT_USD})`;
    console.error(`[pipeline error] ${error}`);
    return { success: false, error };
  }

  let enriched;
  try {
    const res = await fetch(
      `/api/customers?email=${encodeURIComponent(normalized.customerEmail)}`,
    );
    if (!res.ok) throw new Error("Customer lookup failed");
    const customer = await res.json();
    enriched = {
      ...normalized,
      customerName: customer.name,
      tier: customer.tier,
    };
  } catch (e) {
    if (String(e).includes("Customer lookup failed")) {
      return {
        success: true,
        data: { partial: true, message: "Processed without customer data" },
      };
    }
    return { success: false, error: String(e) };
  }

  console.log(
    `Processed: ${enriched.id} — $${enriched.amountUSD} (${enriched.customerName})`,
  );
  return { success: true, data: enriched };
};

The pipeline version is shorter, but line count isn’t the interesting difference. What matters structurally: error handling is interleaved with business logic at every step; recovery is buried inside a catch block rather than isolated in orElseWith; adding a step means adding another try/catch; and batch semantics (combine, combineAll, partition) don’t exist — you write a custom accumulation loop for each pipeline.
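To make the "adding a step" point concrete: a hypothetical review cap is one operator in the pipeline version, with no new try/catch frame around it:

const processWithReviewCap = flowAsync(
  validateAndNormalize,
  applyBusinessRules,
  // hypothetical rule, purely for illustration
  filterWith(
    (tx: NormalizedTransaction) => tx.amountUSD <= 10_000,
    "Amount exceeds review threshold",
  ),
  flatMapWith(enrichWithCustomerData),
);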

The pipeline version is also easier to test: normalize and enrichWithCustomerData are standalone functions with clear Result contracts. Testing normalize means calling it with a Transaction. In the imperative version, testing the normalization step requires the validation step to succeed first.
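For instance, a minimal test for normalize needs no mocks and no pipeline, just a Transaction literal (assuming InferSchemaType yields the obvious field types):

const tx: Transaction = {
  id: "tx-001",
  customerEmail: "alice@example.com",
  amount: 150,
  currency: "EUR",
  date: new Date("2025-03-15"),
};

const n = normalize(tx);
console.assert(n.amountUSD === 162); // 150 * 1.08, rounded to two decimals
console.assert(n.dateFormatted === "2025-03-15");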

Links:

  • @railway-ts/pipelines — Result, Option, pipe/flow, schema validation (0.2–3 kB per module, tree-shakable)

Full series: Railway-Oriented TypeScript — Overview · Part 1: The Glue Code Tax · Part 2: Composable Async Pipelines · Part 3: Schema-First React Forms · Setup & AI Tooling
