stream: initial approach to include strategy options on Readable.toWeb() · nodejs/node@a057510 · GitHub
Skip to content

Commit a057510

Browse files
Warkanlockdanielleadams
authored andcommitted
stream: initial approach to include strategy options on Readable.toWeb()
PR-URL: #43515 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent f32aec8 commit a057510

4 files changed

Lines changed: 109 additions & 12 deletions

File tree

doc/api/stream.md

Lines changed: 5 additions & 1 deletion

lib/internal/streams/readable.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,8 +1405,10 @@ Readable.fromWeb = function(readableStream, options) {
14051405
options);
14061406
};
14071407

1408-
Readable.toWeb = function(streamReadable) {
1409-
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
1408+
Readable.toWeb = function(streamReadable, options) {
1409+
return lazyWebStreams().newReadableStreamFromStreamReadable(
1410+
streamReadable,
1411+
options);
14101412
};
14111413

14121414
Readable.wrap = function(src, options) {

lib/internal/webstreams/adapters.js

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -359,10 +359,14 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
359359
}
360360

361361
/**
362+
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
362363
* @param {Readable} streamReadable
364+
* @param {{
365+
* strategy : QueuingStrategy
366+
* }} [options]
363367
* @returns {ReadableStream}
364368
*/
365-
function newReadableStreamFromStreamReadable(streamReadable) {
369+
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
366370
// Not using the internal/streams/utils isReadableNodeStream utility
367371
// here because it will return false if streamReadable is a Duplex
368372
// whose readable option is false. For a Duplex that is not readable,
@@ -382,14 +386,26 @@ function newReadableStreamFromStreamReadable(streamReadable) {
382386

383387
const objectMode = streamReadable.readableObjectMode;
384388
const highWaterMark = streamReadable.readableHighWaterMark;
385-
// When not running in objectMode explicitly, we just fall
386-
// back to a minimal strategy that just specifies the highWaterMark
387-
// and no size algorithm. Using a ByteLengthQueuingStrategy here
388-
// is unnecessary.
389-
const strategy =
390-
objectMode ?
391-
new CountQueuingStrategy({ highWaterMark }) :
392-
{ highWaterMark };
389+
390+
const evaluateStrategyOrFallback = (strategy) => {
391+
// If there is a strategy available, use it
392+
if (strategy)
393+
return strategy;
394+
395+
if (objectMode) {
396+
// When running in objectMode explicitly but no strategy, we just fall
397+
// back to CountQueuingStrategy
398+
return new CountQueuingStrategy({ highWaterMark });
399+
}
400+
401+
// When not running in objectMode explicitly, we just fall
402+
// back to a minimal strategy that just specifies the highWaterMark
403+
// and no size algorithm. Using a ByteLengthQueuingStrategy here
404+
// is unnecessary.
405+
return { highWaterMark };
406+
};
407+
408+
const strategy = evaluateStrategyOrFallback(options?.strategy);
393409

394410
let controller;
395411

Lines changed: 75 additions & 0 deletions

0 commit comments

Comments
 (0)