stream: implement finished() for ReadableStream and WritableStream · nodejs/node@cde5960 · GitHub
Skip to content

Commit cde5960

Browse files
debadree25juanarbol
authored andcommitted
stream: implement finished() for ReadableStream and WritableStream
Refs: #39316 PR-URL: #46205 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Darshan Sen <raisinten@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 896027c commit cde5960

5 files changed

Lines changed: 301 additions & 9 deletions

File tree

lib/internal/streams/end-of-stream.js

Lines changed: 20 additions & 5 deletions

lib/internal/streams/utils.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ const {
44
Symbol,
55
SymbolAsyncIterator,
66
SymbolIterator,
7+
SymbolFor,
78
} = primordials;
89

910
const kDestroyed = Symbol('kDestroyed');
1011
const kIsErrored = Symbol('kIsErrored');
1112
const kIsReadable = Symbol('kIsReadable');
1213
const kIsDisturbed = Symbol('kIsDisturbed');
1314

15+
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
16+
1417
function isReadableNodeStream(obj, strict = false) {
1518
return !!(
1619
obj &&
@@ -55,6 +58,25 @@ function isNodeStream(obj) {
5558
);
5659
}
5760

61+
function isReadableStream(obj) {
62+
return !!(
63+
obj &&
64+
!isNodeStream(obj) &&
65+
typeof obj.pipeThrough === 'function' &&
66+
typeof obj.getReader === 'function' &&
67+
typeof obj.cancel === 'function'
68+
);
69+
}
70+
71+
function isWritableStream(obj) {
72+
return !!(
73+
obj &&
74+
!isNodeStream(obj) &&
75+
typeof obj.getWriter === 'function' &&
76+
typeof obj.abort === 'function'
77+
);
78+
}
79+
5880
function isIterable(obj, isAsync) {
5981
if (obj == null) return false;
6082
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
@@ -269,18 +291,21 @@ module.exports = {
269291
kIsErrored,
270292
isReadable,
271293
kIsReadable,
294+
kIsClosedPromise,
272295
isClosed,
273296
isDestroyed,
274297
isDuplexNodeStream,
275298
isFinished,
276299
isIterable,
277300
isReadableNodeStream,
301+
isReadableStream,
278302
isReadableEnded,
279303
isReadableFinished,
280304
isReadableErrored,
281305
isNodeStream,
282306
isWritable,
283307
isWritableNodeStream,
308+
isWritableStream,
284309
isWritableEnded,
285310
isWritableFinished,
286311
isWritableErrored,

lib/internal/webstreams/readablestream.js

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ const {
8585
kIsDisturbed,
8686
kIsErrored,
8787
kIsReadable,
88+
kIsClosedPromise,
8889
} = require('internal/streams/utils');
8990

9091
const {
@@ -258,9 +259,11 @@ class ReadableStream {
258259
port1: undefined,
259260
port2: undefined,
260261
promise: undefined,
261-
}
262+
},
262263
};
263264

265+
this[kIsClosedPromise] = createDeferredPromise();
266+
264267
// The spec requires handling of the strategy first
265268
// here. Specifically, if getting the size and
266269
// highWaterMark from the strategy fail, that has
@@ -652,8 +655,9 @@ function TransferredReadableStream() {
652655
writable: undefined,
653656
port: undefined,
654657
promise: undefined,
655-
}
658+
},
656659
};
660+
this[kIsClosedPromise] = createDeferredPromise();
657661
},
658662
[], ReadableStream));
659663
}
@@ -1213,8 +1217,9 @@ function createTeeReadableStream(start, pull, cancel) {
12131217
writable: undefined,
12141218
port: undefined,
12151219
promise: undefined,
1216-
}
1220+
},
12171221
};
1222+
this[kIsClosedPromise] = createDeferredPromise();
12181223
setupReadableStreamDefaultControllerFromSource(
12191224
this,
12201225
ObjectCreate(null, {
@@ -1887,6 +1892,7 @@ function readableStreamCancel(stream, reason) {
18871892
function readableStreamClose(stream) {
18881893
assert(stream[kState].state === 'readable');
18891894
stream[kState].state = 'closed';
1895+
stream[kIsClosedPromise].resolve();
18901896

18911897
const {
18921898
reader,
@@ -1908,6 +1914,8 @@ function readableStreamError(stream, error) {
19081914
assert(stream[kState].state === 'readable');
19091915
stream[kState].state = 'errored';
19101916
stream[kState].storedError = error;
1917+
stream[kIsClosedPromise].reject(error);
1918+
setPromiseHandled(stream[kIsClosedPromise].promise);
19111919

19121920
const {
19131921
reader

lib/internal/webstreams/writablestream.js

Lines changed: 13 additions & 1 deletion

0 commit comments

Comments
 (0)