diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index 3cd24409222712..f755550712efa7 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -113,6 +113,7 @@ class ShareImpl { resolve: null, reject: null, detached: false, + pendingNext: PromiseResolve(), }; this.#consumers.add(state); @@ -129,62 +130,72 @@ class ShareImpl { return { __proto__: null, [SymbolAsyncIterator]() { - return { - __proto__: null, - async next() { - if (self.#sourceError) { - state.detached = true; - self.#consumers.delete(state); - throw self.#sourceError; + const getNext = async () => { + if (self.#sourceError) { + state.detached = true; + self.#consumers.delete(state); + throw self.#sourceError; + } + + // Loop until we get data, source is exhausted, or + // consumer is detached. Multiple consumers may be woken + // after a single pull - those that find no data at their + // cursor must re-pull rather than terminating prematurely. + for (;;) { + if (state.detached) { + if (self.#sourceError) throw self.#sourceError; + return { __proto__: null, done: true, value: undefined }; } - // Loop until we get data, source is exhausted, or - // consumer is detached. Multiple consumers may be woken - // after a single pull - those that find no data at their - // cursor must re-pull rather than terminating prematurely. - for (;;) { - if (state.detached) { - if (self.#sourceError) throw self.#sourceError; - return { __proto__: null, done: true, value: undefined }; - } + if (self.#cancelled) { + state.detached = true; + self.#deleteConsumer(state); + return { __proto__: null, done: true, value: undefined }; + } - if (self.#cancelled) { - state.detached = true; - self.#deleteConsumer(state); - return { __proto__: null, done: true, value: undefined }; + // Check if data is available in buffer + const bufferIndex = state.cursor - self.#bufferStart; + if (bufferIndex < self.#buffer.length) { + const chunk = self.#buffer.get(bufferIndex); + const cursor = state.cursor; + state.cursor++; + if (cursor === self.#cachedMinCursor && + --self.#cachedMinCursorConsumers === 0) { + self.#tryTrimBuffer(); } + return { __proto__: null, done: false, value: chunk }; + } - // Check if data is available in buffer - const bufferIndex = state.cursor - self.#bufferStart; - if (bufferIndex < self.#buffer.length) { - const chunk = self.#buffer.get(bufferIndex); - const cursor = state.cursor; - state.cursor++; - if (cursor === self.#cachedMinCursor && - --self.#cachedMinCursorConsumers === 0) { - self.#tryTrimBuffer(); - } - return { __proto__: null, done: false, value: chunk }; - } + if (self.#sourceExhausted) { + state.detached = true; + self.#deleteConsumer(state); + if (self.#sourceError) throw self.#sourceError; + return { __proto__: null, done: true, value: undefined }; + } - if (self.#sourceExhausted) { - state.detached = true; - self.#deleteConsumer(state); - if (self.#sourceError) throw self.#sourceError; - return { __proto__: null, done: true, value: undefined }; - } + // Need to pull from source - check buffer limit + const canPull = await self.#waitForBufferSpace(); + if (!canPull) { + state.detached = true; + self.#deleteConsumer(state); + if (self.#sourceError) throw self.#sourceError; + return { __proto__: null, done: true, value: undefined }; + } - // Need to pull from source - check buffer limit - const canPull = await self.#waitForBufferSpace(); - if (!canPull) { - state.detached = true; - self.#deleteConsumer(state); - if (self.#sourceError) throw self.#sourceError; - return { __proto__: null, done: true, value: undefined }; - } + await self.#pullFromSource(); + } + }; - await self.#pullFromSource(); - } + return { + __proto__: null, + next() { + const next = PromisePrototypeThen( + state.pendingNext, + getNext, + getNext); + state.pendingNext = + PromisePrototypeThen(next, undefined, () => {}); + return next; }, async return() { diff --git a/test/parallel/test-stream-iter-share-async.js b/test/parallel/test-stream-iter-share-async.js index 076fe0a4037aa0..2633ae64e2b907 100644 --- a/test/parallel/test-stream-iter-share-async.js +++ b/test/parallel/test-stream-iter-share-async.js @@ -309,6 +309,24 @@ async function testShareMultipleConsumersConcurrentPull() { assert.strictEqual(t3, expected); } +async function testShareConsumerConcurrentNextCalls() { + async function* source() { + const enc = new TextEncoder(); + yield [enc.encode('first')]; + yield [enc.encode('second')]; + } + + const shared = share(source()); + const it = shared.pull()[Symbol.asyncIterator](); + const first = it.next(); + const second = it.next(); + + const [r1, r2] = await Promise.all([first, second]); + const dec = new TextDecoder(); + assert.strictEqual(dec.decode(r1.value[0]), 'first'); + assert.strictEqual(dec.decode(r2.value[0]), 'second'); +} + // share() accepts string source directly (normalized via from()) async function testShareStringSource() { const shared = share('hello-share'); @@ -330,5 +348,6 @@ Promise.all([ testShareLateJoiningConsumer(), testShareConsumerBreak(), testShareMultipleConsumersConcurrentPull(), + testShareConsumerConcurrentNextCalls(), testShareStringSource(), ]).then(common.mustCall());