Consumer Functions

These consume the stream and return a Promise — it cannot be reused afterward.

Need to consume a stream more than once? Use branch() first.


reduce

Reduces a stream to a single value, like Array.prototype.reduce.

function reduce<T, U>(readable: ReadableStream<T>, fn: (acc: U, chunk: T) => U, initialValue: U): Promise<U>
Parameter Type Description
readable ReadableStream<T> The stream to reduce
fn (acc: U, chunk: T) => U Reducer function
initialValue U Initial accumulator value

Returns: Promise<U>

import { createReadable, reduce } from "@sgmonda/streamfu"

const sum = await reduce(createReadable([1, 2, 3, 4, 5]), (acc, n) => acc + n, 0)
// 15

list

Collects all chunks into an array.

function list<T>(readable: ReadableStream<T>): Promise<T[]>
Parameter Type Description
readable ReadableStream<T> The stream to collect

Returns: Promise<T[]>

import { createReadable, list } from "@sgmonda/streamfu"

const items = await list(createReadable([1, 2, 3]))
// [1, 2, 3]

at

Gets the chunk at a specific index.

function at<T>(readable: ReadableStream<T>, index: number): Promise<T | undefined>
Parameter Type Description
readable ReadableStream<T> The stream to query
index number The index to retrieve

Returns: Promise<T | undefined>undefined if index is out of bounds

import { at, createReadable } from "@sgmonda/streamfu"

const third = await at(createReadable([10, 20, 30, 40, 50]), 2)
// 30

some

Checks if any chunk matches a predicate. Short-circuits on first match.

function some<T>(readable: ReadableStream<T>, predicate: (chunk: T) => boolean): Promise<boolean>
Parameter Type Description
readable ReadableStream<T> The stream to check
predicate (chunk: T) => boolean Test function

Returns: Promise<boolean>

import { createReadable, some } from "@sgmonda/streamfu"

const hasEven = await some(createReadable([1, 3, 4, 7]), (n) => n % 2 === 0)
// true

every

Checks if all chunks match a predicate. Short-circuits on first failure.

function every<T>(readable: ReadableStream<T>, predicate: (chunk: T) => boolean): Promise<boolean>
Parameter Type Description
readable ReadableStream<T> The stream to check
predicate (chunk: T) => boolean Test function

Returns: Promise<boolean>

import { createReadable, every } from "@sgmonda/streamfu"

const allPositive = await every(createReadable([1, 2, 3]), (n) => n > 0)
// true

forEach

Executes a function for each chunk. Receives the chunk and its index.

function forEach<T>(readable: ReadableStream<T>, fn: (chunk: T, index: number) => void | Promise<void>): Promise<void>
Parameter Type Description
readable ReadableStream<T> The stream to iterate
fn (chunk: T, index: number) => void Function to execute

Returns: Promise<void>

import { createReadable, forEach } from "@sgmonda/streamfu"

await forEach(createReadable([1, 2, 3]), (value, i) => {
  console.log(`${i}: ${value}`)
})
// 0: 1
// 1: 2
// 2: 3

includes

Checks if a value exists in the stream. Short-circuits on first match.

function includes<T>(readable: ReadableStream<T>, value: T): Promise<boolean>
Parameter Type Description
readable ReadableStream<T> The stream to search
value T Value to find

Returns: Promise<boolean>

import { createReadable, includes } from "@sgmonda/streamfu"

const has42 = await includes(createReadable([1, 2, 42, 100]), 42)
// true

indexOf

Finds the index of the first occurrence of a value. Returns -1 if not found.

function indexOf<T>(readable: ReadableStream<T>, value: T): Promise<number>
Parameter Type Description
readable ReadableStream<T> The stream to search
value T Value to find

Returns: Promise<number>

import { createReadable, indexOf } from "@sgmonda/streamfu"

const idx = await indexOf(createReadable(["a", "b", "c", "d"]), "c")
// 2