stream: propagate abort reason in share and broadcast · nodejs/node@0008d01 · GitHub
Skip to content

Commit 0008d01

Browse files
trivikraduh95
authored andcommitted
stream: propagate abort reason in share and broadcast
Pass signal.reason to the multi-consumer cancel paths so signal abort is reported as AbortError instead of clean iterator completion. Also make detached share consumers rethrow a stored source error when they resume after cancellation, preserving the abort reason for pending pulls. Fixes: #63357 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63358 Fixes: #63357 Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent e286fa1 commit 0008d01

4 files changed

Lines changed: 87 additions & 24 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 5 additions & 1 deletion

lib/internal/streams/iter/share.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ class ShareImpl {
144144
// cursor must re-pull rather than terminating prematurely.
145145
for (;;) {
146146
if (state.detached) {
147+
if (self.#sourceError) throw self.#sourceError;
147148
return { __proto__: null, done: true, value: undefined };
148149
}
149150

@@ -628,6 +629,10 @@ class SyncShareImpl {
628629
}
629630
}
630631

632+
function onShareCancel(shareImpl, signal) {
633+
onSignalAbort(signal, () => shareImpl.cancel(signal.reason));
634+
}
635+
631636
// =============================================================================
632637
// Public API
633638
// =============================================================================
@@ -657,7 +662,7 @@ function share(source, options = { __proto__: null }) {
657662
const shareImpl = new ShareImpl(normalized, opts);
658663

659664
if (signal) {
660-
onSignalAbort(signal, () => shareImpl.cancel());
665+
onShareCancel(shareImpl, signal);
661666
}
662667

663668
return shareImpl;

test/parallel/test-stream-iter-broadcast-from.js

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,12 @@ async function testAbortSignal() {
7070

7171
ac.abort();
7272

73-
const batches = [];
74-
for await (const batch of consumer) {
75-
batches.push(batch);
76-
}
77-
assert.strictEqual(batches.length, 0);
73+
await assert.rejects(async () => {
74+
// eslint-disable-next-line no-unused-vars
75+
for await (const _ of consumer) {
76+
assert.fail('Should not reach here');
77+
}
78+
}, { name: 'AbortError' });
7879
}
7980

8081
async function testAlreadyAbortedSignal() {
@@ -84,11 +85,12 @@ async function testAlreadyAbortedSignal() {
8485
const { broadcast: bc } = broadcast({ signal: ac.signal });
8586
const consumer = bc.push();
8687

87-
const batches = [];
88-
for await (const batch of consumer) {
89-
batches.push(batch);
90-
}
91-
assert.strictEqual(batches.length, 0);
88+
await assert.rejects(async () => {
89+
// eslint-disable-next-line no-unused-vars
90+
for await (const _ of consumer) {
91+
assert.fail('Should not reach here');
92+
}
93+
}, { name: 'AbortError' });
9294
}
9395

9496
// =============================================================================

test/parallel/test-stream-iter-share-async.js

Lines changed: 64 additions & 12 deletions

0 commit comments

Comments
 (0)