Transformation Functions

These return a new ReadableStream — the original is not consumed.


map

Applies one or more transform functions to each chunk. Supports chaining up to 9 transforms with full type inference.

function map<A, B>(readable: ReadableStream<A>, fn: (chunk: A) => B | Promise<B>): ReadableStream<B>
function map<A, B, C>(readable: ReadableStream<A>, fn1: (chunk: A) => B, fn2: (chunk: B) => C): ReadableStream<C>
// ... up to 9 chained transforms

| Parameter | Type | Description |
| --- | --- | --- |
| `readable` | `ReadableStream<A>` | The input stream |
| `...fns` | `((chunk: T) => U)[]` | One or more transform functions |

Returns: ReadableStream of the final transform’s output type

import { createReadable, list, map } from "@sgmonda/streamfu"

const stream = createReadable(["alice", "bob", "charlie"])

// Single transform
const upper = await list(map(stream, (s) => s.toUpperCase()))
// ["ALICE", "BOB", "CHARLIE"]

// Chained transforms: types flow through
const result = await list(map(
  createReadable(["1,alice", "2,bob"]),
  (line) => line.split(","),
  (cols) => ({ id: Number(cols[0]), name: cols[1] }),
))
// [{ id: 1, name: "alice" }, { id: 2, name: "bob" }]
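
The first overload also accepts a transform that returns a Promise. The sketch below models that behaviour with the standard Web Streams API only. It is not streamfu's implementation, and `fromArray`, `collect`, and `mapSketch` are hypothetical names introduced for illustration.

```typescript
// Hypothetical helper: builds a ReadableStream from an array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Sketch only: awaits fn for every chunk, so sync and async transforms both work.
function mapSketch<A, B>(
  readable: ReadableStream<A>,
  fn: (chunk: A) => B | Promise<B>,
): ReadableStream<B> {
  return readable.pipeThrough(
    new TransformStream<A, B>({
      async transform(chunk, controller) {
        controller.enqueue(await fn(chunk));
      },
    }),
  );
}

// Usage: an async transform, standing in for e.g. a remote lookup.
collect(mapSketch(fromArray(["alice", "bob"]), async (name) => name.toUpperCase()))
  .then((out) => console.log(out)); // resolves to ["ALICE", "BOB"]
```

Because each chunk is awaited before the next is transformed, output order matches input order in this sketch.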

filter

Keeps only chunks that match a predicate function. Supports async predicates.

function filter<T>(readable: ReadableStream<T>, fn: (chunk: T) => boolean | Promise<boolean>): ReadableStream<T>

| Parameter | Type | Description |
| --- | --- | --- |
| `readable` | `ReadableStream<T>` | The input stream |
| `fn` | `(chunk: T) => boolean \| Promise<boolean>` | Predicate function |

Returns: ReadableStream<T>

import { createReadable, filter, list } from "@sgmonda/streamfu"

const evens = await list(
  filter(createReadable([1, 2, 3, 4, 5]), (n) => n % 2 === 0),
)
// [2, 4]
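
The example above uses a synchronous predicate. To illustrate the async case mentioned in the description, here is a minimal sketch built on the standard Web Streams API. It is not streamfu's code; `fromArray`, `collect`, `filterSketch`, and the `allowed` set are hypothetical.

```typescript
// Hypothetical helper: builds a ReadableStream from an array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Sketch only: forwards a chunk when the (possibly async) predicate resolves to true.
function filterSketch<T>(
  readable: ReadableStream<T>,
  fn: (chunk: T) => boolean | Promise<boolean>,
): ReadableStream<T> {
  return readable.pipeThrough(
    new TransformStream<T, T>({
      async transform(chunk, controller) {
        if (await fn(chunk)) controller.enqueue(chunk);
      },
    }),
  );
}

// Usage: an async predicate, standing in for e.g. a permission check.
const allowed = new Set(["alice", "carol"]);
collect(filterSketch(fromArray(["alice", "bob", "carol"]), async (name) => allowed.has(name)))
  .then((out) => console.log(out)); // resolves to ["alice", "carol"]
```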

flat

Flattens a stream of arrays into a stream of their elements, up to depth levels of nesting (1 by default).

function flat<T>(readable: ReadableStream<unknown>, depth?: number): ReadableStream<T>

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `readable` | `ReadableStream` | | The stream to flatten |
| `depth` | `number` | `1` | How deep to flatten |

Returns: ReadableStream<T>

import { createReadable, flat, list } from "@sgmonda/streamfu"

const result = await list(
  flat(createReadable([[1, 2], [3, 4], [5]])),
)
// [1, 2, 3, 4, 5]
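
The example above relies on the default depth. The following sketch shows one plausible reading of the `depth` parameter, assuming it mirrors `Array.prototype.flat` (the source does not spell this out). It uses only the standard Web Streams API; `fromArray`, `collect`, `flattenTo`, and `flatSketch` are hypothetical names.

```typescript
// Hypothetical helper: builds a ReadableStream from an array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Yields the elements of value, unwrapping at most `depth` levels of arrays.
function* flattenTo(value: unknown, depth: number): Generator<unknown> {
  if (Array.isArray(value) && depth > 0) {
    for (const item of value) yield* flattenTo(item, depth - 1);
  } else {
    yield value;
  }
}

// Sketch only (assumed semantics): each chunk is flattened `depth` levels.
function flatSketch<T>(readable: ReadableStream<unknown>, depth = 1): ReadableStream<T> {
  return readable.pipeThrough(
    new TransformStream<unknown, T>({
      transform(chunk, controller) {
        for (const item of flattenTo(chunk, depth)) controller.enqueue(item as T);
      },
    }),
  );
}

// Usage: the same nested input at two depths.
const nested = () => fromArray<unknown>([[1, [2, 3]], [[4], 5]]);
collect(flatSketch(nested())).then((out) => console.log(out)); // resolves to [1, [2, 3], [4], 5]
collect(flatSketch(nested(), 2)).then((out) => console.log(out)); // resolves to [1, 2, 3, 4, 5]
```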

flatMap

Map + flatten in one step. Applies a function that returns an array, then flattens by one level.

function flatMap<T, U>(readable: ReadableStream<T>, mapper: (chunk: T) => U[]): ReadableStream<U>

| Parameter | Type | Description |
| --- | --- | --- |
| `readable` | `ReadableStream<T>` | The input stream |
| `mapper` | `(chunk: T) => U[]` | Function returning an array |

Returns: ReadableStream<U>

import { createReadable, flatMap, list } from "@sgmonda/streamfu"

const result = await list(
  flatMap(createReadable([1, 2, 3]), (n) => [n, n * 10]),
)
// [1, 10, 2, 20, 3, 30]

slice

Extracts a portion of the stream between start and end indexes.

function slice<T>(readable: ReadableStream<T>, start: number, end?: number): ReadableStream<T>

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `readable` | `ReadableStream<T>` | | The stream to slice |
| `start` | `number` | | Start index (inclusive) |
| `end` | `number` | `Infinity` | End index (exclusive) |

Returns: ReadableStream<T>

import { createReadable, list, slice } from "@sgmonda/streamfu"

const result = await list(
  slice(createReadable([10, 20, 30, 40, 50]), 1, 4),
)
// [20, 30, 40]
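
As a rough model of the index semantics, including the open-ended case where `end` is omitted, the sketch below counts chunks as they pass through a standard `TransformStream`. It is an illustration under assumptions, not streamfu's implementation: it does not handle negative indexes and keeps reading the source after `end` instead of cancelling early. `fromArray`, `collect`, and `sliceSketch` are hypothetical names.

```typescript
// Hypothetical helper: builds a ReadableStream from an array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Sketch only: forwards chunks whose index is in [start, end).
function sliceSketch<T>(
  readable: ReadableStream<T>,
  start: number,
  end = Infinity,
): ReadableStream<T> {
  let index = 0;
  return readable.pipeThrough(
    new TransformStream<T, T>({
      transform(chunk, controller) {
        if (index >= start && index < end) controller.enqueue(chunk);
        index++;
      },
    }),
  );
}

// Usage: omitting end keeps everything from start onwards.
collect(sliceSketch(fromArray([10, 20, 30, 40, 50]), 3))
  .then((out) => console.log(out)); // resolves to [40, 50]
```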

splice

Removes and/or inserts chunks at a given position.

function splice<T>(readable: ReadableStream<T>, start: number, replaced: number, ...newItems: T[]): ReadableStream<T>

| Parameter | Type | Description |
| --- | --- | --- |
| `readable` | `ReadableStream<T>` | The input stream |
| `start` | `number` | Index at which to start replacing |
| `replaced` | `number` | Number of items to remove |
| `...newItems` | `T[]` | Items to insert |

Returns: ReadableStream<T>

import { createReadable, list, splice } from "@sgmonda/streamfu"

const result = await list(
  splice(createReadable([1, 2, 3, 4, 5]), 2, 1, 99, 100),
)
// [1, 2, 99, 100, 4, 5]
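
The example above removes and inserts in one call. The sketch below covers the two remaining cases, pure insertion (`replaced` is 0) and pure removal (no new items), using the standard Web Streams API. It is not streamfu's code; appending the new items when the source ends before `start` is an assumption borrowed from `Array.prototype.splice`. `fromArray`, `collect`, and `spliceSketch` are hypothetical names.

```typescript
// Hypothetical helper: builds a ReadableStream from an array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Sketch only: drops `replaced` chunks starting at `start` and emits newItems there.
function spliceSketch<T>(
  readable: ReadableStream<T>,
  start: number,
  replaced: number,
  ...newItems: T[]
): ReadableStream<T> {
  let index = 0;
  let inserted = false;
  const insert = (controller: TransformStreamDefaultController<T>) => {
    if (inserted) return;
    for (const item of newItems) controller.enqueue(item);
    inserted = true;
  };
  return readable.pipeThrough(
    new TransformStream<T, T>({
      transform(chunk, controller) {
        if (index === start) insert(controller);
        if (index < start || index >= start + replaced) controller.enqueue(chunk);
        index++;
      },
      flush(controller) {
        // Assumption: if the source ended before `start`, append the new items.
        insert(controller);
      },
    }),
  );
}

// Usage: insertion only, then removal only.
collect(spliceSketch(fromArray(["a", "d"]), 1, 0, "b", "c"))
  .then((out) => console.log(out)); // resolves to ["a", "b", "c", "d"]
collect(spliceSketch(fromArray([1, 2, 3, 4, 5]), 1, 2))
  .then((out) => console.log(out)); // resolves to [1, 4, 5]
```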

concat

Concatenates multiple streams sequentially.

function concat<T>(...readables: ReadableStream<T>[]): ReadableStream<T>

| Parameter | Type | Description |
| --- | --- | --- |
| `...readables` | `ReadableStream<T>[]` | Streams to concatenate |

Returns: ReadableStream<T>

import { concat, createReadable, list } from "@sgmonda/streamfu"

const result = await list(
  concat(createReadable([1, 2]), createReadable([3, 4]), createReadable([5])),
)
// [1, 2, 3, 4, 5]

zip

Combines multiple streams into a stream of tuples. The shortest stream determines the output length.

function zip<T extends readonly ReadableStream<unknown>[]>(...readables: T): ReadableStream<ITuple<T>>

| Parameter | Type | Description |
| --- | --- | --- |
| `...readables` | `ReadableStream[]` | Streams to combine |

Returns: ReadableStream<[T1, T2, ...]> — tuples of values from each stream

import { createReadable, list, zip } from "@sgmonda/streamfu"

const result = await list(
  zip(createReadable(["a", "b", "c"]), createReadable([1, 2, 3])),
)
// [["a", 1], ["b", 2], ["c", 3]]
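
Both inputs above have the same length. To illustrate the "shortest stream determines the output length" rule, here is a two-stream sketch using the standard Web Streams API. It is a simplification, not streamfu's variadic implementation, and cancelling the longer input once the shorter one ends is an assumption. `fromArray`, `collect`, and `zipSketch` are hypothetical names.

```typescript
// Hypothetical helper: builds a ReadableStream from an array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Sketch only: pairs chunks from two streams until either one ends.
function zipSketch<A, B>(a: ReadableStream<A>, b: ReadableStream<B>): ReadableStream<[A, B]> {
  const readerA = a.getReader();
  const readerB = b.getReader();
  return new ReadableStream<[A, B]>({
    async pull(controller) {
      const [ra, rb] = await Promise.all([readerA.read(), readerB.read()]);
      if (ra.done || rb.done) {
        // Stop at the shortest input and release the other one (assumed behaviour).
        await Promise.all([readerA.cancel(), readerB.cancel()]);
        controller.close();
        return;
      }
      controller.enqueue([ra.value, rb.value]);
    },
  });
}

// Usage: the second stream is shorter, so "c" is never paired.
collect(zipSketch(fromArray(["a", "b", "c"]), fromArray([1, 2])))
  .then((out) => console.log(out)); // resolves to [["a", 1], ["b", 2]]
```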

pipe

Chains multiple stream operations with full type inference (up to 9 transforms).

function pipe<A, B>(readable: ReadableStream<A>, fn1: (r: ReadableStream<A>) => ReadableStream<B>): ReadableStream<B>
function pipe<A, B, C>(readable: ReadableStream<A>, fn1: ..., fn2: ...): ReadableStream<C>
// ... up to 9 chained functions

| Parameter | Type | Description |
| --- | --- | --- |
| `readable` | `ReadableStream<A>` | The input stream |
| `...fns` | `((r: ReadableStream) => ReadableStream)[]` | Transform functions |

Returns: ReadableStream of the final function’s output type

import { createReadable, filter, list, map, pipe } from "@sgmonda/streamfu"

const result = await list(pipe(
  createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
  (r) => filter(r, (n) => n % 2 === 0),
  (r) => map(r, (n) => n * 2),
  (r) => map(r, (n) => `Value: ${n}`),
))
// ["Value: 4", "Value: 8", "Value: 12", "Value: 16", "Value: 20"]
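
Conceptually, chaining like this amounts to feeding each function the stream returned by the previous one. The sketch below shows that idea as a left fold over the functions, using the standard Web Streams API. It is not streamfu's implementation and deliberately gives up the typed overloads (it uses `any`); `fromArray`, `collect`, `pipeSketch`, `double`, and `label` are hypothetical names.

```typescript
// Hypothetical helper: builds a ReadableStream from an array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Untyped on purpose: the real overloads are what provide type inference.
type StreamFn = (r: ReadableStream<any>) => ReadableStream<any>;

// Sketch only: applies each function to the result of the previous one.
function pipeSketch(readable: ReadableStream<any>, ...fns: StreamFn[]): ReadableStream<any> {
  return fns.reduce((stream, fn) => fn(stream), readable);
}

// Two hypothetical stream-to-stream steps.
const double = (r: ReadableStream<number>) =>
  r.pipeThrough(new TransformStream<number, number>({
    transform(n, controller) { controller.enqueue(n * 2); },
  }));
const label = (r: ReadableStream<number>) =>
  r.pipeThrough(new TransformStream<number, string>({
    transform(n, controller) { controller.enqueue(`Value: ${n}`); },
  }));

// Usage: equivalent to the nested call label(double(stream)).
collect(pipeSketch(fromArray([1, 2, 3]), double, label))
  .then((out) => console.log(out)); // resolves to ["Value: 2", "Value: 4", "Value: 6"]
```

The flat argument list reads top to bottom in execution order, which is the main advantage over nesting the calls by hand.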

branch

Splits a stream into n independent copies. The original stream is locked after branching.

function branch<T>(readable: ReadableStream<T>, n: number): ReadableStream<T>[]

| Parameter | Type | Description |
| --- | --- | --- |
| `readable` | `ReadableStream<T>` | Stream to branch |
| `n` | `number` | Number of copies |

Returns: ReadableStream<T>[]

import { branch, createReadable, list, reduce } from "@sgmonda/streamfu"

const [forSum, forList] = branch(createReadable([1, 2, 3]), 2)

const sum = await reduce(forSum, (a, b) => a + b, 0) // 6
const items = await list(forList) // [1, 2, 3]
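
The locking behaviour mentioned above can be observed with the standard `ReadableStream.tee()` method, which splits a stream in two and locks the source. The sketch below builds n copies by teeing repeatedly. Whether streamfu does it this way is an assumption; `fromArray`, `collect`, and `branchSketch` are hypothetical names.

```typescript
// Hypothetical helper: builds a ReadableStream from an array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return out;
    out.push(result.value);
  }
}

// Sketch only: tee() n - 1 times, keeping one half and splitting the other again.
// For n <= 1 this returns the input itself, unlocked.
function branchSketch<T>(readable: ReadableStream<T>, n: number): ReadableStream<T>[] {
  const branches: ReadableStream<T>[] = [];
  let rest = readable;
  for (let i = 1; i < n; i++) {
    const [copy, remainder] = rest.tee();
    branches.push(copy);
    rest = remainder;
  }
  branches.push(rest);
  return branches;
}

// Usage: three copies, each yielding every chunk; the source is now locked.
const source = fromArray([1, 2, 3]);
const copies = branchSketch(source, 3);
console.log(source.locked); // true
Promise.all(copies.map((copy) => collect(copy)))
  .then((out) => console.log(out)); // resolves to [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
```

Teed branches buffer chunks that a slower consumer has not read yet, so memory use grows if one copy is consumed much later than the others.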