From 9e427dd15056d17302bcbbe041996d69d8ddc579 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Fri, 22 May 2026 19:29:24 -0700 Subject: [PATCH 1/3] stream: reject pull() reads on abort Make pull() race pending source reads against the provided AbortSignal so aborting can reject a pending next() even when the source is waiting before yielding data. Fixes: https://github.com/nodejs/node/issues/63497 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/pull.js | 83 ++++++++++++++++++-- test/parallel/test-stream-iter-pull-async.js | 40 ++++++++++ 2 files changed, 117 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 3ff88b251d182a..62781047a6b358 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -13,6 +13,9 @@ const { ArrayPrototypePush, ArrayPrototypeSlice, PromisePrototypeThen, + PromiseWithResolvers, + SafePromisePrototypeFinally, + SafePromiseRace, SymbolAsyncIterator, SymbolIterator, TypedArrayPrototypeGetByteLength, @@ -607,6 +610,77 @@ async function* applyValidatedStatefulAsyncTransform(source, transform, options) options.signal?.throwIfAborted(); } +/** + * Read one item from an async iterator, rejecting early if the signal aborts. + * @param {AsyncIterator} iterator - The iterator to read from. + * @param {AbortSignal|undefined} signal - Optional abort signal. + * @returns {Promise>|IteratorResult} + */ +function abortableNext(iterator, signal) { + if (signal === undefined) { + return iterator.next(); + } + + signal.throwIfAborted(); + + const next = iterator.next(); + const { promise, reject } = PromiseWithResolvers(); + const onAbort = () => reject(signal.reason); + signal.addEventListener('abort', onAbort, { __proto__: null, once: true }); + if (signal.aborted) { + onAbort(); + } + + return SafePromisePrototypeFinally(SafePromiseRace([next, promise]), () => { + signal.removeEventListener('abort', onAbort); + }); +} + +/** + * Wrap an async source so each pending read is abort-aware. + * @param {AsyncIterable} source - The source to read from. + * @param {AbortSignal|undefined} signal - Optional abort signal. + * @returns {AsyncIterable} + */ +function yieldAbortable(source, signal) { + if (signal === undefined) { + return source; + } + + return { + __proto__: null, + async *[SymbolAsyncIterator]() { + const iterator = source[SymbolAsyncIterator](); + let completed = false; + let aborted = false; + + try { + while (true) { + const { done, value } = await abortableNext(iterator, signal); + if (done) { + completed = true; + return; + } + signal.throwIfAborted(); + yield value; + } + } catch (error) { + aborted = signal.aborted; + throw error; + } finally { + if (!completed && typeof iterator.return === 'function') { + const result = iterator.return(); + if (aborted) { + PromisePrototypeThen(result, undefined, () => {}); + } else { + await result; + } + } + } + }, + }; +} + /** * Create an async pipeline from source through transforms. * @yields {Uint8Array[]} @@ -615,17 +689,14 @@ async function* createAsyncPipeline(source, transforms, signal) { // Check for abort signal?.throwIfAborted(); - const normalized = source; - // Fast path: no transforms, just yield normalized source directly if (transforms.length === 0) { - for await (const batch of normalized) { - signal?.throwIfAborted(); - yield batch; - } + yield* yieldAbortable(source, signal); return; } + const normalized = yieldAbortable(source, signal); + // Create internal controller for transform cancellation. // Note: if signal was already aborted, we threw above - no need to check here. const controller = new AbortController(); diff --git a/test/parallel/test-stream-iter-pull-async.js b/test/parallel/test-stream-iter-pull-async.js index 157cc5e265ea34..c75a4d305503b9 100644 --- a/test/parallel/test-stream-iter-pull-async.js +++ b/test/parallel/test-stream-iter-pull-async.js @@ -156,6 +156,44 @@ async function testPullSignalAbortMidIteration() { await assert.rejects(() => iter.next(), { name: 'AbortError' }); } +async function testPullSignalAbortWhileSourceNextPending() { + const source = { + [Symbol.asyncIterator]() { + return { + async next() { + await new Promise(() => {}); + }, + }; + }, + }; + const ac = new AbortController(); + const iter = pull(source, { signal: ac.signal })[Symbol.asyncIterator](); + const next = iter.next(); + ac.abort(); + await assert.rejects(next, { name: 'AbortError' }); +} + +async function testPullSignalAbortWithTransformWhileSourceNextPending() { + const source = { + [Symbol.asyncIterator]() { + return { + async next() { + await new Promise(() => {}); + }, + }; + }, + }; + const ac = new AbortController(); + const iter = pull( + source, + (chunks) => chunks, + { signal: ac.signal }, + )[Symbol.asyncIterator](); + const next = iter.next(); + ac.abort(); + await assert.rejects(next, { name: 'AbortError' }); +} + // Pull consumer break (return()) cleans up transform signal async function testPullConsumerBreakCleanup() { let signalAborted = false; @@ -351,6 +389,8 @@ async function testTransformOptionsNotShared() { testPullSourceError(), testTapCallbackError(), testPullSignalAbortMidIteration(), + testPullSignalAbortWhileSourceNextPending(), + testPullSignalAbortWithTransformWhileSourceNextPending(), testPullConsumerBreakCleanup(), testPullTransformReturnsPromise(), testPullTransformYieldsStrings(), From fb086906746f0b20c06611122a685ca04af156cf Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Sat, 23 May 2026 17:13:15 -0700 Subject: [PATCH 2/3] stream: move abort callback helper out of closure --- lib/internal/streams/iter/pull.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 62781047a6b358..8653256e6081c8 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -610,6 +610,10 @@ async function* applyValidatedStatefulAsyncTransform(source, transform, options) options.signal?.throwIfAborted(); } +function getOnAbort(reject, signal) { + return () => reject(signal.reason); +} + /** * Read one item from an async iterator, rejecting early if the signal aborts. * @param {AsyncIterator} iterator - The iterator to read from. @@ -625,7 +629,7 @@ function abortableNext(iterator, signal) { const next = iterator.next(); const { promise, reject } = PromiseWithResolvers(); - const onAbort = () => reject(signal.reason); + const onAbort = getOnAbort(reject, signal); signal.addEventListener('abort', onAbort, { __proto__: null, once: true }); if (signal.aborted) { onAbort(); From 5db38ba62981f6dd51485f2c5e99c4e7fa46b6bc Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Sat, 23 May 2026 17:16:54 -0700 Subject: [PATCH 3/3] stream: use markPromiseAsHandled in pull iterator abort cleanup --- lib/internal/streams/iter/pull.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 8653256e6081c8..dc62499db3dd1d 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -22,6 +22,10 @@ const { Uint8Array, } = primordials; +const { + markPromiseAsHandled, +} = internalBinding('util'); + const { codes: { ERR_INVALID_ARG_TYPE, @@ -675,7 +679,7 @@ function yieldAbortable(source, signal) { if (!completed && typeof iterator.return === 'function') { const result = iterator.return(); if (aborted) { - PromisePrototypeThen(result, undefined, () => {}); + markPromiseAsHandled(result); } else { await result; }