Recipes

Complete, copy-pasteable pipelines for real tasks, not didactic snippets. Each recipe is mirrored by an executable, type-checked source file under examples/recipes/, so the code shown here cannot silently drift from the library's actual API. The offline recipes (NDJSON ingest and large-CSV aggregation) are also unit-tested.

Every recipe splits a runtime-agnostic core (a stream transform you can unit-test) from a thin network/file entrypoint you wire to your I/O.


NDJSON ingest with bounded concurrency

Stream a Newline-Delimited JSON endpoint, drop heartbeat events, enrich each surviving event with up to 10 concurrent async calls (order preserved), and drain to a sink — backpressure end-to-end, no intermediate array.

import { filter, fromResponse, mapAsync, ndjson, pipeTo } from "@sgmonda/streamfu"

type RawEvent = { id: number; type: string }
type EnrichedEvent = RawEvent & { receivedAt: number }

function ingestEvents(
  source: ReadableStream<Uint8Array | string>,
  enrich: (event: RawEvent) => Promise<EnrichedEvent>,
  sink: WritableStream<EnrichedEvent>,
): Promise<void> {
  return pipeTo(
    ndjson<RawEvent>(source),
    filter((event: RawEvent) => event.type !== "heartbeat"),
    mapAsync(enrich, { concurrency: 10 }),
    sink,
  )
}

// Network entrypoint. `sink` is any WritableStream<EnrichedEvent> you provide.
const response = await fetch("https://api.example.com/events.ndjson")
await ingestEvents(
  fromResponse(response),
  async (event) => ({ ...event, receivedAt: Date.now() }),
  sink,
)

ndjson skips blank lines and a leading BOM; on a malformed record the stream errors with an NdjsonParseError carrying the 1-indexed line number. mapAsync keeps at most 10 enrich calls in flight and preserves input order; pass { ordered: false } to emit each result as soon as it completes.
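To make the ordering guarantee concrete, here is a minimal sketch of order-preserving bounded concurrency over a plain iterable. It is not how mapAsync is implemented: mapOrdered is a hypothetical helper written only for illustration, it works on an in-memory iterable rather than a stream, and it leaves out error handling and cancellation.

```typescript
// Hypothetical helper for illustration only; not part of @sgmonda/streamfu.
// Starts at most `concurrency` calls at a time and yields results in input order.
async function* mapOrdered<T, R>(
  source: Iterable<T>,
  fn: (item: T) => Promise<R>,
  concurrency: number,
): AsyncGenerator<Awaited<R>> {
  // Pending calls, oldest first. The queue never holds more than `concurrency` entries.
  const inFlight: Promise<R>[] = []

  for (const item of source) {
    inFlight.push(fn(item))
    if (inFlight.length >= concurrency) {
      // Window is full: wait for the OLDEST call, even if newer ones finished first.
      yield await inFlight.shift()!
    }
  }

  // Input exhausted: drain what is left, still oldest first.
  while (inFlight.length > 0) {
    yield await inFlight.shift()!
  }
}
```

Because the sketch always waits on the oldest pending call, a slow item at the head of the window delays everything behind it. That head-of-line delay is the cost of preserving order, and it is what an unordered mode avoids.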


Streaming aggregation over a large CSV

Parse an RFC 4180 CSV with a header row, keep only European rows, project to a numeric shape, and keep a running cumulative total with scan — one summary per surviving row, without loading the file into memory.

import { csv, filter, map, pipeTo, scan } from "@sgmonda/streamfu"

type RunningTotal = { country: string; population: number; cumulative: number }

function summarizePopulation(
  source: ReadableStream<Uint8Array | string>,
  sink: WritableStream<RunningTotal>,
): Promise<void> {
  return pipeTo(
    csv({ header: true })(source),
    filter((row) => row.continent === "Europe"),
    map((row) => ({ country: row.country, population: Number(row.population) })),
    scan(
      (acc, row): RunningTotal => ({ ...row, cumulative: acc.cumulative + row.population }),
      { country: "", population: 0, cumulative: 0 } as RunningTotal,
    ),
    sink,
  )
}

// File entrypoint (Deno). Node: Readable.toWeb(fs.createReadStream(path))
const file = await Deno.open("./countries.csv")
await summarizePopulation(file.readable, sink)

csv({ header: true }) consumes the first row as column keys and handles quoted fields, doubled-quote escapes, newlines inside quoted fields, and a leading BOM. scan emits the updated accumulator after each row; the seed itself is never emitted.
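The scan semantics described above (one output per input, seed never emitted) can be sketched over a plain array. scanSync is a hypothetical stand-in used only to show the behaviour, not the library's scan, and the population figures are made-up sample values.

```typescript
// Hypothetical synchronous stand-in for illustration; not the library's scan.
type Row = { country: string; population: number }
type RunningTotal = Row & { cumulative: number }

function* scanSync<T, A>(
  items: Iterable<T>,
  reducer: (acc: A, item: T) => A,
  seed: A,
): Generator<A> {
  let acc = seed // the seed only initialises the state; it is never yielded
  for (const item of items) {
    acc = reducer(acc, item)
    yield acc // one accumulator per input item
  }
}

// Made-up sample rows.
const sampleRows: Row[] = [
  { country: "Spain", population: 47 },
  { country: "France", population: 68 },
]

const totals = Array.from(
  scanSync<Row, RunningTotal>(
    sampleRows,
    (acc, row) => ({ ...row, cumulative: acc.cumulative + row.population }),
    { country: "", population: 0, cumulative: 0 },
  ),
)
// totals has two entries: cumulative 47, then cumulative 115
```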


Server-Sent Events (SSE) consumer

Stream a text/event-stream body, split it into lines, keep data: lines, and parse each payload as JSON — no EventSource, so it runs identically on the server and in the browser.

import { filter, forEach, fromResponse, lines, map, pipe } from "@sgmonda/streamfu"

function sseMessages<T>(source: ReadableStream<Uint8Array | string>): ReadableStream<T> {
  return pipe(
    source,
    lines,
    filter((line: string) => line.startsWith("data:")),
    map((line: string) => JSON.parse(line.slice("data:".length).trim()) as T),
  )
}

// Network entrypoint. `url`, `MyEvent`, and `handle` are placeholders for your own endpoint, payload type, and handler.
const response = await fetch(url, { headers: { Accept: "text/event-stream" } })
await forEach(sseMessages<MyEvent>(fromResponse(response)), (message) => handle(message))

This minimal parser handles the common case where every event is a single data: line. To support multi-line data: fields or event: / id: metadata, accumulate lines until the blank line that terminates each event.
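A fuller parser along those lines might look like the sketch below. It follows the field rules of the event-stream format (comment lines start with a colon, one leading space after the colon is stripped, multiple data: lines are joined with a newline) but works on an already-split iterable of lines. SseEvent and sseEvents are hypothetical names, and the retry: field is ignored.

```typescript
// Hypothetical sketch for illustration; not part of @sgmonda/streamfu.
type SseEvent = { event: string; id?: string; data: string }

function* sseEvents(lines: Iterable<string>): Generator<SseEvent> {
  let event = "message" // default event type
  let id: string | undefined // last seen id; persists across events
  let data: string[] = []

  for (const line of lines) {
    if (line === "") {
      // A blank line terminates the event. Events without data are dropped.
      if (data.length > 0) yield { event, id, data: data.join("\n") }
      event = "message"
      data = []
      continue
    }
    if (line.startsWith(":")) continue // comment, often used as a keep-alive

    const colon = line.indexOf(":")
    const field = colon === -1 ? line : line.slice(0, colon)
    let value = colon === -1 ? "" : line.slice(colon + 1)
    if (value.startsWith(" ")) value = value.slice(1) // strip one leading space only

    if (field === "data") data.push(value)
    else if (field === "event") event = value
    else if (field === "id") id = value
    // Other fields (for example retry) are ignored in this sketch.
  }
  // An event still pending at end of input (no closing blank line) is discarded.
}
```

In a pipeline, the same state machine could replace the filter/map pair after lines, for example inside a TransformStream that keeps the three variables as its state.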


WebSocket feed consumer

Bridge a WebSocket’s message events into a stream with fromEvents, parse each frame, drop the ones that fail a domain check, and run a side effect per surviving message. Closing the pipeline removes the listener — no leak.

import { filter, forEach, fromEvents, map, pipe } from "@sgmonda/streamfu"

type Quote = { symbol: string; price: number }

const socket = new WebSocket("wss://feed.example.com")
const messages = fromEvents<MessageEvent<string>>(socket, "message")

await forEach(
  pipe(
    messages,
    map((event: MessageEvent<string>) => JSON.parse(event.data) as Quote),
    filter((quote: Quote) => quote.price > 0),
  ),
  (quote) => render(quote),
)

fromEvents’ queue is unbounded — for a high-frequency feed, add throttle, debounce, or bufferTime downstream to bound memory.
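To illustrate what rate-limiting the feed means, here is a sketch of a leading-edge, drop-based throttle built on the standard TransformStream. Both helpers are hypothetical and are not the library's throttle, whose signature and semantics may differ; the sketch also assumes the stream returned by fromEvents is a standard ReadableStream.

```typescript
// Hypothetical helpers for illustration; not streamfu's throttle.

// Pure gate: returns true for the first call, then at most once per interval.
function createThrottleGate(intervalMs: number): (now: number) => boolean {
  let lastPassed = Number.NEGATIVE_INFINITY
  return (now) => {
    if (now - lastPassed < intervalMs) return false // too soon: drop
    lastPassed = now
    return true
  }
}

// Stream wrapper: forwards a chunk only when the gate opens, drops the rest.
function throttleDrop<T>(intervalMs: number): TransformStream<T, T> {
  const gate = createThrottleGate(intervalMs)
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      if (gate(Date.now())) controller.enqueue(chunk)
    },
  })
}
```

It would be applied as messages.pipeThrough(throttleDrop(100)) ahead of the parsing step. Dropping frames keeps the queue short only while the consumer keeps up with the throttled rate; a consumer slower than that still lets the upstream queue grow.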


Grep a remote log via fetch streaming

Download a large text resource and emit only the lines matching a pattern, with backpressure. The body is never buffered whole, so even a multi-GB log needs only roughly constant memory (bounded by the longest line).

import { filter, forEach, fromResponse, lines, pipe } from "@sgmonda/streamfu"

const response = await fetch("https://logs.example.com/app.log")
await forEach(
  pipe(
    fromResponse(response),
    lines,
    filter((line: string) => /ERROR|FATAL/.test(line)),
  ),
  (line) => handleLine(line), // handleLine: your own callback (avoids shadowing Node's global `process`)
)

fromResponse throws synchronously on a non-OK response, so a 500 returning an HTML error page fails here instead of being mis-parsed downstream.
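The memory claim for this recipe rests on how a line splitter handles chunk boundaries: only the unfinished tail of the last chunk needs to be retained. The sketch below shows that idea with a hypothetical createLineSplitter helper; it is not the library's lines implementation and assumes chunks have already been decoded to strings.

```typescript
// Hypothetical sketch for illustration; not the library's `lines`.
function createLineSplitter() {
  let tail = "" // the only retained state: the current partial line

  return {
    // Feed one decoded chunk; returns the lines completed by it.
    push(chunk: string): string[] {
      const parts = (tail + chunk).split(/\r?\n/)
      tail = parts.pop() ?? "" // last piece has no terminator yet
      return parts
    },
    // Call once at end of input to emit a final unterminated line, if any.
    flush(): string[] {
      const rest = tail
      tail = ""
      return rest === "" ? [] : [rest]
    },
  }
}
```

Because the partial line is carried over whole, a \r\n pair split across two chunks is still recognised as one terminator when the next chunk arrives.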


S3/R2 download → transform → upload

Stream an NDJSON object out of a presigned GET URL, keep only error-level lines, re-serialize to NDJSON, and stream the result straight into a presigned PUT URL — Web APIs only, no SDK, no intermediate buffer. The object is never fully resident in memory.

import { filter, fromResponse, map, ndjson, pipe } from "@sgmonda/streamfu"

type LogLine = { level: string; msg: string }

const download = await fetch(presignedGetUrl)
const transformed = pipe(
  fromResponse(download),
  ndjson<LogLine>(),
  filter((line: LogLine) => line.level === "error"),
  map((line: LogLine) => JSON.stringify(line) + "\n"),
)

const body = transformed.pipeThrough(new TextEncoderStream())
const init: RequestInit = { method: "PUT", body }
;(init as { duplex?: "half" }).duplex = "half" // required to send a stream body
const upload = await fetch(presignedPutUrl, init)
if (!upload.ok) throw new Error(`upload failed: ${upload.status}`)

duplex: "half" is required by the Fetch spec to send a streaming request body; it is not yet in the lib.dom types, hence the cast before the assignment. Works on Deno, Node 18+, Bun, and Cloudflare Workers.