Recipes
Complete, copy-pasteable pipelines for real tasks, not didactic snippets. Each recipe is mirrored by an executable, type-checked source file under examples/recipes/, so the code shown here cannot drift from the actual API. The two offline recipes (NDJSON ingest and large-CSV aggregation) are also unit-tested.
Every recipe splits a runtime-agnostic core (a stream transform you can unit-test) from a thin network/file entrypoint you wire to your I/O.
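Each core takes a plain ReadableStream and WritableStream, so it can be unit-tested with in-memory streams and no network or disk. Below is a minimal sketch that uses only the standard Web Streams API. fromArray and collectInto are hypothetical helper names, not streamfu exports; the library may ship its own equivalents.

```typescript
// Hypothetical test helpers built only on the standard Web Streams API
// (global in browsers, Deno, Bun, and Node 18+). Not part of streamfu.

/** Wrap an in-memory array as a ReadableStream that yields each item in order, then closes. */
function fromArray<T>(items: readonly T[]): ReadableStream<T> {
  let index = 0
  return new ReadableStream<T>({
    // pull is called only when the consumer wants more, so backpressure is respected.
    pull(controller) {
      if (index < items.length) controller.enqueue(items[index++])
      else controller.close()
    },
  })
}

/** A WritableStream that appends every chunk to `out`, so a test can assert on it afterwards. */
function collectInto<T>(out: T[]): WritableStream<T> {
  return new WritableStream<T>({
    write(chunk) {
      out.push(chunk)
    },
  })
}
```

For example, a test could pass ingestEvents a fromArray source of NDJSON strings and a collectInto sink, then assert on the collected array once the returned promise resolves. This assumes ndjson accepts string chunks, as its ReadableStream<Uint8Array | string> parameter type suggests.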
NDJSON ingest with bounded concurrency
Stream a Newline-Delimited JSON endpoint, drop heartbeat events, enrich each surviving event with up to 10 concurrent async calls (order preserved), and drain to a sink — backpressure end-to-end, no intermediate array.
import { filter, fromResponse, mapAsync, ndjson, pipeTo } from "@sgmonda/streamfu"
type RawEvent = { id: number; type: string }
type EnrichedEvent = RawEvent & { receivedAt: number }
function ingestEvents(
source: ReadableStream<Uint8Array | string>,
enrich: (event: RawEvent) => Promise<EnrichedEvent>,
sink: WritableStream<EnrichedEvent>,
): Promise<void> {
return pipeTo(
ndjson<RawEvent>(source),
filter((event: RawEvent) => event.type !== "heartbeat"),
mapAsync(enrich, { concurrency: 10 }),
sink,
)
}
// Network entrypoint
const response = await fetch("https://api.example.com/events.ndjson")
await ingestEvents(
fromResponse(response),
async (event) => ({ ...event, receivedAt: Date.now() }),
sink,
)
ndjson skips blank lines and a leading BOM; on a malformed record it errors the stream with an NdjsonParseError that carries the record's 1-indexed line number. mapAsync keeps at most 10 enrich calls in flight and emits results in input order; pass { ordered: false } to emit each result as soon as it completes.
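The concurrency and ordering guarantees can be illustrated with a plain-TypeScript sketch of the same technique over an array. A fixed pool of workers claims input indices one at a time, so at most concurrency calls are pending at once, and each result is written back to its input index, so output order matches input order. This is an illustration under those assumptions, not streamfu's implementation: mapAsync emits into a stream as results become available, whereas this hypothetical mapWithConcurrency collects everything into an array.

```typescript
/**
 * Hypothetical helper: apply `fn` to every item with at most `concurrency`
 * calls pending at once, returning results in input order.
 */
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  fn: (item: T) => Promise<R>,
  concurrency: number,
): Promise<R[]> {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError("concurrency must be a positive integer")
  }
  const results = new Array<R>(items.length)
  let next = 0 // index of the next unclaimed item, shared by all workers

  // Each worker claims an index synchronously (no await between check and
  // increment), awaits fn, and stores the result at that same index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++
      results[index] = await fn(items[index])
    }
  }

  const workerCount = Math.min(concurrency, items.length)
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}
```

In this sketch a rejection from fn rejects the returned promise, but the other workers keep processing; a production version would stop claiming new items after the first failure.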
Streaming aggregation over a large CSV
Parse an RFC 4180 CSV with a header row, keep only European rows, project to a numeric shape, and keep a running cumulative total with scan — one summary per surviving row, without loading the file into memory.
import { csv, filter, map, pipeTo, scan } from "@sgmonda/streamfu"
type RunningTotal = { country: string; population: number; cumulative: number }
function summarizePopulation(
source: ReadableStream<Uint8Array | string>,
sink: WritableStream<RunningTotal>,
): Promise<void> {
return pipeTo(
csv({ header: true })(source),
filter((row) => row.continent === "Europe"),
map((row) => ({ country: row.country, population: Number(row.population) })),
scan(
(acc, row): RunningTotal => ({ ...row, cumulative: acc.cumulative + row.population }),
{ country: "", population: 0, cumulative: 0 } as RunningTotal,
),
sink,
)
}
// File entrypoint (Deno). Node: Readable.toWeb(fs.createReadStream(path))
const file = await Deno.open("./countries.csv")
await summarizePopulation(file.readable, sink)
csv({ header: true }) uses the first row as object keys and handles quoted fields, doubled-quote ("") escapes, newlines inside quoted fields, and a leading BOM. scan emits the updated accumulator after each input chunk (here, each row); the seed itself is never emitted.
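Those quoting rules are why CSV cannot be split naively on commas and newlines. The following compact, non-streaming sketch shows an RFC 4180-style tokenizer plus header mapping over a complete string, for illustration only. parseCsv and toRecords are hypothetical names; a streaming parser such as csv would additionally need to carry the in-quotes state across chunk boundaries.

```typescript
/** Hypothetical tokenizer: split complete CSV text into rows of raw string fields. */
function parseCsv(text: string): string[][] {
  if (text.charCodeAt(0) === 0xfeff) text = text.slice(1) // drop a leading BOM
  const rows: string[][] = []
  let row: string[] = []
  let field = ""
  let inQuotes = false

  for (let i = 0; i < text.length; i++) {
    const ch = text[i]
    if (inQuotes) {
      if (ch === '"') {
        if (text[i + 1] === '"') {
          field += '"' // doubled quote inside a quoted field is a literal quote
          i++
        } else {
          inQuotes = false // closing quote
        }
      } else {
        field += ch // commas and line breaks are literal inside quotes
      }
    } else if (ch === '"') {
      inQuotes = true
    } else if (ch === ",") {
      row.push(field)
      field = ""
    } else if (ch === "\n" || ch === "\r") {
      if (ch === "\r" && text[i + 1] === "\n") i++ // treat CRLF as one break
      row.push(field)
      rows.push(row)
      row = []
      field = ""
    } else {
      field += ch
    }
  }
  // Flush the final record unless the text ended with a line break.
  if (field !== "" || row.length > 0) {
    row.push(field)
    rows.push(row)
  }
  return rows
}

/** Hypothetical header mapping: use the first row as keys for every following row. */
function toRecords(rows: string[][]): Record<string, string>[] {
  const [header, ...body] = rows
  if (!header) return []
  return body.map((cells) => Object.fromEntries(header.map((key, i) => [key, cells[i] ?? ""])))
}
```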
Server-Sent Events (SSE) consumer
Stream a text/event-stream body, split it into lines, keep data: lines, and parse each payload as JSON — no EventSource, so it runs identically on the server and in the browser.
import { filter, forEach, fromResponse, lines, map, pipe } from "@sgmonda/streamfu"
function sseMessages<T>(source: ReadableStream<Uint8Array | string>): ReadableStream<T> {
return pipe(
source,
lines,
filter((line: string) => line.startsWith("data:")),
map((line: string) => JSON.parse(line.slice("data:".length).trim()) as T),
)
}
// Network entrypoint
const response = await fetch(url, { headers: { Accept: "text/event-stream" } })
await forEach(sseMessages<MyEvent>(fromResponse(response)), (message) => handle(message))
This minimal parser handles the common data:-only case. To support multi-line data: fields or event: and id: metadata, accumulate lines into one event until the blank line that terminates it.
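Here is a sketch of that accumulation, following the event-stream rules of the HTML Living Standard: lines starting with ":" are comments, one leading space after the field's colon is dropped, data: lines are joined with "\n", the last id persists across events, and a blank line dispatches the event. SseAccumulator and sseEvents are hypothetical names; retry: is ignored, and the sketch assumes it receives lines with their terminators already removed (it strips a stray trailing \r defensively).

```typescript
interface SseEvent {
  event: string // "message" when no event: field was sent
  data: string // multiple data: lines joined with "\n"
  id: string | undefined // last id: seen, which persists across events
}

/** Hypothetical line-by-line SSE accumulator; not a streamfu export. */
class SseAccumulator {
  private dataLines: string[] = []
  private eventType = ""
  private lastId: string | undefined

  /** Feed one line; returns an event when a blank line completes one. */
  push(line: string): SseEvent | undefined {
    if (line.endsWith("\r")) line = line.slice(0, -1) // tolerate CRLF leftovers
    if (line === "") return this.dispatch()
    if (line.startsWith(":")) return undefined // comment or keep-alive
    const colon = line.indexOf(":")
    const field = colon === -1 ? line : line.slice(0, colon)
    let value = colon === -1 ? "" : line.slice(colon + 1)
    if (value.startsWith(" ")) value = value.slice(1) // drop one leading space
    switch (field) {
      case "data":
        this.dataLines.push(value)
        break
      case "event":
        this.eventType = value
        break
      case "id":
        this.lastId = value
        break
      // retry: and unknown fields are ignored in this sketch
    }
    return undefined
  }

  private dispatch(): SseEvent | undefined {
    const hasData = this.dataLines.length > 0
    const event: SseEvent = {
      event: this.eventType || "message",
      data: this.dataLines.join("\n"),
      id: this.lastId,
    }
    // Reset per-event buffers; lastId deliberately survives.
    this.dataLines = []
    this.eventType = ""
    return hasData ? event : undefined
  }
}

/** Wrap the accumulator as a TransformStream from lines to events. */
function sseEvents(): TransformStream<string, SseEvent> {
  const acc = new SseAccumulator()
  return new TransformStream<string, SseEvent>({
    transform(line, controller) {
      const event = acc.push(line)
      if (event) controller.enqueue(event)
    },
  })
}
```

Assuming lines can be applied directly to a stream, as its bare use inside pipe suggests, this could be wired as lines(fromResponse(response)).pipeThrough(sseEvents()), with each event.data then passed to JSON.parse.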
WebSocket feed consumer
Bridge a WebSocket’s message events into a stream with fromEvents, parse each frame, drop the ones that fail a domain check, and run a side effect per surviving message. Closing the pipeline removes the listener — no leak.
import { filter, forEach, fromEvents, map, pipe } from "@sgmonda/streamfu"
type Quote = { symbol: string; price: number }
const socket = new WebSocket("wss://feed.example.com")
const messages = fromEvents<MessageEvent<string>>(socket, "message")
await forEach(
pipe(
messages,
map((event: MessageEvent<string>) => JSON.parse(event.data) as Quote),
filter((quote: Quote) => quote.price > 0),
),
(quote) => render(quote),
)
fromEvents’ queue is unbounded — for a high-frequency feed, add throttle, debounce, or bufferTime downstream to bound memory.
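Separately from memory, the recipe has a gap: JSON.parse throws on a malformed frame, which errors the whole pipeline instead of dropping that one message. One possible fix is a defensive parser that returns undefined for bad JSON or the wrong shape, sketched below. parseQuote is a hypothetical helper, and its shape check stands in for whatever validation your feed actually needs.

```typescript
type Quote = { symbol: string; price: number }

/** Hypothetical parser: returns undefined instead of throwing on bad JSON or a wrong shape. */
function parseQuote(data: unknown): Quote | undefined {
  if (typeof data !== "string") return undefined // e.g. a binary frame
  let value: unknown
  try {
    value = JSON.parse(data)
  } catch {
    return undefined // malformed JSON: drop this frame only
  }
  if (typeof value !== "object" || value === null) return undefined
  const { symbol, price } = value as Record<string, unknown>
  if (typeof symbol !== "string" || typeof price !== "number" || !Number.isFinite(price)) {
    return undefined
  }
  return { symbol, price }
}
```

In the pipeline you would map each event.data through parseQuote, then filter out undefined alongside the price > 0 check. Whether streamfu's filter narrows the element type from a type-guard predicate depends on its typings, so an explicit cast may be needed.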
Grep a remote log via fetch streaming
Download a large text resource and emit only the lines matching a pattern, with backpressure. The body is never buffered whole, so memory use depends on the longest line and the stream buffers rather than on the file size: a multi-GB log costs no more than a small one.
import { filter, forEach, fromResponse, lines, pipe } from "@sgmonda/streamfu"
const response = await fetch("https://logs.example.com/app.log")
await forEach(
pipe(
fromResponse(response),
lines,
filter((line: string) => /ERROR|FATAL/.test(line)),
),
(line) => handleMatch(line), // avoid naming this "process": it shadows Node's global process object
)
fromResponse throws synchronously on a non-OK response, so a 500 returning an HTML error page fails here instead of being mis-parsed downstream.
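The same guard can be approximated with plain Fetch API calls, for code paths where streamfu is not in use. The sketch below is not streamfu's source, and okBody is a hypothetical name. It throws on any non-2xx status or a missing body, and cancels the unread error body so the connection can be released.

```typescript
/** Hypothetical guard: return the body stream of a successful response, or throw. */
function okBody(response: Response): ReadableStream<Uint8Array> {
  if (!response.ok) {
    // We will never read the error page, so cancel it to free the connection.
    void response.body?.cancel()
    throw new Error(`HTTP ${response.status} ${response.statusText} for ${response.url}`)
  }
  if (response.body === null) throw new Error("response has no body")
  return response.body
}
```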
S3/R2 download → transform → upload
Stream an NDJSON object out of a presigned GET URL, keep only error-level lines, re-serialize to NDJSON, and stream the result straight into a presigned PUT URL — Web APIs only, no SDK, no intermediate buffer. The object is never fully resident in memory.
import { filter, fromResponse, map, ndjson, pipe } from "@sgmonda/streamfu"
type LogLine = { level: string; msg: string }
const download = await fetch(presignedGetUrl)
const transformed = pipe(
fromResponse(download),
ndjson<LogLine>(),
filter((line: LogLine) => line.level === "error"),
map((line: LogLine) => JSON.stringify(line) + "\n"),
)
const body = transformed.pipeThrough(new TextEncoderStream())
const init: RequestInit = { method: "PUT", body }
;(init as { duplex?: "half" }).duplex = "half" // required to send a stream body
const upload = await fetch(presignedPutUrl, init)
if (!upload.ok) throw new Error(`upload failed: ${upload.status}`)
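duplex: "half" is required by the Fetch spec to send a streaming request body; it is not yet in the lib.dom types, hence the cast-and-assign. Works on Deno, Node 18+, Bun, and Cloudflare Workers. Check your storage provider's limits too: Amazon S3, for example, rejects a PUT sent with chunked transfer encoding and no Content-Length, so there you may need a known-length body or a multipart upload.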
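If you would rather not mutate init through a cast, an intersection type lets you write duplex directly in the object literal and still pass the result to fetch, which accepts it as a RequestInit. Here is a small sketch; StreamingRequestInit and streamingPut are hypothetical names.

```typescript
/** RequestInit plus the duplex member that lib.dom does not declare yet. */
type StreamingRequestInit = RequestInit & { duplex: "half" }

/** Hypothetical helper: build the init for a streaming PUT request. */
function streamingPut(body: ReadableStream<Uint8Array>, headers?: HeadersInit): StreamingRequestInit {
  return { method: "PUT", body, headers, duplex: "half" }
}
```

With it, fetch(presignedPutUrl, streamingPut(body)) replaces the init construction and cast in the recipe.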