stream: propagate abort reason in share and broadcast by trivikr · Pull Request #63358 · nodejs/node · GitHub
Skip to content
6 changes: 5 additions & 1 deletion lib/internal/streams/iter/broadcast.js
7 changes: 6 additions & 1 deletion lib/internal/streams/iter/share.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class ShareImpl {
// cursor must re-pull rather than terminating prematurely.
for (;;) {
if (state.detached) {
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}

Expand Down Expand Up @@ -628,6 +629,10 @@ class SyncShareImpl {
}
}

function onShareCancel(shareImpl, signal) {
onSignalAbort(signal, () => shareImpl.cancel(signal.reason));
}

// =============================================================================
// Public API
// =============================================================================
Expand Down Expand Up @@ -657,7 +662,7 @@ function share(source, options = { __proto__: null }) {
const shareImpl = new ShareImpl(normalized, opts);

if (signal) {
onSignalAbort(signal, () => shareImpl.cancel());
onShareCancel(shareImpl, signal);
}

return shareImpl;
Expand Down
22 changes: 12 additions & 10 deletions test/parallel/test-stream-iter-broadcast-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ async function testAbortSignal() {

ac.abort();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) {
assert.fail('Should not reach here');
}
}, { name: 'AbortError' });
}

async function testAlreadyAbortedSignal() {
Expand All @@ -84,11 +85,12 @@ async function testAlreadyAbortedSignal() {
const { broadcast: bc } = broadcast({ signal: ac.signal });
const consumer = bc.push();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) {
assert.fail('Should not reach here');
}
}, { name: 'AbortError' });
}

// =============================================================================
Expand Down
76 changes: 64 additions & 12 deletions test/parallel/test-stream-iter-share-async.js
Loading