Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 59 additions & 48 deletions lib/internal/streams/iter/share.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class ShareImpl {
resolve: null,
reject: null,
detached: false,
pendingNext: PromiseResolve(),
};

this.#consumers.add(state);
Expand All @@ -129,62 +130,72 @@ class ShareImpl {
return {
__proto__: null,
[SymbolAsyncIterator]() {
return {
__proto__: null,
async next() {
if (self.#sourceError) {
state.detached = true;
self.#consumers.delete(state);
throw self.#sourceError;
const getNext = async () => {
if (self.#sourceError) {
state.detached = true;
self.#consumers.delete(state);
throw self.#sourceError;
}

// Loop until we get data, source is exhausted, or
// consumer is detached. Multiple consumers may be woken
// after a single pull - those that find no data at their
// cursor must re-pull rather than terminating prematurely.
for (;;) {
if (state.detached) {
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}

// Loop until we get data, source is exhausted, or
// consumer is detached. Multiple consumers may be woken
// after a single pull - those that find no data at their
// cursor must re-pull rather than terminating prematurely.
for (;;) {
if (state.detached) {
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}
if (self.#cancelled) {
state.detached = true;
self.#deleteConsumer(state);
return { __proto__: null, done: true, value: undefined };
}

if (self.#cancelled) {
state.detached = true;
self.#deleteConsumer(state);
return { __proto__: null, done: true, value: undefined };
// Check if data is available in buffer
const bufferIndex = state.cursor - self.#bufferStart;
if (bufferIndex < self.#buffer.length) {
const chunk = self.#buffer.get(bufferIndex);
const cursor = state.cursor;
state.cursor++;
if (cursor === self.#cachedMinCursor &&
--self.#cachedMinCursorConsumers === 0) {
self.#tryTrimBuffer();
}
return { __proto__: null, done: false, value: chunk };
}

// Check if data is available in buffer
const bufferIndex = state.cursor - self.#bufferStart;
if (bufferIndex < self.#buffer.length) {
const chunk = self.#buffer.get(bufferIndex);
const cursor = state.cursor;
state.cursor++;
if (cursor === self.#cachedMinCursor &&
--self.#cachedMinCursorConsumers === 0) {
self.#tryTrimBuffer();
}
return { __proto__: null, done: false, value: chunk };
}
if (self.#sourceExhausted) {
state.detached = true;
self.#deleteConsumer(state);
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}

if (self.#sourceExhausted) {
state.detached = true;
self.#deleteConsumer(state);
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}
// Need to pull from source - check buffer limit
const canPull = await self.#waitForBufferSpace();
if (!canPull) {
state.detached = true;
self.#deleteConsumer(state);
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}

// Need to pull from source - check buffer limit
const canPull = await self.#waitForBufferSpace();
if (!canPull) {
state.detached = true;
self.#deleteConsumer(state);
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}
await self.#pullFromSource();
}
};

await self.#pullFromSource();
}
return {
__proto__: null,
next() {
const next = PromisePrototypeThen(
state.pendingNext,
getNext,
getNext);
state.pendingNext =
PromisePrototypeThen(next, undefined, () => {});
return next;
},

async return() {
Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-stream-iter-share-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,24 @@ async function testShareMultipleConsumersConcurrentPull() {
assert.strictEqual(t3, expected);
}

async function testShareConsumerConcurrentNextCalls() {
async function* source() {
const enc = new TextEncoder();
yield [enc.encode('first')];
yield [enc.encode('second')];
}

const shared = share(source());
const it = shared.pull()[Symbol.asyncIterator]();
const first = it.next();
const second = it.next();

const [r1, r2] = await Promise.all([first, second]);
const dec = new TextDecoder();
assert.strictEqual(dec.decode(r1.value[0]), 'first');
assert.strictEqual(dec.decode(r2.value[0]), 'second');
}

// share() accepts string source directly (normalized via from())
async function testShareStringSource() {
const shared = share('hello-share');
Expand All @@ -330,5 +348,6 @@ Promise.all([
testShareLateJoiningConsumer(),
testShareConsumerBreak(),
testShareMultipleConsumersConcurrentPull(),
testShareConsumerConcurrentNextCalls(),
testShareStringSource(),
]).then(common.mustCall());
Loading