@@ -109,7 +109,7 @@ async function* fromReadable(val) {
109109 yield * Readable . prototype [ SymbolAsyncIterator ] . call ( val ) ;
110110}
111111
112- async function pump ( iterable , writable , finish ) {
112+ async function pump ( iterable , writable , finish , opts ) {
113113 let error ;
114114 let onresolve = null ;
115115
@@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) {
153153 }
154154 }
155155
156- writable . end ( ) ;
156+ if ( opts ?. end !== false ) {
157+ writable . end ( ) ;
158+ }
157159
158160 await wait ( ) ;
159161
@@ -227,17 +229,22 @@ function pipelineImpl(streams, callback, opts) {
227229 const stream = streams [ i ] ;
228230 const reading = i < streams . length - 1 ;
229231 const writing = i > 0 ;
232+ const end = reading || opts ?. end !== false ;
230233
231234 if ( isNodeStream ( stream ) ) {
232- finishCount ++ ;
233- destroys . push ( destroyer ( stream , reading , writing , ( err ) => {
234- if ( ! err && ! reading && isReadableFinished ( stream , false ) ) {
235- stream . read ( 0 ) ;
236- destroyer ( stream , true , writing , finish ) ;
237- } else {
238- finish ( err ) ;
239- }
240- } ) ) ;
235+ if ( end ) {
236+ finishCount ++ ;
237+ destroys . push ( destroyer ( stream , reading , writing , ( err ) => {
238+ if ( ! err && ! reading && isReadableFinished ( stream , false ) ) {
239+ stream . read ( 0 ) ;
240+ destroyer ( stream , true , writing , finish ) ;
241+ } else {
242+ finish ( err ) ;
243+ }
244+ } ) ) ;
245+ } else {
246+ stream . on ( 'error' , finish ) ;
247+ }
241248 }
242249
243250 if ( i === 0 ) {
@@ -282,14 +289,17 @@ function pipelineImpl(streams, callback, opts) {
282289 then . call ( ret ,
283290 ( val ) => {
284291 value = val ;
285- pt . end ( val ) ;
292+ pt . write ( val ) ;
293+ if ( end ) {
294+ pt . end ( ) ;
295+ }
286296 } , ( err ) => {
287297 pt . destroy ( err ) ;
288298 } ,
289299 ) ;
290300 } else if ( isIterable ( ret , true ) ) {
291301 finishCount ++ ;
292- pump ( ret , pt , finish ) ;
302+ pump ( ret , pt , finish , { end } ) ;
293303 } else {
294304 throw new ERR_INVALID_RETURN_VALUE (
295305 'AsyncIterable or Promise' , 'destination' , ret ) ;
@@ -302,7 +312,7 @@ function pipelineImpl(streams, callback, opts) {
302312 }
303313 } else if ( isNodeStream ( stream ) ) {
304314 if ( isReadableNodeStream ( ret ) ) {
305- ret . pipe ( stream ) ;
315+ ret . pipe ( stream , { end } ) ;
306316
307317 // Compat. Before node v10.12.0 stdio used to throw an error so
308318 // pipe() did/does not end() stdio destinations.
@@ -314,7 +324,7 @@ function pipelineImpl(streams, callback, opts) {
314324 ret = makeAsyncIterable ( ret ) ;
315325
316326 finishCount ++ ;
317- pump ( ret , stream , finish ) ;
327+ pump ( ret , stream , finish , { end } ) ;
318328 }
319329 ret = stream ;
320330 } else {
0 commit comments