stream: do not pass `readable.compose()` output via `Readable.from()` · nodejs/node@f5233df · GitHub
Skip to content

Commit f5233df

Browse files
Renegade334aduh95
authored andcommitted
stream: do not pass readable.compose() output via Readable.from()
PR-URL: #60907 Fixes: #55203 Reviewed-By: Raz Luvaton <rluvaton@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 2d72ea6 commit f5233df

4 files changed

Lines changed: 68 additions & 40 deletions

File tree

doc/api/stream.md

Lines changed: 12 additions & 6 deletions

lib/internal/streams/operators.js

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ const { AbortController, AbortSignal } = require('internal/abort_controller');
1818
const {
1919
AbortError,
2020
codes: {
21-
ERR_INVALID_ARG_VALUE,
2221
ERR_MISSING_ARGS,
2322
ERR_OUT_OF_RANGE,
2423
},
@@ -31,40 +30,10 @@ const {
3130
} = require('internal/validators');
3231
const { kWeakHandler, kResistStopPropagation } = require('internal/event_target');
3332
const { finished } = require('internal/streams/end-of-stream');
34-
const staticCompose = require('internal/streams/compose');
35-
const {
36-
addAbortSignalNoValidate,
37-
} = require('internal/streams/add-abort-signal');
38-
const { isWritable, isNodeStream } = require('internal/streams/utils');
3933

4034
const kEmpty = Symbol('kEmpty');
4135
const kEof = Symbol('kEof');
4236

43-
function compose(stream, options) {
44-
if (options != null) {
45-
validateObject(options, 'options');
46-
}
47-
if (options?.signal != null) {
48-
validateAbortSignal(options.signal, 'options.signal');
49-
}
50-
51-
if (isNodeStream(stream) && !isWritable(stream)) {
52-
throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
53-
}
54-
55-
const composedStream = staticCompose(this, stream);
56-
57-
if (options?.signal) {
58-
// Not validating as we already validated before
59-
addAbortSignalNoValidate(
60-
options.signal,
61-
composedStream,
62-
);
63-
}
64-
65-
return composedStream;
66-
}
67-
6837
function map(fn, options) {
6938
validateFunction(fn, 'fn');
7039
if (options != null) {
@@ -408,7 +377,6 @@ module.exports.streamReturningOperators = {
408377
flatMap,
409378
map,
410379
take,
411-
compose,
412380
};
413381

414382
module.exports.promiseReturningOperators = {

lib/internal/streams/readable.js

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const { Buffer } = require('buffer');
4747

4848
const {
4949
addAbortSignal,
50+
addAbortSignalNoValidate,
5051
} = require('internal/streams/add-abort-signal');
5152
const { eos } = require('internal/streams/end-of-stream');
5253

@@ -85,7 +86,10 @@ const {
8586
ERR_UNKNOWN_ENCODING,
8687
},
8788
} = require('internal/errors');
88-
const { validateObject } = require('internal/validators');
89+
const {
90+
validateAbortSignal,
91+
validateObject,
92+
} = require('internal/validators');
8993

9094
const FastBuffer = Buffer[SymbolSpecies];
9195

@@ -1409,6 +1413,30 @@ async function* createAsyncIterator(stream, options) {
14091413
}
14101414
}
14111415

1416+
let composeImpl;
1417+
1418+
Readable.prototype.compose = function compose(stream, options) {
1419+
if (options != null) {
1420+
validateObject(options, 'options');
1421+
}
1422+
if (options?.signal != null) {
1423+
validateAbortSignal(options.signal, 'options.signal');
1424+
}
1425+
1426+
composeImpl ??= require('internal/streams/compose');
1427+
const composedStream = composeImpl(this, stream);
1428+
1429+
if (options?.signal) {
1430+
// Not validating as we already validated before
1431+
addAbortSignalNoValidate(
1432+
options.signal,
1433+
composedStream,
1434+
);
1435+
}
1436+
1437+
return composedStream;
1438+
};
1439+
14121440
// Making it explicit these properties are not enumerable
14131441
// because otherwise some prototype manipulation in
14141442
// userland will fail.

test/parallel/test-stream-compose-operator.js renamed to test/parallel/test-stream-readable-compose.js

Lines changed: 27 additions & 1 deletion

0 commit comments

Comments
 (0)