Skip to main content
Version: 2.0.0

Consumable pattern

The Consumable pattern is intended to be used with Web Streams API. It allows the consumer to notify the producer when the value is no longer needed.

The problem

In Web Streams API, because ReadableStreamDefaultController.prototype.enqueue is a synchronous method, which returns immediately after the data is enqueued, the ReadableStream can't wait for the data to be consumed. For example:

const buffer = new Uint8Array(1);

const readable = new ReadableStream({
start(controller) {
buffer[0] = 1;
controller.enqueue(buffer);
console.log("enqueue", 1);

buffer[0] = 2;
controller.enqueue(buffer);
console.log("enqueue", 2);

controller.close();
},
});

const transform = new TransformStream({
transform(chunk, controller) {
console.log("transform", chunk[0]);
controller.enqueue(chunk);
},
});

const writable = new WritableStream({
write(chunk) {
console.log("write", chunk[0]);
},
});

await readable.pipeThrough(transform).pipeTo(writable);

The output will be:

enqueue 1
enqueue 2
transform 2
write 2
transform 2
write 2

This means a ReadableStream can't just allocate one object (e.g. Uint8Array), then enqueue it multiple times with different content (doing this can reduce memory allocations and GC pressure). The consumer will only receive the last content.

info

This issue only happens when using ReadableStream.prototype.pipeTo or ReadableStream.prototype.pipeThrough.

When not using ReadableStreams, but using WritableStreamDefaultWriter.prototype.write method to write data directly, it will return a Promise that resolves after the data is written to the sink.

But in most times, we will use ReadableStreams and pipe them around.

Consumable<T>

The Consumable class is a wrapper around a value of type T. It allows the consumer to notify the producer when the value is no longer needed.

import { Consumable } from "@yume-chan/stream-extra";

function useData(data: Consumable<Uint8Array>) {
console.log(data.value); // use the value
data.consume(); // notify the producer
}

async function produceData() {
const data = new Consumable(new Uint8Array([1]));
useData(data);
await data.consumed;
}

To re-write the previous example using Consumable pattern:

import {
Consumable,
ReadableStream,
TransformStream,
WritableStream,
} from "@yume-chan/stream-extra";

const buffer = new Uint8Array(1);

const readable = new ReadableStream({
async start(controller) {
buffer[0] = 1;

const consumable1 = new Consumable(buffer);
controller.enqueue(consumable1);
await consumable1.consumed;
console.log("enqueue", 1);

buffer[0] = 2;

const consumable2 = new Consumable(buffer);
controller.enqueue(consumable2);
await consumable2.consumed;
console.log("enqueue", 2);

controller.close();
},
});

const transform = new TransformStream({
transform(chunk, controller) {
console.log("transform", chunk.value[0]);
controller.enqueue(chunk);
},
});

const writable = new WritableStream({
write(chunk) {
console.log("write", chunk.value[0]);
chunk.consume();
},
});

await readable.pipeThrough(transform).pipeTo(writable);

Here, we create a Consumable for each enqueue operation, and await for the consumed promise to be settled before modifying the buffer.

It doesn't completely eliminate allocations, but if the size of Consumable objects is smaller than the Uint8Array you're enqueuing (which is the usual case), it's still beneficial.

This time the output will be:

transform 1
write 1
enqueue 1
transform 2
write 2
enqueue 2

Consumable.ReadableStream

To simplify using Consumable with ReadableStreams, you can use Consumable.ReadableStream:

import { Consumable } from "@yume-chan/stream-extra";

const buffer = new Uint8Array(1);

const readable = new Consumable.ReadableStream({
async start(controller) {
buffer[0] = 1;

await controller.enqueue(buffer);
console.log("enqueue", 1);

buffer[0] = 2;

await controller.enqueue(buffer);
console.log("enqueue", 2);

controller.close();
},
});

const transform = new TransformStream({
transform(chunk, controller) {
console.log("transform", chunk.value[0]);
controller.enqueue(chunk);
},
});

const writable = new WritableStream({
write(chunk) {
console.log("write", chunk.value[0]);
chunk.consume();
},
});

await readable.pipeThrough(transform).pipeTo(writable);

The enqueue method is now asynchronous, and returns a Promise that resolves when the value is consumed.

danger

You must await the returned Promise for Consumables to work properly.

tryConsume

Consumable.prototype.tryConsume method is a helper method to consume the value by calling a callback:

declare class Consumable<T> {
tryConsume<U>(callback: (value: T) => U): U;
}

