Consumable pattern
The Consumable
pattern is intended to be used with Web Streams API. It allows the consumer to notify the producer when the value is no longer needed.
The problem
In Web Streams API, because ReadableStreamDefaultController.prototype.enqueue
is a synchronous method, which returns immediately after the data is enqueued, the ReadableStream
can't wait for the data to be consumed. For example:
const buffer = new Uint8Array(1);
const readable = new ReadableStream({
start(controller) {
buffer[0] = 1;
controller.enqueue(buffer);
console.log("enqueue", 1);
buffer[0] = 2;
controller.enqueue(buffer);
console.log("enqueue", 2);
controller.close();
},
});
const transform = new TransformStream({
transform(chunk, controller) {
console.log("transform", chunk[0]);
controller.enqueue(chunk);
},
});
const writable = new WritableStream({
write(chunk) {
console.log("write", chunk[0]);
},
});
await readable.pipeThrough(transform).pipeTo(writable);
The output will be:
enqueue 1
enqueue 2
transform 2
write 2
transform 2
write 2
This means a ReadableStream
can't just allocate one object (e.g. Uint8Array
), then enqueue it multiple times with different content (doing this can reduce memory allocations and GC pressure). The consumer will only receive the last content.
This issue only happens when using ReadableStream.prototype.pipeTo
or ReadableStream.prototype.pipeThrough
.
When not using ReadableStream
s, but using WritableStreamDefaultWriter.prototype.write
method to write data directly, it will return a Promise
that resolves after the data is written to the sink.
But in most times, we will use ReadableStream
s and pipe them around.
Consumable<T>
The Consumable
class is a wrapper around a value of type T
. It allows the consumer to notify the producer when the value is no longer needed.
import { Consumable } from "@yume-chan/stream-extra";
function useData(data: Consumable<Uint8Array>) {
console.log(data.value); // use the value
data.consume(); // notify the producer
}
async function produceData() {
const data = new Consumable(new Uint8Array([1]));
useData(data);
await data.consumed;
}
To re-write the previous example using Consumable
pattern:
import {
Consumable,
ReadableStream,
TransformStream,
WritableStream,
} from "@yume-chan/stream-extra";
const buffer = new Uint8Array(1);
const readable = new ReadableStream({
async start(controller) {
buffer[0] = 1;
const consumable1 = new Consumable(buffer);
controller.enqueue(consumable1);
await consumable1.consumed;
console.log("enqueue", 1);
buffer[0] = 2;
const consumable2 = new Consumable(buffer);
controller.enqueue(consumable2);
await consumable2.consumed;
console.log("enqueue", 2);
controller.close();
},
});
const transform = new TransformStream({
transform(chunk, controller) {
console.log("transform", chunk.value[0]);
controller.enqueue(chunk);
},
});
const writable = new WritableStream({
write(chunk) {
console.log("write", chunk.value[0]);
chunk.consume();
},
});
await readable.pipeThrough(transform).pipeTo(writable);
Here, we create a Consumable
for each enqueue
operation, and await
for the consumed
promise to be settled before modifying the buffer.
It doesn't completely eliminate allocations, but if the size of Consumable
objects is smaller than the Uint8Array
you're enqueuing (which is the usual case), it's still beneficial.
This time the output will be:
transform 1
write 1
enqueue 1
transform 2
write 2
enqueue 2
Consumable.ReadableStream
To simplify using Consumable
with ReadableStream
s, you can use Consumable.ReadableStream
:
import { Consumable } from "@yume-chan/stream-extra";
const buffer = new Uint8Array(1);
const readable = new Consumable.ReadableStream({
async start(controller) {
buffer[0] = 1;
await controller.enqueue(buffer);
console.log("enqueue", 1);
buffer[0] = 2;
await controller.enqueue(buffer);
console.log("enqueue", 2);
controller.close();
},
});
const transform = new TransformStream({
transform(chunk, controller) {
console.log("transform", chunk.value[0]);
controller.enqueue(chunk);
},
});
const writable = new WritableStream({
write(chunk) {
console.log("write", chunk.value[0]);
chunk.consume();
},
});
await readable.pipeThrough(transform).pipeTo(writable);
The enqueue
method is now asynchronous, and returns a Promise
that resolves when the value is consumed.
You must await
the returned Promise
for Consumable
s to work properly.
tryConsume
Consumable.prototype.tryConsume
method is a helper method to consume the value by calling a callback:
declare class Consumable<T> {
tryConsume<U>(callback: (value: T) => U): U;
}
tryConsume
invokes the callback immediately with the inner value.
If the callback returns a Promise
, tryConsume
will wait for the Promise
to be settled before marking the Consumable
as consumed. Otherwise, tryConsume
will mark the Consumable
as consumed when the callback completes (returns or throws).
- JavaScript
- TypeScript
function useData(data) {
data.tryConsume((value) => {
console.log(value);
});
}
function useDataAsync(data) {
data.tryConsume(async (value) => {
// Can be async
await delay(1000);
console.log(value);
});
}
import { Consumable } from "@yume-chan/stream-extra";
function useData(data: Consumable<Uint8Array>) {
data.tryConsume((value) => {
console.log(value);
});
}
function useDataAsync(data: Consumable<Uint8Array>) {
data.tryConsume(async (value) => {
// Can be async
await delay(1000);
console.log(value);
});
}
It also forwards (returns) the callback's return value:
import { Consumable } from "@yume-chan/stream-extra";
const consumable = new Consumable(new Uint8Array([1, 2, 3]));
const result = consumable.tryConsume((value) => {
return 42;
});
console.log(result); // 42
Improve debugging experience
Normally, when piping data around streams, Chrome DevTools doesn't track the data flow, so the call stack is always empty:
import { ReadableStream, WritableStream } from "@yume-chan/stream-extra";
new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
}).pipeTo(
new WritableStream({
write(chunk) {
debugger; // Here the call stack is empty: it doesn't tell you where the `chunk` comes from
},
})
);
An additional feature of Consumable
is that it uses console.createTask
to capture the call stack where it was created.
When paired with tryConsume
, Chrome DevTools will show the Consumable
's creation site as async call stack:
import {
Consumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";
new ReadableStream({
start(controller) {
controller.enqueue(new Consumable(new Uint8Array([1, 2, 3])));
controller.close();
},
}).pipeTo(
new WritableStream({
write(chunk) {
chunk.tryConsume((value) => {
debugger; // The call stack will point to line 9 (`new Consumable(...)`)
});
},
})
);
MaybeConsumable<T>
Not all producers need the consumable pattern. Their data might be very lightweight, or they just can't avoid creating new objects.
For convenience, most WritableStream
s in Tango accepts both T
and Consumable<T>
.
The MaybeConsumable<T>
type is a shorthand for T | Consumable<T>
.
import {
Consumable,
WritableStream,
MaybeConsumable,
} from "@yume-chan/stream-extra";
declare const stream: WritableStream<MaybeConsumable<Uint8Array>>;
const writer = stream.getWriter();
await writer.write(new Consumable(new Uint8Array([4, 5, 6]))); // works
await writer.write(new Uint8Array([1, 2, 3])); // also works
This also allows a blob stream to be directly piped into them:
declare const blob: File | Blob;
blob.stream().pipeTo(writable);
You might choose to use Consumable<T>
when either:
- You need to re-use the buffer, so you want to track when it's consumed.
- You want to improve debugging experience by synthesizing a call stack.
tryConsume
MaybeConsumable
namespace also provides a helper method tryConsume
. The value
argument can be either T
or Consumable<T>
.
declare namespace MaybeConsumable {
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R
): R;
}
If the value
is a Consumable
, it behaves the same as Consumable#tryConsume
. If it's not, it just invokes the callback immediately with the value.
import { MaybeConsumable } from "@yume-chan/stream-extra";
MaybeConsumable.tryConsume(new Uint8Array([1, 2, 3]), (value) => {
console.log(value); // Uint8Array [1, 2, 3]
});
MaybeConsumable.tryConsume(
new Consumable(new Uint8Array([4, 5, 6])),
(value) => {
console.log(value); // Uint8Array [4, 5, 6]
}
);
await MaybeConsumable.tryConsume(
new Consumable(new Uint8Array([7, 8, 9])),
async (value) => {
// Can be async
await delay(1000);
console.log(value); // Uint8Array [7, 8, 9]
}
);
WritableStream
The MaybeConsumable.WritableStream<T>
class combines WritableStream<T>
and tryConsume
to help you correctly handle the data.
import { MaybeConsumable } from "@yume-chan/stream-extra";
const writable = new MaybeConsumable.WritableStream<Uint8Array>({
// `write` can be `async`
async write(chunk) {
console.log(chunk); // You can only use `chunk` in `write` callback
},
});
You must not use the chunk
outside the write
callback, unless you make a copy yourself. It might be overwritten with other data after your write
callback returns.
For example, WebSocket
's send
method will copy the data into an internal transmission buffer, so it's safe to use that in write
:
- JavaScript
- TypeScript
import { MaybeConsumable } from "@yume-chan/stream-extra";
const writable = new MaybeConsumable.WritableStream({
write(chunk) {
socket.send(chunk);
},
});
import { MaybeConsumable } from "@yume-chan/stream-extra";
declare const socket: WebSocket;
const writable = new MaybeConsumable.WritableStream<Uint8Array>({
write(chunk) {
socket.send(chunk);
},
});