diff --git a/.changeset/spotty-eels-warn.md b/.changeset/spotty-eels-warn.md new file mode 100644 index 00000000..c306b92d --- /dev/null +++ b/.changeset/spotty-eels-warn.md @@ -0,0 +1,11 @@ +--- +"partysocket": minor +--- + +Make buffered messages and connection teardown reliable across socket replacement and explicit close (see cloudflare/agents#1738). + +- **`close()` now dispatches its `close` event synchronously** (mirroring how `reconnect()` already dispatched its synthetic close). Consumers that detach their listeners right after closing — like the React hooks during cleanup — previously never observed the terminal close event, leaving "connection closed" handling (pending-call rejection, state resets) to never run. After `close()` returns, `readyState` reports `CLOSED` immediately, even while the underlying socket finishes its closing handshake. Code that attached a `close` listener _after_ calling `close()` and relied on the event arriving asynchronously must attach the listener first. +- **`send()` now returns a `boolean`**: `true` if the message was transmitted immediately over an open connection, `false` if it was buffered (delivered when the connection next opens, always before the `open` event is dispatched) or dropped because `maxEnqueuedMessages` was reached. Callers implementing request/response protocols can use this to know whether a request is actually in flight. +- **New `drainQueuedMessages()` method** removes and returns all messages that were buffered by `send()` but never transmitted, so a socket that's being discarded can hand its unsent buffer to a replacement instead of silently losing it. +- **React hooks (`usePartySocket`, `useWebSocket`) now migrate buffered messages when they replace the socket** because connection options changed. By default, buffered messages transfer only when the destination is unchanged (e.g. only `query` — credentials — changed); if destination options (`room`, `party`, `path`, `host`, URL, ...) changed, the messages are discarded with a warning rather than delivered to a destination they weren't composed for. The new `transferEnqueuedMessages` option overrides this: `true` always transfers, `false` never does. +- **Warn when `send()` is called after `close()`.** Messages sent against a permanently closed socket are buffered into a queue that nothing will ever flush (unless `reconnect()` is called later), which silently strands the message — and any caller waiting on a reply. This usually indicates a stale socket reference in the caller. The message is still buffered for backwards compatibility; the warning is emitted once per close cycle and resets on `reconnect()`. diff --git a/package-lock.json b/package-lock.json index 461eaf48..21404848 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11382,7 +11382,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", "hono": "^4.12.15", - "partyserver": "^0.5.5" + "partyserver": "^0.5.6" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20260424.1", @@ -11410,7 +11410,7 @@ "license": "ISC", "dependencies": { "nanoid": "^5.1.9", - "partysocket": "^1.1.18" + "partysocket": "^1.1.19" } }, "packages/partyfn/node_modules/nanoid": { @@ -11515,8 +11515,8 @@ "license": "ISC", "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", - "partyserver": "^0.5.5", - "partysocket": "^1.1.18" + "partyserver": "^0.5.6", + "partysocket": "^1.1.19" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20260424.1", @@ -11534,8 +11534,8 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", "partyfn": "^0.1.0", - "partyserver": "^0.5.5", - "partysocket": "^1.1.18" + "partyserver": "^0.5.6", + "partysocket": "^1.1.19" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20260424.1", @@ -11575,7 +11575,7 @@ "license": "ISC", "dependencies": { "cron-parser": "^5.5.0", - "partyserver": "^0.5.5" + "partyserver": "^0.5.6" } }, "packages/y-partyserver": { @@ -11591,7 +11591,7 @@ "@cloudflare/workers-types": "^4.20260426.1", "@types/lodash.debounce": "^4.0.9", "@types/node": "25.6.0", - "partyserver": "^0.5.5", + "partyserver": "^0.5.6", "ws": "^8.20.0", "yjs": "^13.6.30" }, diff --git a/packages/hono-party/package.json b/packages/hono-party/package.json index b85e97a8..bf1746ad 100644 --- a/packages/hono-party/package.json +++ b/packages/hono-party/package.json @@ -37,6 +37,6 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", "hono": "^4.12.15", - "partyserver": "^0.5.5" + "partyserver": "^0.5.6" } } diff --git a/packages/partyfn/package.json b/packages/partyfn/package.json index 7a38bb5d..2228d895 100644 --- a/packages/partyfn/package.json +++ b/packages/partyfn/package.json @@ -19,7 +19,7 @@ ], "dependencies": { "nanoid": "^5.1.9", - "partysocket": "^1.1.18" + "partysocket": "^1.1.19" }, "scripts": { "build": "tsx scripts/build.ts" diff --git a/packages/partyserver/CHANGELOG.md b/packages/partyserver/CHANGELOG.md index a8328462..29305a94 100644 --- a/packages/partyserver/CHANGELOG.md +++ b/packages/partyserver/CHANGELOG.md @@ -31,7 +31,6 @@ The fix is at the call site, not in PartyServer: pass `id: someBoundDONamespace.idFromName(facetName)` to `ctx.facets.get(...)`. The facet then gets its own native `ctx.id.name === facetName` and PartyServer's `name` getter does the right thing automatically. No `setName()` is required, no `__ps_name` storage record is written, and cold-wake recovery happens for free because the factory re-runs and `idFromName` is deterministic. This release adds: - - **A "Using PartyServer with Durable Object Facets" section in the README** that walks through the recommended pattern with a code example, calls out the implicit-id footgun explicitly, and documents that plain-string `id` values are not a substitute for `idFromName(facetName)` (workerd treats string ids as `idFromString`-like, so the resulting facet has no `ctx.id.name`). - **`setName()` docstring updated** to clarify that facets are NOT a `setName()` use case — point to the explicit-`id` pattern instead. The original `setName()` `ctx.id.name` mismatch throw is preserved as a typo guard for the `idFromName` happy path. - **End-to-end facet test coverage** against the real workerd `ctx.facets.get(...)` API. A `FacetParent` / `FacetChild` fixture exercises both the implicit-id path (pinning the runtime contract that `this.name` returns the parent's name in that flow — i.e., behavior-as-documentation so framework authors are unsurprised) and the explicit-id path (recommended; verifies that all reasonable id-construction strategies work and that cold wake recovers without any storage record). Plain-string `id` is also tested; the test asserts it does NOT carry a name, pinning the contract so callers don't get tempted by the type signature. @@ -60,7 +59,6 @@ ``` Backward compatible: - - For DOs addressed via `idFromName()` / `getByName()` (the happy path), `setName()` continues to NOT write storage — `ctx.id.name` is the source of truth and `setName()` is just a no-op-plus-onStart. - The pre-existing direct-storage-write pattern keeps working — the storage write becomes idempotent with what `setName()` would do. @@ -75,7 +73,6 @@ 0.5.0 moved the legacy storage hydrate into `alarm()` only, breaking Cloudflare Agents facets and any other framework that writes `__ps_name` directly before calling `__unsafe_ensureInitialized()`. Facet DOs are spawned via `ctx.facets.get(...)` rather than `idFromName()` and therefore have `ctx.id.name === undefined`; they relied on PartyServer reading the storage record back to populate `this.name` before `onStart()`. Changes: - - Move the legacy `__ps_name` hydrate from `alarm()` into `#ensureInitialized()`, still gated on `!ctx.id.name && !#_name` so it costs nothing on the happy path (normal `idFromName()`/`getByName()` DOs skip the storage read entirely). - `Server.fetch()` now delegates to `#ensureInitialized()` for the hydrate instead of doing its own. The `x-partykit-room` header fallback remains as a last resort when neither `ctx.id.name` nor a legacy storage record is available. - `Server.alarm()` is simplified — it no longer needs its own hydrate call since `#ensureInitialized()` handles it. @@ -90,7 +87,6 @@ Durable Objects now expose `ctx.id.name` on every entry point (constructor, fetch, alarm, hibernating websocket handlers) when the DO is addressed via `idFromName()`/`getByName()`. PartyServer now uses this as the primary source of `this.name`, which simplifies routing, eliminates storage writes, and makes `this.name` available inside the constructor. Changes in `partyserver`: - - `this.name` resolves from `this.ctx.id.name`. The apologetic `workerd#2240` error message is gone. - `this.name` is now available **inside the constructor** and from class field initializers, not just after `setName()`/`fetch()` has run. - `routePartykitRequest` no longer issues a `setName()`/`_initAndFetch()` RPC before `fetch()`. The WebSocket path goes from 2 RPCs to 1; the HTTP path remains 1 RPC. Props, when supplied, are delivered to the DO via the `x-partykit-props` request header, set after `onBeforeConnect`/`onBeforeRequest` hooks run. @@ -102,7 +98,6 @@ - When reading `this.name` throws, it is because `ctx.id.name` is undefined and no legacy fallback has populated the name: the DO was addressed via `idFromString()` or `newUniqueId()` (both unsupported), the runtime is too old to expose `ctx.id.name`, or a pre-2026-03-15 alarm fired before the legacy storage fallback ran. Changes in all affected packages (`partyserver`, `partysub`, `partysync`, `y-partyserver`, `hono-party`): - - `@cloudflare/workers-types` peer dependency bumped from `^4.20240729.0` to `^4.20260424.1`. The old range predates `ctx.id.name` in the type surface. Not supported: addressing PartyServer DOs via `idFromString()` or `newUniqueId()`. These paths return `ctx.id.name === undefined` inside the DO and will surface as a clear error from `this.name`. PartyServer has always assumed name-based addressing via `getServerByName` / `routePartykitRequest`; this release makes that assumption explicit. @@ -423,14 +418,12 @@ ### Patch Changes - [`528adea`](https://github.com/threepointone/partyserver/commit/528adeaced6dce6e888d2f54cc75c3569bf2c277) Thanks [@threepointone](https://github.com/threepointone)! - some fixes and tweaks - - getServerByName was throwing on all requests - `Env` is now an optional arg when defining `Server` - `y-partyserver/provider` can now take an optional `prefix` arg to use a custom url to connect - `routePartyKitRequest`/`getServerByName` now accepts `jurisdiction` bonus: - - added a bunch of fixtures - added stubs for docs diff --git a/packages/partysocket/CHANGELOG.md b/packages/partysocket/CHANGELOG.md index 353fc10e..847360eb 100644 --- a/packages/partysocket/CHANGELOG.md +++ b/packages/partysocket/CHANGELOG.md @@ -337,7 +337,6 @@ ### Patch Changes - [#251](https://github.com/partykit/partykit/pull/251) [`049bcac`](https://github.com/partykit/partykit/commit/049bcac42aa49e4bddec975c63b7d7984112e450) Thanks [@threepointone](https://github.com/threepointone)! - small tweaks to `init` - - replace `process.env.PARTYKIT_HOST` with just `PARTYKIT_HOST` - add a `tsconfig.json` - add partykit to devDependencies in `init` @@ -358,7 +357,6 @@ - [#211](https://github.com/partykit/partykit/pull/211) [`fffe721`](https://github.com/partykit/partykit/commit/fffe72148e5cc425e80c90b6bf180192df410080) Thanks [@threepointone](https://github.com/threepointone)! - update dependencies - [#191](https://github.com/partykit/partykit/pull/191) [`39cf5ce`](https://github.com/partykit/partykit/commit/39cf5cebf5e699bc50ace8b6d25cd82c807e863a) Thanks [@jevakallio](https://github.com/jevakallio)! - Improve PartySocket types and React hooks API: - - Add websocket lifecycle event handlers to usePartyKit options to reduce need for effects in userland - Allow usePartySocket to provide startClosed option to initialize without opening connection - Fix types for PartySocket#removeEventListener diff --git a/packages/partysocket/src/react.ts b/packages/partysocket/src/react.ts index 1277285e..3c756164 100644 --- a/packages/partysocket/src/react.ts +++ b/packages/partysocket/src/react.ts @@ -11,7 +11,7 @@ import type { SocketOptions } from "./use-socket"; type UsePartySocketOptions = Omit & EventHandlerOptions & - Pick & { + Pick & { host?: string | undefined; }; @@ -44,6 +44,22 @@ export default function usePartySocket(options: UsePartySocketOptions) { options.basePath, options.prefix, ...getOptionsThatShouldCauseRestartWhenChanged(options) + ]), + // Identifies *where* the socket connects. Deliberately excludes + // `query` (credentials — a token refresh shouldn't drop buffered + // messages) and `id` (connection identity, not destination). Used + // to decide whether messages buffered in a replaced socket can be + // re-sent on its replacement. + createSocketDestinationKey: (options) => + JSON.stringify([ + options.host, + options.room, + options.party, + options.path, + options.protocol, + options.protocols, + options.basePath, + options.prefix ]) }); diff --git a/packages/partysocket/src/tests/integration.test.ts b/packages/partysocket/src/tests/integration.test.ts index d2a7f12a..268be606 100644 --- a/packages/partysocket/src/tests/integration.test.ts +++ b/packages/partysocket/src/tests/integration.test.ts @@ -68,16 +68,17 @@ describe.skipIf(!!process.env.GITHUB_ACTIONS)( ps.send(testMessage); }); + // close() dispatches its close event synchronously, so the listener + // must be attached before close() is called + ps.addEventListener("close", () => { + expect(ps.readyState).toBe(WebSocket.CLOSED); + resolve(); + }); + ps.addEventListener("message", async (event) => { const text = await getMessageText(event.data); expect(text).toContain(testMessage); ps.close(); - - // Wait for close event before resolving - ps.addEventListener("close", () => { - expect(ps.readyState).toBe(WebSocket.CLOSED); - resolve(); - }); }); }); }); diff --git a/packages/partysocket/src/tests/react-hooks.test.tsx b/packages/partysocket/src/tests/react-hooks.test.tsx index b8762b23..97cc37c1 100644 --- a/packages/partysocket/src/tests/react-hooks.test.tsx +++ b/packages/partysocket/src/tests/react-hooks.test.tsx @@ -1563,6 +1563,246 @@ describe.skipIf(!!process.env.GITHUB_ACTIONS)( } ); +const QUEUE_PORT = 50150; + +describe.skipIf(!!process.env.GITHUB_ACTIONS)( + "buffered message handling across socket replacement", + () => { + let wss: WebSocketServer; + let connections: { url: string; messages: string[] }[]; + + beforeAll(() => { + connections = []; + return new Promise((resolve) => { + wss = new WebSocketServer({ port: QUEUE_PORT }, () => resolve()); + wss.on("connection", (ws, req) => { + const record = { url: req.url ?? "", messages: [] as string[] }; + connections.push(record); + ws.on("message", (data) => { + record.messages.push(data.toString()); + }); + }); + }); + }); + + afterAll(() => { + return new Promise((resolve) => { + wss.clients.forEach((client) => { + client.terminate(); + }); + wss.close(() => { + resolve(); + }); + }); + }); + + test("transfers buffered messages to the replacement socket when only query changes", async () => { + connections.length = 0; + // silence the send-after-close warning from sending while disabled + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ + enabled, + query + }: { + enabled: boolean; + query: Record; + }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room: "queue-room", + query, + enabled, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, query: { token: "old" } } } + ); + + // buffered: the socket is disabled, nothing is connected + result.current.send("hello-from-queue"); + + // re-enable with a fresh token — same destination, different credentials + rerender({ enabled: true, query: { token: "fresh" } }); + + await waitFor( + () => { + expect(connections.length).toBeGreaterThanOrEqual(1); + const latest = connections[connections.length - 1]; + expect(latest.url).toContain("token=fresh"); + expect(latest.messages).toContain("hello-from-queue"); + }, + { timeout: 10000 } + ); + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + + test("transferred messages arrive in the order they were sent", async () => { + connections.length = 0; + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ + enabled, + query + }: { + enabled: boolean; + query: Record; + }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room: "order-room", + query, + enabled, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, query: { token: "old" } } } + ); + + result.current.send("first"); + result.current.send("second"); + result.current.send("third"); + + rerender({ enabled: true, query: { token: "fresh" } }); + + await waitFor( + () => { + const latest = connections[connections.length - 1]; + expect(latest?.messages).toEqual(["first", "second", "third"]); + }, + { timeout: 10000 } + ); + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + + test("drops buffered messages with a warning when the destination changes", async () => { + connections.length = 0; + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ enabled, room }: { enabled: boolean; room: string }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room, + enabled, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, room: "room-a" } } + ); + + // buffered while disabled, destined for room-a + result.current.send("secret-for-room-a"); + + // re-enable pointing at a different room — the buffered message must + // NOT follow the socket to room-b + rerender({ enabled: true, room: "room-b" }); + + await waitFor( + () => { + expect(connections.length).toBeGreaterThanOrEqual(1); + expect(connections[connections.length - 1].url).toContain("room-b"); + }, + { timeout: 10000 } + ); + + // allow any in-flight frames to arrive before asserting absence + await new Promise((r) => setTimeout(r, 300)); + for (const connection of connections) { + expect(connection.messages).not.toContain("secret-for-room-a"); + } + expect( + warnSpy.mock.calls.some((call) => + String(call[0]).includes("discarded 1 buffered message") + ) + ).toBe(true); + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + + test("transferEnqueuedMessages: true forces transfer across a destination change", async () => { + connections.length = 0; + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ enabled, room }: { enabled: boolean; room: string }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room, + enabled, + transferEnqueuedMessages: true, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, room: "force-a" } } + ); + + result.current.send("follow-me"); + + rerender({ enabled: true, room: "force-b" }); + + await waitFor( + () => { + const target = connections.find((c) => c.url.includes("force-b")); + expect(target).toBeDefined(); + expect(target?.messages).toContain("follow-me"); + }, + { timeout: 10000 } + ); + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + + test("transferEnqueuedMessages: false drops buffered messages even when only query changes", async () => { + connections.length = 0; + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ + enabled, + query + }: { + enabled: boolean; + query: Record; + }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room: "no-transfer-room", + query, + enabled, + transferEnqueuedMessages: false, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, query: { token: "old" } } } + ); + + result.current.send("do-not-deliver"); + + rerender({ enabled: true, query: { token: "fresh" } }); + + await waitFor( + () => { + const target = connections.find((c) => c.url.includes("token=fresh")); + expect(target).toBeDefined(); + }, + { timeout: 10000 } + ); + + await new Promise((r) => setTimeout(r, 300)); + for (const connection of connections) { + expect(connection.messages).not.toContain("do-not-deliver"); + } + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + } +); + const WIRE_PORT = 50145; describe.skipIf(!!process.env.GITHUB_ACTIONS)( diff --git a/packages/partysocket/src/tests/reconnecting.test.ts b/packages/partysocket/src/tests/reconnecting.test.ts index 532d3ad5..33101474 100644 --- a/packages/partysocket/src/tests/reconnecting.test.ts +++ b/packages/partysocket/src/tests/reconnecting.test.ts @@ -12,10 +12,13 @@ import { test, vitest } from "vitest"; +import { createServer } from "node:net"; import { type WebSocket as NodeWebSocket, WebSocketServer } from "ws"; import ReconnectingWebSocket from "../ws"; +import type { AddressInfo } from "node:net"; + import type { ErrorEvent } from "../ws"; const PORT = 50123; @@ -509,7 +512,9 @@ testDone("start closed", (done, fail) => { ws.addEventListener("message", (msg) => { expect(msg.data).toEqual(new Blob([anyMessageText])); ws.close(1000, "some reason"); - expect(ws.readyState).toBe(ws.CLOSING); + // close() is permanent and dispatches its close event synchronously, + // so the socket reports CLOSED as soon as close() returns + expect(ws.readyState).toBe(ws.CLOSED); }); ws.addEventListener("close", (event) => { @@ -553,7 +558,9 @@ testDone("connect, send, receive, close", (done, fail) => { ws.addEventListener("message", (msg) => { expect(msg.data).toEqual(new Blob([anyMessageText])); ws.close(1000, "some reason"); - expect(ws.readyState).toBe(ws.CLOSING); + // close() is permanent and dispatches its close event synchronously, + // so the socket reports CLOSED as soon as close() returns + expect(ws.readyState).toBe(ws.CLOSED); }); ws.addEventListener("close", (event) => { @@ -602,10 +609,12 @@ testDone("connect, send, receive, reconnect", (done) => { if (currentRound < totalRounds) { ws.reconnect(1000, "reconnect"); expect(ws.retryCount).toBe(0); + expect(ws.readyState).toBe(ws.CLOSING); } else { ws.close(1000, "close"); + // close() is permanent: reports CLOSED as soon as it returns + expect(ws.readyState).toBe(ws.CLOSED); } - expect(ws.readyState).toBe(ws.CLOSING); }; ws.addEventListener("close", (event) => { @@ -945,6 +954,275 @@ testDone("reconnect after closing", (done, fail) => { }); }); +testDone( + "send() returns true when transmitted and false when buffered", + (done, fail) => { + const ws = new ReconnectingWebSocket(URL); + + // not yet open — buffered + try { + expect(ws.send("queued")).toBe(false); + } catch (e) { + fail(e); + return; + } + + ws.addEventListener("open", () => { + try { + expect(ws.send("immediate")).toBe(true); + } catch (e) { + fail(e); + return; + } + ws.close(); + done(); + }); + } +); + +test("drainQueuedMessages() removes and returns buffered messages", () => { + const ws = new ReconnectingWebSocket(URL, undefined, { startClosed: true }); + + ws.send("one"); + ws.send("two"); + expect(ws.bufferedAmount).toBe("one".length + "two".length); + + expect(ws.drainQueuedMessages()).toEqual(["one", "two"]); + expect(ws.bufferedAmount).toBe(0); + expect(ws.drainQueuedMessages()).toEqual([]); + + ws.close(); +}); + +testDone( + "close() dispatches its close event synchronously, exactly once", + (done, fail) => { + const ws = new ReconnectingWebSocket(URL); + let closeEvents = 0; + ws.addEventListener("close", () => { + closeEvents++; + }); + + ws.addEventListener("open", () => { + try { + ws.close(1000, "sync close"); + // the close event must have fired during close() itself — this is + // what guarantees consumers that detach their listeners right + // after closing still observe the terminal close + expect(closeEvents).toBe(1); + expect(ws.readyState).toBe(ws.CLOSED); + } catch (e) { + fail(e); + return; + } + // give the real (inner) close handshake time to finish — it must + // not produce a second close event + setTimeout(() => { + try { + expect(closeEvents).toBe(1); + } catch (e) { + fail(e); + return; + } + done(); + }, 300); + }); + } +); + +test("close() while still CONNECTING dispatches its close event synchronously", async () => { + // a raw TCP server that accepts connections but never answers the + // WebSocket upgrade — the client socket stays in CONNECTING + const stallServer = createServer(() => {}); + const stallPort = await new Promise((resolve) => { + stallServer.listen(0, () => { + resolve((stallServer.address() as AddressInfo).port); + }); + }); + + try { + const ws = new ReconnectingWebSocket( + `ws://localhost:${stallPort}/`, + undefined, + { + maxRetries: 0, + connectionTimeout: 10000 + } + ); + let closeEvents = 0; + ws.addEventListener("close", () => { + closeEvents++; + }); + + // wait for the inner socket to be created (connection attempt started) + await new Promise((r) => setTimeout(r, 100)); + expect(ws.readyState).toBe(ws.CONNECTING); + + ws.close(); + expect(closeEvents).toBe(1); + expect(ws.readyState).toBe(ws.CLOSED); + + // the aborted handshake must not produce a second close (or an + // unhandled error — its error event is absorbed after detach) + await new Promise((r) => setTimeout(r, 200)); + expect(closeEvents).toBe(1); + } finally { + stallServer.close(); + } +}); + +testDone( + "close() inside a close listener does not dispatch a duplicate close event", + (done, fail) => { + const ws = new ReconnectingWebSocket(URL); + let closeEvents = 0; + + ws.addEventListener("close", () => { + closeEvents++; + // re-entrant close — must be a no-op, not recurse + ws.close(); + }); + + ws.addEventListener("open", () => { + ws.close(); + setTimeout(() => { + try { + expect(closeEvents).toBe(1); + } catch (e) { + fail(e); + return; + } + done(); + }, 300); + }); + } +); + +testDone( + "reconnect() immediately after close() does not dispatch a duplicate close event", + (done, fail) => { + const ws = new ReconnectingWebSocket(URL, undefined, { + minReconnectionDelay: 10, + maxReconnectionDelay: 50 + }); + let closeEvents = 0; + let opens = 0; + + ws.addEventListener("close", () => { + closeEvents++; + }); + + ws.addEventListener("open", () => { + opens++; + if (opens === 1) { + ws.close(); + // reconnect before the inner closing handshake completes + ws.reconnect(); + } else { + try { + // exactly one close event (from close()) — reconnect() must not + // dispatch another synthetic close for an already-closed socket + expect(closeEvents).toBe(1); + } catch (e) { + fail(e); + return; + } + ws.close(); + done(); + } + }); + } +); + +testDone("warns when send() is called after close()", (done, fail) => { + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + const ws = new ReconnectingWebSocket(URL); + + ws.addEventListener("open", () => { + ws.close(); + }); + + ws.addEventListener("close", () => { + try { + expect(warnSpy).not.toHaveBeenCalled(); + + // The socket is permanently closed (no reconnect scheduled) — + // sending now buffers into a queue nothing will ever flush. + ws.send("late message"); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy.mock.calls[0][0]).toMatch( + /send\(\) was called after close\(\)/ + ); + + // The message is still buffered (back-compat: reconnect() can + // flush it), and the warning is deduped per close cycle. + ws.send("another late message"); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(ws.bufferedAmount).toBe( + "late message".length + "another late message".length + ); + } catch (e) { + fail(e); + return; + } + done(); + }); +}); + +testDone( + "messages sent after close() are delivered after reconnect(), and the warning resets", + (done, fail) => { + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + const ws = new ReconnectingWebSocket(URL, undefined, { + minReconnectionDelay: 10, + maxReconnectionDelay: 50 + }); + + let opens = 0; + ws.addEventListener("open", () => { + opens++; + if (opens === 1) { + ws.close(); + } + }); + + function onConnection(client: NodeWebSocket) { + client.on("message", (data: Buffer) => { + // eslint-disable-next-line @typescript-eslint/no-base-to-string + if (data.toString() === "buffered while closed") { + try { + // reconnect() resets the warn flag for the next close cycle + // @ts-expect-error - accessing private field + expect(ws._didWarnAboutClosedSend).toBe(false); + } catch (e) { + fail(e); + return; + } + wss.off("connection", onConnection); + ws.close(); + done(); + } + }); + } + wss.on("connection", onConnection); + + let closes = 0; + ws.addEventListener("close", () => { + closes++; + if (closes === 1) { + ws.send("buffered while closed"); + try { + expect(warnSpy).toHaveBeenCalledTimes(1); + } catch (e) { + fail(e); + return; + } + ws.reconnect(); + } + }); + } +); + testDone( "reconnect() works after maxRetries has been exhausted", (done, fail) => { diff --git a/packages/partysocket/src/use-socket.ts b/packages/partysocket/src/use-socket.ts index 4c9cb70c..0446dee6 100644 --- a/packages/partysocket/src/use-socket.ts +++ b/packages/partysocket/src/use-socket.ts @@ -6,6 +6,22 @@ import type { Options } from "./ws"; export type SocketOptions = Options & { /** Whether the socket should be connected. Defaults to true. */ enabled?: boolean; + /** + * Controls what happens to messages buffered by send() (sent while the + * connection wasn't open) when the hook replaces the socket because + * connection options changed. + * + * - `undefined` (default): transfer the buffered messages to the new + * socket only when the destination is unchanged — i.e. only + * credential-style options like `query` changed. If destination + * options (room, party, path, host, URL, ...) changed, the messages + * are discarded with a warning, since delivering messages composed + * for one destination to a different one is rarely what you want. + * - `true`: always transfer buffered messages to the new socket. + * - `false`: never transfer; buffered messages are discarded with a + * warning when the socket is replaced. + */ + transferEnqueuedMessages?: boolean; }; /** When any of the option values are changed, we should reinitialize the socket */ @@ -34,11 +50,20 @@ export function useStableSocket< >({ options, createSocket, - createSocketMemoKey: createOptionsMemoKey + createSocketMemoKey: createOptionsMemoKey, + createSocketDestinationKey }: { options: TOpts; createSocket: (options: TOpts) => T; createSocketMemoKey: (options: TOpts) => string; + /** + * Serializes the parts of the options that identify *where* the socket + * connects (room, party, path, host, URL — but not credentials like + * query params). Used to decide whether messages buffered in a replaced + * socket can safely be re-sent on its replacement: a matching + * destination key means the messages still go to the same place. + */ + createSocketDestinationKey?: (options: TOpts) => string; }) { // extract enabled with default value of true const { enabled = true } = options; @@ -81,6 +106,52 @@ export function useStableSocket< // to remember that options drifted and create a new socket on re-enable. const optionsChangedWhileDisabledRef = useRef(false); + // the options the *current* socket was created with. Messages buffered in + // a socket were destined for these options — when the socket is replaced, + // we compare destination keys against them (not against the previous + // render's options, which may have drifted while disabled). + const socketCreatedWithOptionsRef = useRef(socketOptions); + + // Creates the replacement socket for an options change, migrating the old + // socket's unsent message buffer per the transferEnqueuedMessages policy. + // Anything in that buffer was never transmitted, so re-sending it on the + // new socket cannot double-deliver; the only question is whether the new + // socket still points at the same destination. + // Held in a ref (like createSocketRef) so the effect below doesn't need + // it as a dependency — it always reads the latest render's options. + const createReplacementSocket = (oldSocket: T): T => { + const newSocket = createSocketRef.current({ + ...socketOptions, + startClosed: true + }); + + const queued = oldSocket.drainQueuedMessages(); + if (queued.length > 0) { + const sameDestination = createSocketDestinationKey + ? createSocketDestinationKey(socketCreatedWithOptionsRef.current) === + createSocketDestinationKey(socketOptions) + : false; + const shouldTransfer = + socketOptions.transferEnqueuedMessages ?? sameDestination; + if (shouldTransfer) { + for (const message of queued) { + newSocket.send(message); + } + } else { + console.warn( + `PartySocket: discarded ${queued.length} buffered message(s) while replacing the socket, ` + + "because the connection destination changed. Pass transferEnqueuedMessages: true to " + + "deliver buffered messages to the new destination instead." + ); + } + } + + socketCreatedWithOptionsRef.current = socketOptions; + return newSocket; + }; + const createReplacementSocketRef = useRef(createReplacementSocket); + createReplacementSocketRef.current = createReplacementSocket; + // finally, initialize the socket useEffect(() => { const optionsChanged = prevSocketOptionsRef.current !== socketOptions; @@ -114,10 +185,7 @@ export function useStableSocket< } // options changed while disabled — create new socket with current config - const newSocket = createSocketRef.current({ - ...socketOptions, - startClosed: true - }); + const newSocket = createReplacementSocketRef.current(socket); setSocket(newSocket); return () => { newSocket.close(); @@ -133,10 +201,7 @@ export function useStableSocket< // startClosed: true so it's inert until the else branch below // connects it on the next render. This ensures the socket is safe // to clean up if the component unmounts before that re-render. - const newSocket = createSocketRef.current({ - ...socketOptions, - startClosed: true - }); + const newSocket = createReplacementSocketRef.current(socket); // update socket reference (this will cause the effect to run again) setSocket(newSocket); diff --git a/packages/partysocket/src/use-ws.ts b/packages/partysocket/src/use-ws.ts index 6e77fae4..f7b78799 100644 --- a/packages/partysocket/src/use-ws.ts +++ b/packages/partysocket/src/use-ws.ts @@ -27,7 +27,12 @@ export default function useWebSocket( url, protocols, ...getOptionsThatShouldCauseRestartWhenChanged(options) - ]) + ]), + // For a plain WebSocket the URL *is* the destination (credentials + // can't be distinguished from it), so buffered messages only carry + // over when the socket was replaced for non-URL reasons (e.g. a + // retry/debug option changed). + createSocketDestinationKey: () => JSON.stringify([url, protocols]) }); useAttachWebSocketEventHandlers(socket, options); diff --git a/packages/partysocket/src/ws.ts b/packages/partysocket/src/ws.ts index 92111eaa..c74eafc5 100644 --- a/packages/partysocket/src/ws.ts +++ b/packages/partysocket/src/ws.ts @@ -136,6 +136,10 @@ const DEFAULT = { let didWarnAboutMissingWebSocket = false; +// no-op error listener attached to inner sockets we've detached from, so +// their late error events don't surface as unhandled errors +function absorbError() {} + export type UrlProvider = string | (() => string) | (() => Promise); export type ProtocolsProvider = | null @@ -159,6 +163,7 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar private _connectLock = false; private _binaryType: BinaryType = "blob"; private _closeCalled = false; + private _didWarnAboutClosedSend = false; private _messageQueue: Message[] = []; private _debugLogger = console.log.bind(console); @@ -270,6 +275,13 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar * The current state of the connection; this is one of the Ready state constants */ get readyState(): number { + if (this._closeCalled) { + // close() permanently closes the socket (no automatic reconnects) + // and dispatches its close event synchronously — report CLOSED + // immediately, even while the underlying socket is still finishing + // its closing handshake. + return ReconnectingWebSocket.CLOSED; + } if (this._ws) { return this._ws.readyState; } @@ -315,7 +327,14 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar /** * Closes the WebSocket connection or connection attempt, if any. If the connection is already - * CLOSED, this method does nothing + * CLOSED or CLOSING, this method does nothing. + * + * The `close` event is dispatched synchronously (mirroring how + * `reconnect()` dispatches its synthetic close). This guarantees + * consumers observe a terminal event for every explicit close, even + * if their listeners are detached right after this call — previously + * the real (asynchronous) browser close event could fire after + * listeners were removed and go unobserved entirely. */ public close(code = 1000, reason?: string) { this._closeCalled = true; @@ -325,11 +344,17 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar this._debug("close enqueued: no ws instance"); return; } - if (this._ws.readyState === this.CLOSED) { - this._debug("close: already closed"); + if ( + this._ws.readyState === this.CLOSED || + this._ws.readyState === this.CLOSING + ) { + this._debug("close: already closing or closed"); return; } - this._ws.close(code, reason); + // _disconnect detaches the inner socket's listeners (so the real + // close event is not dispatched a second time later) and dispatches + // a synthetic close event synchronously. + this._disconnect(code, reason); } /** @@ -339,8 +364,15 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar public reconnect(code?: number, reason?: string) { this._shouldReconnect = true; this._closeCalled = false; + this._didWarnAboutClosedSend = false; this._retryCount = -1; - if (!this._ws || this._ws.readyState === this.CLOSED) { + if ( + !this._ws || + this._ws.readyState === this.CLOSED || + // CLOSING means a close was already initiated (and its close event + // already dispatched synthetically) — don't dispatch another one. + this._ws.readyState === this.CLOSING + ) { this._connect(); } else { this._disconnect(code, reason); @@ -349,20 +381,54 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar } /** - * Enqueue specified data to be transmitted to the server over the WebSocket connection + * Enqueue specified data to be transmitted to the server over the WebSocket connection. + * + * @returns `true` if the message was transmitted immediately over an open + * connection; `false` if it was buffered (sent when the connection next + * opens — the buffer is always flushed before the `open` event is + * dispatched) or dropped because `maxEnqueuedMessages` was reached. */ - public send(data: Message) { + public send(data: Message): boolean { if (this._ws && this._ws.readyState === this.OPEN) { this._debug("send", data); this._ws.send(data); - } else { - const { maxEnqueuedMessages = DEFAULT.maxEnqueuedMessages } = - this._options; - if (this._messageQueue.length < maxEnqueuedMessages) { - this._debug("enqueue", data); - this._messageQueue.push(data); - } + return true; } + if (this._closeCalled && !this._didWarnAboutClosedSend) { + // After close() the socket will not reconnect on its own, so + // buffered messages are silently lost unless reconnect() is + // called later. Surface this instead of swallowing it — sends + // against a permanently closed socket usually indicate a stale + // reference bug in the caller. + this._didWarnAboutClosedSend = true; + console.warn( + "ReconnectingWebSocket: send() was called after close(). The message has been buffered, " + + "but it will only be delivered if reconnect() is called on this socket. " + + "If this socket has been discarded, the message is lost — this usually means " + + "a stale socket reference is being used." + ); + } + const { maxEnqueuedMessages = DEFAULT.maxEnqueuedMessages } = this._options; + if (this._messageQueue.length < maxEnqueuedMessages) { + this._debug("enqueue", data); + this._messageQueue.push(data); + } + return false; + } + + /** + * Removes and returns all messages that were passed to send() but never + * transmitted (they were buffered while the connection wasn't open). + * + * Useful when a socket is being discarded and replaced (e.g. the React + * hooks recreate the socket when connection options change): the + * replacement socket can re-send these messages, instead of them being + * silently lost with the old instance. + */ + public drainQueuedMessages(): Message[] { + const queue = this._messageQueue; + this._messageQueue = []; + return queue; } private _debug(...args: unknown[]) { @@ -622,6 +688,11 @@ const partysocket = new PartySocket({ this._ws.removeEventListener("message", this._handleMessage); // @ts-expect-error we need to fix event/listerner types this._ws.removeEventListener("error", this._handleError); + // The detached socket can still emit a late error (e.g. "WebSocket was + // closed before the connection was established" when it's closed while + // CONNECTING). We no longer care about this socket — absorb the event + // so it doesn't surface as an unhandled error. + this._ws.addEventListener("error", absorbError); } private _addListeners() { diff --git a/packages/partysub/package.json b/packages/partysub/package.json index 26905362..a52508b9 100644 --- a/packages/partysub/package.json +++ b/packages/partysub/package.json @@ -46,7 +46,7 @@ }, "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", - "partyserver": "^0.5.5", - "partysocket": "^1.1.18" + "partyserver": "^0.5.6", + "partysocket": "^1.1.19" } } diff --git a/packages/partysync/package.json b/packages/partysync/package.json index a2cf2c51..96053c18 100644 --- a/packages/partysync/package.json +++ b/packages/partysync/package.json @@ -51,8 +51,8 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", "partyfn": "^0.1.0", - "partyserver": "^0.5.5", - "partysocket": "^1.1.18" + "partyserver": "^0.5.6", + "partysocket": "^1.1.19" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20260424.1", diff --git a/packages/partywhen/package.json b/packages/partywhen/package.json index 9e1099cb..b10be359 100644 --- a/packages/partywhen/package.json +++ b/packages/partywhen/package.json @@ -29,6 +29,6 @@ "description": "A library for scheduling and running tasks in Cloudflare Workers", "dependencies": { "cron-parser": "^5.5.0", - "partyserver": "^0.5.5" + "partyserver": "^0.5.6" } } diff --git a/packages/y-partyserver/package.json b/packages/y-partyserver/package.json index 45f863e7..10234118 100644 --- a/packages/y-partyserver/package.json +++ b/packages/y-partyserver/package.json @@ -65,7 +65,7 @@ "@cloudflare/workers-types": "^4.20260426.1", "@types/lodash.debounce": "^4.0.9", "@types/node": "25.6.0", - "partyserver": "^0.5.5", + "partyserver": "^0.5.6", "ws": "^8.20.0", "yjs": "^13.6.30" },