@@ -273,6 +273,17 @@ function* processTransformResultSync(result) {
273273 result ) ;
274274}
275275
276+ /**
277+ * Append normalized transform result batches to an array (sync).
278+ * @param {Array<Uint8Array[]> } target
279+ * @param {* } result
280+ */
281+ function appendTransformResultSync ( target , result ) {
282+ for ( const batch of processTransformResultSync ( result ) ) {
283+ ArrayPrototypePush ( target , batch ) ;
284+ }
285+ }
286+
276287/**
277288 * Process transform result (async).
278289 * @yields {Uint8Array[]}
@@ -356,6 +367,18 @@ async function* processTransformResultAsync(result) {
356367 result ) ;
357368}
358369
370+ /**
371+ * Append normalized transform result batches to an array (async).
372+ * @param {Array<Uint8Array[]> } target
373+ * @param {* } result
374+ * @returns {Promise<void> }
375+ */
376+ async function appendTransformResultAsync ( target , result ) {
377+ for await ( const batch of processTransformResultAsync ( result ) ) {
378+ ArrayPrototypePush ( target , batch ) ;
379+ }
380+ }
381+
359382// =============================================================================
360383// Sync Pipeline Implementation
361384// =============================================================================
@@ -398,18 +421,19 @@ function* applyFusedStatelessSyncTransforms(source, run) {
398421 yield * processTransformResultSync ( current ) ;
399422 }
400423 }
401- // Flush
402- let current = null ;
424+ // Flush each transform after all upstream data, including data emitted by
425+ // earlier flushes, has been processed by that transform.
426+ let pending = [ ] ;
403427 for ( let i = 0 ; i < run . length ; i ++ ) {
404- const result = run [ i ] ( current ) ;
405- if ( result === null ) {
406- current = null ;
407- continue ;
428+ const next = [ ] ;
429+ for ( let j = 0 ; j < pending . length ; j ++ ) {
430+ appendTransformResultSync ( next , run [ i ] ( pending [ j ] ) ) ;
408431 }
409- current = result ;
432+ appendTransformResultSync ( next , run [ i ] ( null ) ) ;
433+ pending = next ;
410434 }
411- if ( current != null ) {
412- yield * processTransformResultSync ( current ) ;
435+ for ( let i = 0 ; i < pending . length ; i ++ ) {
436+ yield pending [ i ] ;
413437 }
414438}
415439
@@ -522,30 +546,23 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
522546 yield * processTransformResultAsync ( current ) ;
523547 }
524548 }
525- // Flush: send null through each transform in order
526- let current = null ;
549+ // Flush each transform after all upstream data, including data emitted by
550+ // earlier flushes, has been processed by that transform.
551+ let pending = [ ] ;
527552 for ( let i = 0 ; i < run . length ; i ++ ) {
528- const result = run [ i ] ( current , { __proto__ : null , signal } ) ;
529- if ( result === null ) {
530- current = null ;
531- continue ;
532- }
533- if ( isPromise ( result ) ) {
534- current = await result ;
535- } else {
536- current = result ;
553+ const next = [ ] ;
554+ for ( let j = 0 ; j < pending . length ; j ++ ) {
555+ await appendTransformResultAsync (
556+ next ,
557+ run [ i ] ( pending [ j ] , { __proto__ : null , signal } ) ) ;
537558 }
559+ await appendTransformResultAsync (
560+ next ,
561+ run [ i ] ( null , { __proto__ : null , signal } ) ) ;
562+ pending = next ;
538563 }
539- if ( current !== null ) {
540- if ( isUint8ArrayBatch ( current ) ) {
541- if ( current . length > 0 ) yield current ;
542- } else if ( isUint8Array ( current ) ) {
543- yield [ current ] ;
544- } else if ( typeof current === 'string' ) {
545- yield [ toUint8Array ( current ) ] ;
546- } else {
547- yield * processTransformResultAsync ( current ) ;
548- }
564+ for ( let i = 0 ; i < pending . length ; i ++ ) {
565+ yield pending [ i ] ;
549566 }
550567}
551568
0 commit comments