Examples
Every example below shows a real task done the hard way with native Web Streams, then the easy way with streamfu.
1. Transform every chunk
Parse CSV lines and uppercase names.
Before — native Web Streams:
const transform1 = new TransformStream({
transform(line, ctrl) {
ctrl.enqueue(line.split(","))
},
})
const transform2 = new TransformStream({
transform(cols, ctrl) {
ctrl.enqueue({ name: cols[0].toUpperCase(), age: Number(cols[1]) })
},
})
const reader = csvStream.pipeThrough(transform1).pipeThrough(transform2).getReader()
const results = []
while (true) {
const { done, value } = await reader.read()
if (done) break
results.push(value)
}
reader.releaseLock()
After — streamfu:
import { list, map } from "@sgmonda/streamfu"
const stream = map(
csvStream,
(line) => line.split(","),
(cols) => ({ name: cols[0].toUpperCase(), age: Number(cols[1]) }),
)
const results = await list(stream)
2. Filter and collect
Keep only active users from a stream.
Before — native Web Streams:
const filter = new TransformStream({
transform(user, ctrl) {
if (user.status === "active") ctrl.enqueue(user)
},
})
const reader = usersStream.pipeThrough(filter).getReader()
const active = []
while (true) {
const { done, value } = await reader.read()
if (done) break
active.push(value)
}
reader.releaseLock()
After — streamfu:
import { filter, list } from "@sgmonda/streamfu"
const active = await list(filter(usersStream, (u) => u.status === "active"))
3. Reduce to a single value
Sum a stream of numbers.
Before — native Web Streams:
const reader = numbersStream.getReader()
let total = 0
while (true) {
const { done, value } = await reader.read()
if (done) break
total += value
}
reader.releaseLock()
After — streamfu:
import { reduce } from "@sgmonda/streamfu"
const total = await reduce(numbersStream, (sum, n) => sum + n, 0)
4. Multi-step pipeline
Filter, transform, and collect in one chain.
Before — native Web Streams:
const filterStep = new TransformStream({
transform(n, ctrl) {
if (n % 2 === 0) ctrl.enqueue(n)
},
})
const doubleStep = new TransformStream({
transform(n, ctrl) {
ctrl.enqueue(n * 2)
},
})
const toStringStep = new TransformStream({
transform(n, ctrl) {
ctrl.enqueue(`Value: ${n}`)
},
})
const reader = numbersStream
.pipeThrough(filterStep)
.pipeThrough(doubleStep)
.pipeThrough(toStringStep)
.getReader()
const results = []
while (true) {
const { done, value } = await reader.read()
if (done) break
results.push(value)
}
reader.releaseLock()
After — streamfu:
import { filter, list, map, pipe } from "@sgmonda/streamfu"
const results = await list(pipe(
numbersStream,
(r) => filter(r, (n) => n % 2 === 0),
(r) => map(r, (n) => n * 2),
(r) => map(r, (n) => `Value: ${n}`),
))
5. Concatenate multiple sources
Merge API pages into one stream and take the first 50.
Before — native Web Streams:
const sources = [page1Stream, page2Stream, page3Stream]
const reader1 = sources[0].getReader()
const reader2 = sources[1].getReader()
const reader3 = sources[2].getReader()
const all = []
for (const reader of [reader1, reader2, reader3]) {
while (true) {
const { done, value } = await reader.read()
if (done) break
all.push(value)
}
reader.releaseLock()
}
const first50 = all.slice(0, 50)
After — streamfu:
import { concat, list, slice } from "@sgmonda/streamfu"
const first50 = await list(slice(concat(page1Stream, page2Stream, page3Stream), 0, 50))
6. Zip parallel streams
Pair names with scores into labeled strings.
Before — native Web Streams:
const readerA = namesStream.getReader()
const readerB = scoresStream.getReader()
const leaderboard = []
while (true) {
const [a, b] = await Promise.all([readerA.read(), readerB.read()])
if (a.done || b.done) break
leaderboard.push(`${a.value}: ${b.value}`)
}
readerA.releaseLock()
readerB.releaseLock()
After — streamfu:
import { list, map, zip } from "@sgmonda/streamfu"
const leaderboard = await list(
map(zip(namesStream, scoresStream), ([name, score]) => `${name}: ${score}`),
)
7. Consume a stream twice
Get both the sum and the max from the same stream.
Before — native Web Streams:
const [copy1, copy2] = numbersStream.tee()
const reader1 = copy1.getReader()
let sum = 0
while (true) {
const { done, value } = await reader1.read()
if (done) break
sum += value
}
reader1.releaseLock()
const reader2 = copy2.getReader()
let max = -Infinity
while (true) {
const { done, value } = await reader2.read()
if (done) break
if (value > max) max = value
}
reader2.releaseLock()
After — streamfu:
import { branch, reduce } from "@sgmonda/streamfu"
const [forSum, forMax] = branch(numbersStream, 2)
const [sum, max] = await Promise.all([
reduce(forSum, (a, b) => a + b, 0),
reduce(forMax, (a, b) => (b > a ? b : a), -Infinity),
])
8. Flatten paginated results
Expand arrays of items into individual chunks.
Before — native Web Streams:
const flatten = new TransformStream({
transform(page, ctrl) {
for (const item of page) ctrl.enqueue(item)
},
})
const label = new TransformStream({
transform(item, ctrl) {
ctrl.enqueue(`#${item.id}: ${item.title}`)
},
})
const reader = pagesStream.pipeThrough(flatten).pipeThrough(label).getReader()
const items = []
while (true) {
const { done, value } = await reader.read()
if (done) break
items.push(value)
}
reader.releaseLock()
After — streamfu:
import { flatMap, list, map } from "@sgmonda/streamfu"
const items = await list(
map(flatMap(pagesStream, (page) => page), (item) => `#${item.id}: ${item.title}`),
)
9. Quick queries on a stream
Answer five questions about the data without five manual loops.
Before — native Web Streams:
// Need 5 copies — tee() only gives 2, so we chain:
const [s1, rest1] = dataStream.tee()
const [s2, rest2] = rest1.tee()
const [s3, rest3] = rest2.tee()
const [s4, s5] = rest3.tee()
// Each requires its own while(true) loop...
// (50+ lines of boilerplate omitted)
After — streamfu:
import { at, branch, every, includes, indexOf, some } from "@sgmonda/streamfu"
const [s1, s2, s3, s4, s5] = branch(dataStream, 5)
const [hasFortyTwo, allPositive, anyOver100, indexOf7, third] = await Promise.all([
includes(s1, 42),
every(s2, (n) => n > 0),
some(s3, (n) => n > 100),
indexOf(s4, 7),
at(s5, 2),
])
10. Generate, splice, and process
Create a range, replace elements, and log results.
Before — native Web Streams:
// Generate 1..10 manually
const numbers = new ReadableStream({
start(ctrl) {
for (let i = 1; i <= 10; i++) ctrl.enqueue(i)
ctrl.close()
},
})
// Splice: remove 3 items at index 3, insert 99 and 100
// (20+ lines of TransformStream boilerplate)
// Double each value
// (another TransformStream...)
const reader = numbers.pipeThrough(spliceTransform).pipeThrough(doubleTransform).getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
console.log(value)
}
reader.releaseLock()
After — streamfu:
import { forEach, map, pipe, range, splice } from "@sgmonda/streamfu"
await forEach(
pipe(
range(1, 10),
(r) => splice(r, 3, 3, 99, 100),
(r) => map(r, (n) => n * 2),
),
(value) => console.log(value),
)