Advanced Streams: pipe & pipeline

Streams let Node process data piece by piece as it flows, and pipe() and pipeline() connect readable, transform, and writable streams together while handling backpressure and cleanup for you.

Part of the free Node.js course at LearnCodingFast — hands-on lessons with examples you run in your browser, plus practice exercises and a quick quiz.

In this lesson you'll go beyond the basics: connect streams with .pipe(), wire multi-stage chains safely with stream.pipeline(), build your own Transform, understand backpressure, and consume a stream with for await.

What You'll Learn in This Lesson

1️⃣ Connecting Streams with .pipe()

A readable stream is a source of data and a writable stream is a destination. The classic way to connect them is source.pipe(destination). Every chunk the source emits flows into the destination, and when the source ends, the destination is told to finish up.

Crucially, .pipe() manages backpressure automatically: if the writable can't keep up, pipe pauses the readable and resumes it once the writable drains. That single feature is why piping beats manually wiring .on('data') and .write() yourself.
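To make the source-to-destination idea concrete, here is a minimal sketch. It assumes an in-memory source built with Readable.from and a small collecting Writable; the helper name pipeToSink is hypothetical and exists only for illustration. It uses require() so it runs as a plain script.

```javascript
const { Readable, Writable } = require('node:stream');

// Hypothetical helper: pipes an in-memory source into a sink that
// collects every chunk, and resolves once the sink has finished.
function pipeToSink(items) {
  return new Promise((resolve, reject) => {
    const received = [];

    // Readable.from turns an iterable into a readable stream (object mode).
    const source = Readable.from(items);

    // Object-mode sink so each item arrives as its own chunk.
    const sink = new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        received.push(chunk);
        callback(); // signal that this chunk has been handled
      },
    });

    // .pipe() does not forward errors, so each stream is watched separately.
    source.on('error', reject);
    sink.on('error', reject);
    sink.on('finish', () => resolve(received));

    // Every chunk from source flows into sink; sink.end() is called
    // automatically when the source ends.
    source.pipe(sink);
  });
}

pipeToSink(['a', 'b', 'c']).then((chunks) => console.log(chunks));
// logs [ 'a', 'b', 'c' ]
```

Note that the sketch still attaches its own error listeners to both streams, because .pipe() by itself only moves data and manages backpressure.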

2️⃣ Safe Chains with pipeline() & a Transform

A Transform stream sits in the middle of a chain: it reads chunks and writes new ones. You implement transform(chunk, encoding, callback) and call callback(error, output) — pass null as the error and the data you want to send downstream.

stream.pipeline() (use the promise version from node:stream/promises) connects source → transform → destination, waits for the whole chain to finish, and, unlike pipe, destroys every stream if any stage fails. It's the production-grade way to combine streams.
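The source → transform → destination chain can be sketched as follows. This is an illustrative example rather than the lesson's own code: createUppercase and shout are hypothetical names, and all streams run in object mode so that each word stays a separate chunk.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical transform: upper-cases every chunk it receives.
function createUppercase() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      // null = no error; the second argument is sent downstream.
      callback(null, String(chunk).toUpperCase());
    },
  });
}

// Hypothetical helper: runs words through the transform and collects them.
async function shout(words) {
  const out = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      out.push(chunk);
      callback();
    },
  });

  // Resolves only when the whole chain has finished.
  await pipeline(Readable.from(words), createUppercase(), sink);
  return out;
}

shout(['hello', 'world']).then((result) => console.log(result));
// logs [ 'HELLO', 'WORLD' ]
```

In an ES module the same functions can be loaded with import { pipeline } from 'node:stream/promises' instead of require().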

3️⃣ Errors, Cleanup & Async Iteration

The biggest reason to reach for pipeline() is error handling. If any stage throws or calls its callback with an error, pipeline rejects with that error and destroys every stream in the chain, so you never leak a half-open file or socket. With the promise API you just wrap it in try/catch.
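A small sketch of that failure path, assuming a deliberately picky Transform that rejects one chunk. The function name runFailingChain and the sample values are made up for illustration; the destroyed property is used here only to observe the cleanup.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical demo: the middle stage fails on the chunk 'bad'.
async function runFailingChain() {
  const source = Readable.from(['ok', 'bad', 'never-reached']);

  const picky = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      if (chunk === 'bad') {
        // Passing an Error as the first argument fails the stage.
        callback(new Error('rejected chunk: bad'));
      } else {
        callback(null, chunk);
      }
    },
  });

  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      callback();
    },
  });

  try {
    await pipeline(source, picky, sink);
    return { ok: true };
  } catch (err) {
    // pipeline rejected with the stage's error and tore the chain down.
    return {
      ok: false,
      message: err.message,
      sourceDestroyed: source.destroyed,
      sinkDestroyed: sink.destroyed,
    };
  }
}

runFailingChain().then((result) => console.log(result));
```

The returned object carries the original error message, and both destroyed flags are expected to be true, which is the cleanup that a bare .pipe() chain would leave to you.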

When you only need to consume a stream (not connect it to a destination), every readable is an async iterable — so a plain for await loop works and applies backpressure naturally.
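As a minimal sketch of consuming a readable with for await (the helper name countChars is hypothetical):

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: adds up the length of every chunk in a readable.
async function countChars(readable) {
  let total = 0;
  // The loop asks for the next chunk only after the body has finished,
  // so a slow body naturally slows the source down.
  for await (const chunk of readable) {
    total += chunk.length;
  }
  return total;
}

countChars(Readable.from(['ab', 'cde'])).then((n) => console.log(n));
// logs 5
```

If the loop body throws or breaks out early, the stream is destroyed as part of leaving the loop, so no separate cleanup call is needed in this sketch.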

Your turn. Fill in the two blanks marked ___ to make a Transform that shouts each word, then run it and compare with the expected output.

No blanks this time — just a brief and an outline. Build a Transform that adds 10% tax to each price and prints a grand total at the end using flush() . Run it and check your output against the example in the comments.

📋 Quick Reference — Streams
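As a compact recap, the sketch below combines the pieces covered in this lesson: Readable.from as a source, a Transform with both transform() and flush(), and the promise-based pipeline(). The names createCounter and summarize are hypothetical, and the counting task is just an illustration of where a summary line can be emitted.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical transform: passes chunks through unchanged and
// appends one summary chunk when the input is exhausted.
function createCounter() {
  let count = 0;
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      count += 1;
      callback(null, chunk); // forward the chunk as-is
    },
    flush(callback) {
      // Runs once, after the last chunk and before the stream ends.
      this.push(`total: ${count}`);
      callback();
    },
  });
}

// Hypothetical helper: source -> counter -> collecting sink.
async function summarize(items) {
  const out = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      out.push(chunk);
      callback();
    },
  });
  await pipeline(Readable.from(items), createCounter(), sink);
  return out;
}

summarize(['a', 'b']).then((result) => console.log(result));
// logs [ 'a', 'b', 'total: 2' ]
```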

Practice quiz

What is the main advantage of pipeline() over .pipe()?

  • It is shorter to type
  • It propagates errors and destroys every stream on failure
  • It only works on files
  • It disables backpressure

Answer: It propagates errors and destroys every stream on failure. pipeline() forwards the first error and cleans up every stream, unlike .pipe().

What is backpressure?

  • A readable producing data faster than a writable can consume it
  • An encryption step
  • A way to compress chunks
  • An error event name

Answer: A readable producing data faster than a writable can consume it. Backpressure is the readable outpacing the writable; pipe/pipeline pause the source to handle it.

In a Transform, how do you push data downstream with no error?

  • return chunk
  • this.emit(chunk)
  • callback(null, data)
  • throw data

Answer: callback(null, data). callback(error, output) — pass null as the error and the data to send on.

Which Transform hook runs once, right before the stream ends?

  • start()
  • close()
  • drain()
  • flush()

Answer: flush(). flush(cb) runs after the last chunk but before the stream closes — ideal for a summary line.

How can you consume a readable stream with plain control flow?

  • for await (const chunk of stream)
  • stream.map()
  • JSON.parse(stream)
  • stream.forEach()

Answer: for await (const chunk of stream). Every readable is an async iterable, so for await loops over its chunks.

What does Readable.from(['a','b','c']) create?

  • A writable stream
  • A readable stream emitting those chunks
  • A Transform
  • An array of buffers

Answer: A readable stream emitting those chunks. Readable.from builds a readable stream that emits the supplied items.

Which import gives the awaitable pipeline?

  • node:fs
  • node:events
  • node:stream/promises
  • node:util

Answer: node:stream/promises. import { pipeline } from 'node:stream/promises' returns a promise you can await.

Why use a Transform instead of Array.map for large data?

  • map is deprecated
  • Transform supports more operators
  • It processes one chunk at a time without loading everything into memory
  • It is the only option in Node

Answer: It processes one chunk at a time without loading everything into memory. A Transform buffers only a bounded amount of data (up to its highWaterMark), so memory use stays small and roughly constant even for huge or continuous data.

What happens if a Transform never calls its callback?

  • The stream stalls and never finishes
  • It retries automatically
  • It throws immediately
  • Nothing, it is fine

Answer: The stream stalls and never finishes. Each chunk must end with exactly one callback call, or the stream hangs.

Does plain .pipe() forward errors and clean up the source on failure?

  • Yes, always
  • Only for files
  • Only in ES modules
  • No — that is why pipeline() exists

Answer: No — that is why pipeline() exists. .pipe() leaves the source open on a destination error; pipeline() fixes that.