diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index e6a404729d6e0b..e42c9cdce136f6 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -207,6 +207,11 @@ class BroadcastImpl { return kDone; } + if (state.resolve) { + state.detached = true; + return kDone; + } + const { promise, resolve, reject } = PromiseWithResolvers(); state.resolve = resolve; state.reject = reject; @@ -312,6 +317,9 @@ class BroadcastImpl { } consumer.resolve = null; consumer.reject = null; + if (consumer.detached && this.#deleteConsumer(consumer)) { + this.#tryTrimBuffer(); + } } } } @@ -396,6 +404,9 @@ class BroadcastImpl { consumer.resolve = null; consumer.reject = null; resolve({ __proto__: null, done: false, value: chunk }); + if (consumer.detached && this.#deleteConsumer(consumer)) { + this.#tryTrimBuffer(); + } } else { // Still waiting -- put back ArrayPrototypePush(this.#waiters, consumer); diff --git a/test/parallel/test-stream-iter-broadcast-basic.js b/test/parallel/test-stream-iter-broadcast-basic.js index ab2c81304ec2ac..9f216b451bc433 100644 --- a/test/parallel/test-stream-iter-broadcast-basic.js +++ b/test/parallel/test-stream-iter-broadcast-basic.js @@ -243,6 +243,27 @@ async function testLateJoinerSeesBufferedData() { assert.strictEqual(result, 'before-join'); } +async function testOverlappingNextKeepsEarlierRead() { + const { writer, broadcast: bc } = broadcast(); + const it = bc.push()[Symbol.asyncIterator](); + + const first = it.next(); + const second = it.next(); + + await writer.write('x'); + + assert.deepStrictEqual(await second, { + __proto__: null, + done: true, + value: undefined, + }); + + const result = await first; + assert.strictEqual(result.done, false); + assert.strictEqual(Buffer.concat(result.value).toString(), 'x'); + assert.strictEqual(bc.consumerCount, 0); +} + Promise.all([ testBasicBroadcast(), testMultipleWrites(), @@ -257,4 +278,5 @@ Promise.all([ testFailDetachesConsumers(), testWriterFailIdempotent(), testLateJoinerSeesBufferedData(), + testOverlappingNextKeepsEarlierRead(), ]).then(common.mustCall());