tryConsume invokes the callback immediately with the inner value.

If the callback returns a Promise, tryConsume will wait for the Promise to be settled before marking the Consumable as consumed. Otherwise, tryConsume will mark the Consumable as consumed when the callback completes (returns or throws).

import { Consumable } from "@yume-chan/stream-extra";

function useData(data: Consumable<Uint8Array>) {
data.tryConsume((value) => {
console.log(value);
});
}

function useDataAsync(data: Consumable<Uint8Array>) {
data.tryConsume(async (value) => {
// Can be async
await delay(1000);
console.log(value);
});
}

It also forwards (returns) the callback's return value:

import { Consumable } from "@yume-chan/stream-extra";

const consumable = new Consumable(new Uint8Array([1, 2, 3]));
const result = consumable.tryConsume((value) => {
return 42;
});

console.log(result); // 42

Improve debugging experience

Normally, when piping data around streams, Chrome DevTools doesn't track the data flow, so the call stack is always empty:

import { ReadableStream, WritableStream } from "@yume-chan/stream-extra";

new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
}).pipeTo(
new WritableStream({
write(chunk) {
debugger; // Here the call stack is empty: it doesn't tell you where the `chunk` comes from
},
})
);

An additional feature of Consumable is that it uses console.createTask to capture the call stack where it was created.

When paired with tryConsume, Chrome DevTools will show the Consumable's creation site as async call stack:

import {
Consumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";

new ReadableStream({
start(controller) {
controller.enqueue(new Consumable(new Uint8Array([1, 2, 3])));
controller.close();
},
}).pipeTo(
new WritableStream({
write(chunk) {
chunk.tryConsume((value) => {
debugger; // The call stack will point to line 9 (`new Consumable(...)`)
});
},
})
);

MaybeConsumable<T>

Not all producers need the consumable pattern. Their data might be very lightweight, or they just can't avoid creating new objects.

For convenience, most WritableStreams in Tango accepts both T and Consumable<T>.

The MaybeConsumable<T> type is a shorthand for T | Consumable<T>.

import {
Consumable,
WritableStream,
MaybeConsumable,
} from "@yume-chan/stream-extra";

declare const stream: WritableStream<MaybeConsumable<Uint8Array>>;

const writer = stream.getWriter();
await writer.write(new Consumable(new Uint8Array([4, 5, 6]))); // works
await writer.write(new Uint8Array([1, 2, 3])); // also works

This also allows a blob stream to be directly piped into them:

declare const blob: File | Blob;

blob.stream().pipeTo(writable);

You might choose to use Consumable<T> when either:

  • You need to re-use the buffer, so you want to track when it's consumed.
  • You want to improve debugging experience by synthesizing a call stack.

tryConsume

MaybeConsumable namespace also provides a helper method tryConsume. The value argument can be either T or Consumable<T>.

declare namespace MaybeConsumable {
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R
): R;
}

If the value is a Consumable, it behaves the same as Consumable#tryConsume. If it's not, it just invokes the callback immediately with the value.

import { MaybeConsumable } from "@yume-chan/stream-extra";

MaybeConsumable.tryConsume(new Uint8Array([1, 2, 3]), (value) => {
console.log(value); // Uint8Array [1, 2, 3]
});

MaybeConsumable.tryConsume(
new Consumable(new Uint8Array([4, 5, 6])),
(value) => {
console.log(value); // Uint8Array [4, 5, 6]
}
);

await MaybeConsumable.tryConsume(
new Consumable(new Uint8Array([7, 8, 9])),
async (value) => {
// Can be async
await delay(1000);
console.log(value); // Uint8Array [7, 8, 9]
}
);

WritableStream

The MaybeConsumable.WritableStream<T> class combines WritableStream<T> and tryConsume to help you correctly handle the data.

import { MaybeConsumable } from "@yume-chan/stream-extra";

const writable = new MaybeConsumable.WritableStream<Uint8Array>({
// `write` can be `async`
async write(chunk) {
console.log(chunk); // You can only use `chunk` in `write` callback
},
});

You must not use the chunk outside the write callback, unless you make a copy yourself. It might be overwritten with other data after your write callback returns.

For example, WebSocket's send method will copy the data into an internal transmission buffer, so it's safe to use that in write:

import { MaybeConsumable } from "@yume-chan/stream-extra";

declare const socket: WebSocket;

const writable = new MaybeConsumable.WritableStream<Uint8Array>({
write(chunk) {
socket.send(chunk);
},
});