Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- 🚨 **NEVER use `npx`, `pnpm dlx`, or `yarn dlx`** — use `pnpm exec <package>` for devDep binaries, or `pnpm run <script>` for package.json scripts. If a tool is needed, add it as a pinned devDependency first.
- **minimumReleaseAge**: NEVER add packages to `minimumReleaseAgeExclude` in CI. Locally, ASK before adding — the age threshold is a security control.
- File existence: ALWAYS `existsSync` from `node:fs`. NEVER `fs.access`, `fs.stat`-for-existence, or an async `fileExists` wrapper. Import form: `import { existsSync, promises as fs } from 'node:fs'`.
- `Promise.race` / `Promise.any`: NEVER pass a long-lived promise (interrupt signal, pool member) into a race inside a loop. Each call re-attaches `.then` handlers to every arm; handlers accumulate on surviving promises until they settle. For concurrency limiters, use a single-waiter "slot available" signal (resolved by each task's `.then`) instead of re-racing `executing[]`. See nodejs/node#17469 and `@watchable/unpromise`. Race with two fresh arms (e.g. one-shot `withTimeout`) is safe.

---

Expand Down
65 changes: 50 additions & 15 deletions src/socket-sdk-class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -867,10 +867,52 @@ export class SocketSdk {
/* c8 ignore stop */
const { components } = componentsObj
const { length: componentsCount } = components
const running = new Map<
AsyncGenerator<BatchPackageFetchResultType>,
Promise<GeneratorStep>
>()
// Tracks in-flight generators only for pool-size accounting.
// Completed steps and errors flow through the single-waiter queue below,
// not through per-generator promises re-raced each iteration — repeated
// Promise.race() over the same pool accumulates unreleased .then
// handlers on each still-pending arm until the pool drains.
// See https://github.com/nodejs/node/issues/17469.
const running = new Set<AsyncGenerator<BatchPackageFetchResultType>>()
const completed: GeneratorStep[] = []
let waiter:
| {
reject: (err: unknown) => void
resolve: (step: GeneratorStep) => void
}
| undefined
let pendingError: { err: unknown } | undefined
const deliverStep = (step: GeneratorStep) => {
if (waiter) {
const w = waiter
waiter = undefined
w.resolve(step)
} else {
completed.push(step)
}
}
const deliverError = (err: unknown) => {
if (waiter) {
const w = waiter
waiter = undefined
w.reject(err)
} else if (!pendingError) {
pendingError = { err }
}
}
const takeStep = (): Promise<GeneratorStep> => {
if (pendingError) {
const { err } = pendingError
pendingError = undefined
return Promise.reject(err)
}
if (completed.length) {
return Promise.resolve(completed.shift()!)
}
const { promise, reject, resolve } = promiseWithResolvers<GeneratorStep>()
waiter = { reject, resolve }
return promise
}
let index = 0
const enqueueGen = () => {
if (index >= componentsCount) {
Expand All @@ -888,17 +930,12 @@ export class SocketSdk {
const continueGen = (
generator: AsyncGenerator<BatchPackageFetchResultType>,
) => {
const {
promise,
reject: rejectFn,
resolve: resolveFn,
} = promiseWithResolvers<GeneratorStep>()
running.set(generator, promise)
running.add(generator)
void generator
.next()
.then(
iteratorResult => resolveFn({ generator, iteratorResult }),
rejectFn,
iteratorResult => deliverStep({ generator, iteratorResult }),
deliverError,
)
}
// Start initial batch of generators.
Expand All @@ -907,9 +944,7 @@ export class SocketSdk {
}
while (running.size > 0) {
// eslint-disable-next-line no-await-in-loop
const { generator, iteratorResult }: GeneratorStep = await Promise.race(
running.values(),
)
const { generator, iteratorResult }: GeneratorStep = await takeStep()
running.delete(generator)
// Yield the value if one is given, even when done:true.
if (iteratorResult.value) {
Expand Down