From 610b6948e9731222aa2ed3a6d781d50caf3b82f6 Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Fri, 12 Jun 2026 12:07:35 +0100 Subject: [PATCH 1/3] fix(partysocket): reliable buffered messages and close events across socket replacement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Messages passed to send() while a socket isn't open are buffered in an internal queue. When the React hooks replace the socket because connection options changed (e.g. an auth token refresh), that buffer was silently lost with the old instance — and because the old socket's close event was dispatched asynchronously, consumers that detached their listeners during the swap never observed a terminal close either. Both failure modes strand callers waiting on a reply forever (see cloudflare/agents#1738). ReconnectingWebSocket: - close() now dispatches its close event synchronously (mirroring the synthetic close reconnect() already dispatched) and detaches the inner socket's listeners so the real close event isn't delivered twice. After close() returns, readyState reports CLOSED immediately. Re-entrant close() inside a close listener is a no-op, and reconnect() right after close() no longer dispatches a duplicate close. - send() returns a boolean: true if transmitted immediately over an open connection, false if buffered or dropped (maxEnqueuedMessages). Callers implementing request/response protocols can use this to know whether a request is actually in flight. - New drainQueuedMessages() removes and returns the unsent buffer so a discarded socket can hand it to a replacement. - send() after close() warns once per close cycle: the message is buffered but nothing will ever flush it unless reconnect() is called, which usually indicates a stale socket reference in the caller. - Inner sockets we've detached from get a no-op error listener so an aborted handshake (close while CONNECTING) doesn't surface as an unhandled error. React hooks (usePartySocket / useWebSocket): - useStableSocket now migrates the old socket's unsent buffer when it replaces the socket. By default messages transfer only when the destination is unchanged (only credentials like query changed); if destination options (room, party, path, host, URL, ...) changed, the messages are discarded with a warning rather than delivered to a destination they weren't composed for. The new transferEnqueuedMessages option forces either behavior (true = always, false = never). Tests cover the new send() return value, drainQueuedMessages(), synchronous close (open and CONNECTING, re-entrancy, close-then- reconnect dedup), the send-after-close warning, and wire-level hook tests for transfer on credential change, order preservation, drop on destination change, and both transferEnqueuedMessages overrides. Co-authored-by: Cursor --- .changeset/spotty-eels-warn.md | 11 + packages/partysocket/src/react.ts | 18 +- .../partysocket/src/tests/integration.test.ts | 13 +- .../src/tests/react-hooks.test.tsx | 240 +++++++++++++++ .../src/tests/reconnecting.test.ts | 284 +++++++++++++++++- packages/partysocket/src/use-socket.ts | 83 ++++- packages/partysocket/src/use-ws.ts | 7 +- packages/partysocket/src/ws.ts | 99 +++++- 8 files changed, 721 insertions(+), 34 deletions(-) create mode 100644 .changeset/spotty-eels-warn.md diff --git a/.changeset/spotty-eels-warn.md b/.changeset/spotty-eels-warn.md new file mode 100644 index 00000000..c306b92d --- /dev/null +++ b/.changeset/spotty-eels-warn.md @@ -0,0 +1,11 @@ +--- +"partysocket": minor +--- + +Make buffered messages and connection teardown reliable across socket replacement and explicit close (see cloudflare/agents#1738). + +- **`close()` now dispatches its `close` event synchronously** (mirroring how `reconnect()` already dispatched its synthetic close). Consumers that detach their listeners right after closing — like the React hooks during cleanup — previously never observed the terminal close event, leaving "connection closed" handling (pending-call rejection, state resets) to never run. After `close()` returns, `readyState` reports `CLOSED` immediately, even while the underlying socket finishes its closing handshake. Code that attached a `close` listener _after_ calling `close()` and relied on the event arriving asynchronously must attach the listener first. +- **`send()` now returns a `boolean`**: `true` if the message was transmitted immediately over an open connection, `false` if it was buffered (delivered when the connection next opens, always before the `open` event is dispatched) or dropped because `maxEnqueuedMessages` was reached. Callers implementing request/response protocols can use this to know whether a request is actually in flight. +- **New `drainQueuedMessages()` method** removes and returns all messages that were buffered by `send()` but never transmitted, so a socket that's being discarded can hand its unsent buffer to a replacement instead of silently losing it. +- **React hooks (`usePartySocket`, `useWebSocket`) now migrate buffered messages when they replace the socket** because connection options changed. By default, buffered messages transfer only when the destination is unchanged (e.g. only `query` — credentials — changed); if destination options (`room`, `party`, `path`, `host`, URL, ...) changed, the messages are discarded with a warning rather than delivered to a destination they weren't composed for. The new `transferEnqueuedMessages` option overrides this: `true` always transfers, `false` never does. +- **Warn when `send()` is called after `close()`.** Messages sent against a permanently closed socket are buffered into a queue that nothing will ever flush (unless `reconnect()` is called later), which silently strands the message — and any caller waiting on a reply. This usually indicates a stale socket reference in the caller. The message is still buffered for backwards compatibility; the warning is emitted once per close cycle and resets on `reconnect()`. diff --git a/packages/partysocket/src/react.ts b/packages/partysocket/src/react.ts index 1277285e..3c756164 100644 --- a/packages/partysocket/src/react.ts +++ b/packages/partysocket/src/react.ts @@ -11,7 +11,7 @@ import type { SocketOptions } from "./use-socket"; type UsePartySocketOptions = Omit & EventHandlerOptions & - Pick & { + Pick & { host?: string | undefined; }; @@ -44,6 +44,22 @@ export default function usePartySocket(options: UsePartySocketOptions) { options.basePath, options.prefix, ...getOptionsThatShouldCauseRestartWhenChanged(options) + ]), + // Identifies *where* the socket connects. Deliberately excludes + // `query` (credentials — a token refresh shouldn't drop buffered + // messages) and `id` (connection identity, not destination). Used + // to decide whether messages buffered in a replaced socket can be + // re-sent on its replacement. + createSocketDestinationKey: (options) => + JSON.stringify([ + options.host, + options.room, + options.party, + options.path, + options.protocol, + options.protocols, + options.basePath, + options.prefix ]) }); diff --git a/packages/partysocket/src/tests/integration.test.ts b/packages/partysocket/src/tests/integration.test.ts index d2a7f12a..268be606 100644 --- a/packages/partysocket/src/tests/integration.test.ts +++ b/packages/partysocket/src/tests/integration.test.ts @@ -68,16 +68,17 @@ describe.skipIf(!!process.env.GITHUB_ACTIONS)( ps.send(testMessage); }); + // close() dispatches its close event synchronously, so the listener + // must be attached before close() is called + ps.addEventListener("close", () => { + expect(ps.readyState).toBe(WebSocket.CLOSED); + resolve(); + }); + ps.addEventListener("message", async (event) => { const text = await getMessageText(event.data); expect(text).toContain(testMessage); ps.close(); - - // Wait for close event before resolving - ps.addEventListener("close", () => { - expect(ps.readyState).toBe(WebSocket.CLOSED); - resolve(); - }); }); }); }); diff --git a/packages/partysocket/src/tests/react-hooks.test.tsx b/packages/partysocket/src/tests/react-hooks.test.tsx index b8762b23..97cc37c1 100644 --- a/packages/partysocket/src/tests/react-hooks.test.tsx +++ b/packages/partysocket/src/tests/react-hooks.test.tsx @@ -1563,6 +1563,246 @@ describe.skipIf(!!process.env.GITHUB_ACTIONS)( } ); +const QUEUE_PORT = 50150; + +describe.skipIf(!!process.env.GITHUB_ACTIONS)( + "buffered message handling across socket replacement", + () => { + let wss: WebSocketServer; + let connections: { url: string; messages: string[] }[]; + + beforeAll(() => { + connections = []; + return new Promise((resolve) => { + wss = new WebSocketServer({ port: QUEUE_PORT }, () => resolve()); + wss.on("connection", (ws, req) => { + const record = { url: req.url ?? "", messages: [] as string[] }; + connections.push(record); + ws.on("message", (data) => { + record.messages.push(data.toString()); + }); + }); + }); + }); + + afterAll(() => { + return new Promise((resolve) => { + wss.clients.forEach((client) => { + client.terminate(); + }); + wss.close(() => { + resolve(); + }); + }); + }); + + test("transfers buffered messages to the replacement socket when only query changes", async () => { + connections.length = 0; + // silence the send-after-close warning from sending while disabled + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ + enabled, + query + }: { + enabled: boolean; + query: Record; + }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room: "queue-room", + query, + enabled, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, query: { token: "old" } } } + ); + + // buffered: the socket is disabled, nothing is connected + result.current.send("hello-from-queue"); + + // re-enable with a fresh token — same destination, different credentials + rerender({ enabled: true, query: { token: "fresh" } }); + + await waitFor( + () => { + expect(connections.length).toBeGreaterThanOrEqual(1); + const latest = connections[connections.length - 1]; + expect(latest.url).toContain("token=fresh"); + expect(latest.messages).toContain("hello-from-queue"); + }, + { timeout: 10000 } + ); + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + + test("transferred messages arrive in the order they were sent", async () => { + connections.length = 0; + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ + enabled, + query + }: { + enabled: boolean; + query: Record; + }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room: "order-room", + query, + enabled, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, query: { token: "old" } } } + ); + + result.current.send("first"); + result.current.send("second"); + result.current.send("third"); + + rerender({ enabled: true, query: { token: "fresh" } }); + + await waitFor( + () => { + const latest = connections[connections.length - 1]; + expect(latest?.messages).toEqual(["first", "second", "third"]); + }, + { timeout: 10000 } + ); + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + + test("drops buffered messages with a warning when the destination changes", async () => { + connections.length = 0; + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ enabled, room }: { enabled: boolean; room: string }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room, + enabled, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, room: "room-a" } } + ); + + // buffered while disabled, destined for room-a + result.current.send("secret-for-room-a"); + + // re-enable pointing at a different room — the buffered message must + // NOT follow the socket to room-b + rerender({ enabled: true, room: "room-b" }); + + await waitFor( + () => { + expect(connections.length).toBeGreaterThanOrEqual(1); + expect(connections[connections.length - 1].url).toContain("room-b"); + }, + { timeout: 10000 } + ); + + // allow any in-flight frames to arrive before asserting absence + await new Promise((r) => setTimeout(r, 300)); + for (const connection of connections) { + expect(connection.messages).not.toContain("secret-for-room-a"); + } + expect( + warnSpy.mock.calls.some((call) => + String(call[0]).includes("discarded 1 buffered message") + ) + ).toBe(true); + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + + test("transferEnqueuedMessages: true forces transfer across a destination change", async () => { + connections.length = 0; + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ enabled, room }: { enabled: boolean; room: string }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room, + enabled, + transferEnqueuedMessages: true, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, room: "force-a" } } + ); + + result.current.send("follow-me"); + + rerender({ enabled: true, room: "force-b" }); + + await waitFor( + () => { + const target = connections.find((c) => c.url.includes("force-b")); + expect(target).toBeDefined(); + expect(target?.messages).toContain("follow-me"); + }, + { timeout: 10000 } + ); + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + + test("transferEnqueuedMessages: false drops buffered messages even when only query changes", async () => { + connections.length = 0; + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + + const { result, rerender } = renderHook( + ({ + enabled, + query + }: { + enabled: boolean; + query: Record; + }) => + usePartySocket({ + host: `localhost:${QUEUE_PORT}`, + room: "no-transfer-room", + query, + enabled, + transferEnqueuedMessages: false, + ...FAST_RECONNECT + }), + { initialProps: { enabled: false, query: { token: "old" } } } + ); + + result.current.send("do-not-deliver"); + + rerender({ enabled: true, query: { token: "fresh" } }); + + await waitFor( + () => { + const target = connections.find((c) => c.url.includes("token=fresh")); + expect(target).toBeDefined(); + }, + { timeout: 10000 } + ); + + await new Promise((r) => setTimeout(r, 300)); + for (const connection of connections) { + expect(connection.messages).not.toContain("do-not-deliver"); + } + + result.current.close(); + warnSpy.mockRestore(); + }, 30000); + } +); + const WIRE_PORT = 50145; describe.skipIf(!!process.env.GITHUB_ACTIONS)( diff --git a/packages/partysocket/src/tests/reconnecting.test.ts b/packages/partysocket/src/tests/reconnecting.test.ts index 532d3ad5..33101474 100644 --- a/packages/partysocket/src/tests/reconnecting.test.ts +++ b/packages/partysocket/src/tests/reconnecting.test.ts @@ -12,10 +12,13 @@ import { test, vitest } from "vitest"; +import { createServer } from "node:net"; import { type WebSocket as NodeWebSocket, WebSocketServer } from "ws"; import ReconnectingWebSocket from "../ws"; +import type { AddressInfo } from "node:net"; + import type { ErrorEvent } from "../ws"; const PORT = 50123; @@ -509,7 +512,9 @@ testDone("start closed", (done, fail) => { ws.addEventListener("message", (msg) => { expect(msg.data).toEqual(new Blob([anyMessageText])); ws.close(1000, "some reason"); - expect(ws.readyState).toBe(ws.CLOSING); + // close() is permanent and dispatches its close event synchronously, + // so the socket reports CLOSED as soon as close() returns + expect(ws.readyState).toBe(ws.CLOSED); }); ws.addEventListener("close", (event) => { @@ -553,7 +558,9 @@ testDone("connect, send, receive, close", (done, fail) => { ws.addEventListener("message", (msg) => { expect(msg.data).toEqual(new Blob([anyMessageText])); ws.close(1000, "some reason"); - expect(ws.readyState).toBe(ws.CLOSING); + // close() is permanent and dispatches its close event synchronously, + // so the socket reports CLOSED as soon as close() returns + expect(ws.readyState).toBe(ws.CLOSED); }); ws.addEventListener("close", (event) => { @@ -602,10 +609,12 @@ testDone("connect, send, receive, reconnect", (done) => { if (currentRound < totalRounds) { ws.reconnect(1000, "reconnect"); expect(ws.retryCount).toBe(0); + expect(ws.readyState).toBe(ws.CLOSING); } else { ws.close(1000, "close"); + // close() is permanent: reports CLOSED as soon as it returns + expect(ws.readyState).toBe(ws.CLOSED); } - expect(ws.readyState).toBe(ws.CLOSING); }; ws.addEventListener("close", (event) => { @@ -945,6 +954,275 @@ testDone("reconnect after closing", (done, fail) => { }); }); +testDone( + "send() returns true when transmitted and false when buffered", + (done, fail) => { + const ws = new ReconnectingWebSocket(URL); + + // not yet open — buffered + try { + expect(ws.send("queued")).toBe(false); + } catch (e) { + fail(e); + return; + } + + ws.addEventListener("open", () => { + try { + expect(ws.send("immediate")).toBe(true); + } catch (e) { + fail(e); + return; + } + ws.close(); + done(); + }); + } +); + +test("drainQueuedMessages() removes and returns buffered messages", () => { + const ws = new ReconnectingWebSocket(URL, undefined, { startClosed: true }); + + ws.send("one"); + ws.send("two"); + expect(ws.bufferedAmount).toBe("one".length + "two".length); + + expect(ws.drainQueuedMessages()).toEqual(["one", "two"]); + expect(ws.bufferedAmount).toBe(0); + expect(ws.drainQueuedMessages()).toEqual([]); + + ws.close(); +}); + +testDone( + "close() dispatches its close event synchronously, exactly once", + (done, fail) => { + const ws = new ReconnectingWebSocket(URL); + let closeEvents = 0; + ws.addEventListener("close", () => { + closeEvents++; + }); + + ws.addEventListener("open", () => { + try { + ws.close(1000, "sync close"); + // the close event must have fired during close() itself — this is + // what guarantees consumers that detach their listeners right + // after closing still observe the terminal close + expect(closeEvents).toBe(1); + expect(ws.readyState).toBe(ws.CLOSED); + } catch (e) { + fail(e); + return; + } + // give the real (inner) close handshake time to finish — it must + // not produce a second close event + setTimeout(() => { + try { + expect(closeEvents).toBe(1); + } catch (e) { + fail(e); + return; + } + done(); + }, 300); + }); + } +); + +test("close() while still CONNECTING dispatches its close event synchronously", async () => { + // a raw TCP server that accepts connections but never answers the + // WebSocket upgrade — the client socket stays in CONNECTING + const stallServer = createServer(() => {}); + const stallPort = await new Promise((resolve) => { + stallServer.listen(0, () => { + resolve((stallServer.address() as AddressInfo).port); + }); + }); + + try { + const ws = new ReconnectingWebSocket( + `ws://localhost:${stallPort}/`, + undefined, + { + maxRetries: 0, + connectionTimeout: 10000 + } + ); + let closeEvents = 0; + ws.addEventListener("close", () => { + closeEvents++; + }); + + // wait for the inner socket to be created (connection attempt started) + await new Promise((r) => setTimeout(r, 100)); + expect(ws.readyState).toBe(ws.CONNECTING); + + ws.close(); + expect(closeEvents).toBe(1); + expect(ws.readyState).toBe(ws.CLOSED); + + // the aborted handshake must not produce a second close (or an + // unhandled error — its error event is absorbed after detach) + await new Promise((r) => setTimeout(r, 200)); + expect(closeEvents).toBe(1); + } finally { + stallServer.close(); + } +}); + +testDone( + "close() inside a close listener does not dispatch a duplicate close event", + (done, fail) => { + const ws = new ReconnectingWebSocket(URL); + let closeEvents = 0; + + ws.addEventListener("close", () => { + closeEvents++; + // re-entrant close — must be a no-op, not recurse + ws.close(); + }); + + ws.addEventListener("open", () => { + ws.close(); + setTimeout(() => { + try { + expect(closeEvents).toBe(1); + } catch (e) { + fail(e); + return; + } + done(); + }, 300); + }); + } +); + +testDone( + "reconnect() immediately after close() does not dispatch a duplicate close event", + (done, fail) => { + const ws = new ReconnectingWebSocket(URL, undefined, { + minReconnectionDelay: 10, + maxReconnectionDelay: 50 + }); + let closeEvents = 0; + let opens = 0; + + ws.addEventListener("close", () => { + closeEvents++; + }); + + ws.addEventListener("open", () => { + opens++; + if (opens === 1) { + ws.close(); + // reconnect before the inner closing handshake completes + ws.reconnect(); + } else { + try { + // exactly one close event (from close()) — reconnect() must not + // dispatch another synthetic close for an already-closed socket + expect(closeEvents).toBe(1); + } catch (e) { + fail(e); + return; + } + ws.close(); + done(); + } + }); + } +); + +testDone("warns when send() is called after close()", (done, fail) => { + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + const ws = new ReconnectingWebSocket(URL); + + ws.addEventListener("open", () => { + ws.close(); + }); + + ws.addEventListener("close", () => { + try { + expect(warnSpy).not.toHaveBeenCalled(); + + // The socket is permanently closed (no reconnect scheduled) — + // sending now buffers into a queue nothing will ever flush. + ws.send("late message"); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy.mock.calls[0][0]).toMatch( + /send\(\) was called after close\(\)/ + ); + + // The message is still buffered (back-compat: reconnect() can + // flush it), and the warning is deduped per close cycle. + ws.send("another late message"); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(ws.bufferedAmount).toBe( + "late message".length + "another late message".length + ); + } catch (e) { + fail(e); + return; + } + done(); + }); +}); + +testDone( + "messages sent after close() are delivered after reconnect(), and the warning resets", + (done, fail) => { + const warnSpy = vitest.spyOn(console, "warn").mockReturnValue(); + const ws = new ReconnectingWebSocket(URL, undefined, { + minReconnectionDelay: 10, + maxReconnectionDelay: 50 + }); + + let opens = 0; + ws.addEventListener("open", () => { + opens++; + if (opens === 1) { + ws.close(); + } + }); + + function onConnection(client: NodeWebSocket) { + client.on("message", (data: Buffer) => { + // eslint-disable-next-line @typescript-eslint/no-base-to-string + if (data.toString() === "buffered while closed") { + try { + // reconnect() resets the warn flag for the next close cycle + // @ts-expect-error - accessing private field + expect(ws._didWarnAboutClosedSend).toBe(false); + } catch (e) { + fail(e); + return; + } + wss.off("connection", onConnection); + ws.close(); + done(); + } + }); + } + wss.on("connection", onConnection); + + let closes = 0; + ws.addEventListener("close", () => { + closes++; + if (closes === 1) { + ws.send("buffered while closed"); + try { + expect(warnSpy).toHaveBeenCalledTimes(1); + } catch (e) { + fail(e); + return; + } + ws.reconnect(); + } + }); + } +); + testDone( "reconnect() works after maxRetries has been exhausted", (done, fail) => { diff --git a/packages/partysocket/src/use-socket.ts b/packages/partysocket/src/use-socket.ts index 4c9cb70c..0446dee6 100644 --- a/packages/partysocket/src/use-socket.ts +++ b/packages/partysocket/src/use-socket.ts @@ -6,6 +6,22 @@ import type { Options } from "./ws"; export type SocketOptions = Options & { /** Whether the socket should be connected. Defaults to true. */ enabled?: boolean; + /** + * Controls what happens to messages buffered by send() (sent while the + * connection wasn't open) when the hook replaces the socket because + * connection options changed. + * + * - `undefined` (default): transfer the buffered messages to the new + * socket only when the destination is unchanged — i.e. only + * credential-style options like `query` changed. If destination + * options (room, party, path, host, URL, ...) changed, the messages + * are discarded with a warning, since delivering messages composed + * for one destination to a different one is rarely what you want. + * - `true`: always transfer buffered messages to the new socket. + * - `false`: never transfer; buffered messages are discarded with a + * warning when the socket is replaced. + */ + transferEnqueuedMessages?: boolean; }; /** When any of the option values are changed, we should reinitialize the socket */ @@ -34,11 +50,20 @@ export function useStableSocket< >({ options, createSocket, - createSocketMemoKey: createOptionsMemoKey + createSocketMemoKey: createOptionsMemoKey, + createSocketDestinationKey }: { options: TOpts; createSocket: (options: TOpts) => T; createSocketMemoKey: (options: TOpts) => string; + /** + * Serializes the parts of the options that identify *where* the socket + * connects (room, party, path, host, URL — but not credentials like + * query params). Used to decide whether messages buffered in a replaced + * socket can safely be re-sent on its replacement: a matching + * destination key means the messages still go to the same place. + */ + createSocketDestinationKey?: (options: TOpts) => string; }) { // extract enabled with default value of true const { enabled = true } = options; @@ -81,6 +106,52 @@ export function useStableSocket< // to remember that options drifted and create a new socket on re-enable. const optionsChangedWhileDisabledRef = useRef(false); + // the options the *current* socket was created with. Messages buffered in + // a socket were destined for these options — when the socket is replaced, + // we compare destination keys against them (not against the previous + // render's options, which may have drifted while disabled). + const socketCreatedWithOptionsRef = useRef(socketOptions); + + // Creates the replacement socket for an options change, migrating the old + // socket's unsent message buffer per the transferEnqueuedMessages policy. + // Anything in that buffer was never transmitted, so re-sending it on the + // new socket cannot double-deliver; the only question is whether the new + // socket still points at the same destination. + // Held in a ref (like createSocketRef) so the effect below doesn't need + // it as a dependency — it always reads the latest render's options. + const createReplacementSocket = (oldSocket: T): T => { + const newSocket = createSocketRef.current({ + ...socketOptions, + startClosed: true + }); + + const queued = oldSocket.drainQueuedMessages(); + if (queued.length > 0) { + const sameDestination = createSocketDestinationKey + ? createSocketDestinationKey(socketCreatedWithOptionsRef.current) === + createSocketDestinationKey(socketOptions) + : false; + const shouldTransfer = + socketOptions.transferEnqueuedMessages ?? sameDestination; + if (shouldTransfer) { + for (const message of queued) { + newSocket.send(message); + } + } else { + console.warn( + `PartySocket: discarded ${queued.length} buffered message(s) while replacing the socket, ` + + "because the connection destination changed. Pass transferEnqueuedMessages: true to " + + "deliver buffered messages to the new destination instead." + ); + } + } + + socketCreatedWithOptionsRef.current = socketOptions; + return newSocket; + }; + const createReplacementSocketRef = useRef(createReplacementSocket); + createReplacementSocketRef.current = createReplacementSocket; + // finally, initialize the socket useEffect(() => { const optionsChanged = prevSocketOptionsRef.current !== socketOptions; @@ -114,10 +185,7 @@ export function useStableSocket< } // options changed while disabled — create new socket with current config - const newSocket = createSocketRef.current({ - ...socketOptions, - startClosed: true - }); + const newSocket = createReplacementSocketRef.current(socket); setSocket(newSocket); return () => { newSocket.close(); @@ -133,10 +201,7 @@ export function useStableSocket< // startClosed: true so it's inert until the else branch below // connects it on the next render. This ensures the socket is safe // to clean up if the component unmounts before that re-render. - const newSocket = createSocketRef.current({ - ...socketOptions, - startClosed: true - }); + const newSocket = createReplacementSocketRef.current(socket); // update socket reference (this will cause the effect to run again) setSocket(newSocket); diff --git a/packages/partysocket/src/use-ws.ts b/packages/partysocket/src/use-ws.ts index 6e77fae4..f7b78799 100644 --- a/packages/partysocket/src/use-ws.ts +++ b/packages/partysocket/src/use-ws.ts @@ -27,7 +27,12 @@ export default function useWebSocket( url, protocols, ...getOptionsThatShouldCauseRestartWhenChanged(options) - ]) + ]), + // For a plain WebSocket the URL *is* the destination (credentials + // can't be distinguished from it), so buffered messages only carry + // over when the socket was replaced for non-URL reasons (e.g. a + // retry/debug option changed). + createSocketDestinationKey: () => JSON.stringify([url, protocols]) }); useAttachWebSocketEventHandlers(socket, options); diff --git a/packages/partysocket/src/ws.ts b/packages/partysocket/src/ws.ts index 92111eaa..c74eafc5 100644 --- a/packages/partysocket/src/ws.ts +++ b/packages/partysocket/src/ws.ts @@ -136,6 +136,10 @@ const DEFAULT = { let didWarnAboutMissingWebSocket = false; +// no-op error listener attached to inner sockets we've detached from, so +// their late error events don't surface as unhandled errors +function absorbError() {} + export type UrlProvider = string | (() => string) | (() => Promise); export type ProtocolsProvider = | null @@ -159,6 +163,7 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar private _connectLock = false; private _binaryType: BinaryType = "blob"; private _closeCalled = false; + private _didWarnAboutClosedSend = false; private _messageQueue: Message[] = []; private _debugLogger = console.log.bind(console); @@ -270,6 +275,13 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar * The current state of the connection; this is one of the Ready state constants */ get readyState(): number { + if (this._closeCalled) { + // close() permanently closes the socket (no automatic reconnects) + // and dispatches its close event synchronously — report CLOSED + // immediately, even while the underlying socket is still finishing + // its closing handshake. + return ReconnectingWebSocket.CLOSED; + } if (this._ws) { return this._ws.readyState; } @@ -315,7 +327,14 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar /** * Closes the WebSocket connection or connection attempt, if any. If the connection is already - * CLOSED, this method does nothing + * CLOSED or CLOSING, this method does nothing. + * + * The `close` event is dispatched synchronously (mirroring how + * `reconnect()` dispatches its synthetic close). This guarantees + * consumers observe a terminal event for every explicit close, even + * if their listeners are detached right after this call — previously + * the real (asynchronous) browser close event could fire after + * listeners were removed and go unobserved entirely. */ public close(code = 1000, reason?: string) { this._closeCalled = true; @@ -325,11 +344,17 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar this._debug("close enqueued: no ws instance"); return; } - if (this._ws.readyState === this.CLOSED) { - this._debug("close: already closed"); + if ( + this._ws.readyState === this.CLOSED || + this._ws.readyState === this.CLOSING + ) { + this._debug("close: already closing or closed"); return; } - this._ws.close(code, reason); + // _disconnect detaches the inner socket's listeners (so the real + // close event is not dispatched a second time later) and dispatches + // a synthetic close event synchronously. + this._disconnect(code, reason); } /** @@ -339,8 +364,15 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar public reconnect(code?: number, reason?: string) { this._shouldReconnect = true; this._closeCalled = false; + this._didWarnAboutClosedSend = false; this._retryCount = -1; - if (!this._ws || this._ws.readyState === this.CLOSED) { + if ( + !this._ws || + this._ws.readyState === this.CLOSED || + // CLOSING means a close was already initiated (and its close event + // already dispatched synthetically) — don't dispatch another one. + this._ws.readyState === this.CLOSING + ) { this._connect(); } else { this._disconnect(code, reason); @@ -349,20 +381,54 @@ export default class ReconnectingWebSocket extends (EventTarget as TypedEventTar } /** - * Enqueue specified data to be transmitted to the server over the WebSocket connection + * Enqueue specified data to be transmitted to the server over the WebSocket connection. + * + * @returns `true` if the message was transmitted immediately over an open + * connection; `false` if it was buffered (sent when the connection next + * opens — the buffer is always flushed before the `open` event is + * dispatched) or dropped because `maxEnqueuedMessages` was reached. */ - public send(data: Message) { + public send(data: Message): boolean { if (this._ws && this._ws.readyState === this.OPEN) { this._debug("send", data); this._ws.send(data); - } else { - const { maxEnqueuedMessages = DEFAULT.maxEnqueuedMessages } = - this._options; - if (this._messageQueue.length < maxEnqueuedMessages) { - this._debug("enqueue", data); - this._messageQueue.push(data); - } + return true; } + if (this._closeCalled && !this._didWarnAboutClosedSend) { + // After close() the socket will not reconnect on its own, so + // buffered messages are silently lost unless reconnect() is + // called later. Surface this instead of swallowing it — sends + // against a permanently closed socket usually indicate a stale + // reference bug in the caller. + this._didWarnAboutClosedSend = true; + console.warn( + "ReconnectingWebSocket: send() was called after close(). The message has been buffered, " + + "but it will only be delivered if reconnect() is called on this socket. " + + "If this socket has been discarded, the message is lost — this usually means " + + "a stale socket reference is being used." + ); + } + const { maxEnqueuedMessages = DEFAULT.maxEnqueuedMessages } = this._options; + if (this._messageQueue.length < maxEnqueuedMessages) { + this._debug("enqueue", data); + this._messageQueue.push(data); + } + return false; + } + + /** + * Removes and returns all messages that were passed to send() but never + * transmitted (they were buffered while the connection wasn't open). + * + * Useful when a socket is being discarded and replaced (e.g. the React + * hooks recreate the socket when connection options change): the + * replacement socket can re-send these messages, instead of them being + * silently lost with the old instance. + */ + public drainQueuedMessages(): Message[] { + const queue = this._messageQueue; + this._messageQueue = []; + return queue; } private _debug(...args: unknown[]) { @@ -622,6 +688,11 @@ const partysocket = new PartySocket({ this._ws.removeEventListener("message", this._handleMessage); // @ts-expect-error we need to fix event/listerner types this._ws.removeEventListener("error", this._handleError); + // The detached socket can still emit a late error (e.g. "WebSocket was + // closed before the connection was established" when it's closed while + // CONNECTING). We no longer care about this socket — absorb the event + // so it doesn't surface as an unhandled error. + this._ws.addEventListener("error", absorbError); } private _addListeners() { From fd85f7e6c1c81b1d80a9b457a117de0e24e05152 Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Fri, 12 Jun 2026 12:16:27 +0100 Subject: [PATCH 2/3] fix versions --- package-lock.json | 16 ++++++++-------- packages/hono-party/package.json | 2 +- packages/partyfn/package.json | 2 +- packages/partysub/package.json | 4 ++-- packages/partysync/package.json | 4 ++-- packages/partywhen/package.json | 2 +- packages/y-partyserver/package.json | 2 +- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/package-lock.json b/package-lock.json index 461eaf48..21404848 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11382,7 +11382,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", "hono": "^4.12.15", - "partyserver": "^0.5.5" + "partyserver": "^0.5.6" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20260424.1", @@ -11410,7 +11410,7 @@ "license": "ISC", "dependencies": { "nanoid": "^5.1.9", - "partysocket": "^1.1.18" + "partysocket": "^1.1.19" } }, "packages/partyfn/node_modules/nanoid": { @@ -11515,8 +11515,8 @@ "license": "ISC", "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", - "partyserver": "^0.5.5", - "partysocket": "^1.1.18" + "partyserver": "^0.5.6", + "partysocket": "^1.1.19" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20260424.1", @@ -11534,8 +11534,8 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", "partyfn": "^0.1.0", - "partyserver": "^0.5.5", - "partysocket": "^1.1.18" + "partyserver": "^0.5.6", + "partysocket": "^1.1.19" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20260424.1", @@ -11575,7 +11575,7 @@ "license": "ISC", "dependencies": { "cron-parser": "^5.5.0", - "partyserver": "^0.5.5" + "partyserver": "^0.5.6" } }, "packages/y-partyserver": { @@ -11591,7 +11591,7 @@ "@cloudflare/workers-types": "^4.20260426.1", "@types/lodash.debounce": "^4.0.9", "@types/node": "25.6.0", - "partyserver": "^0.5.5", + "partyserver": "^0.5.6", "ws": "^8.20.0", "yjs": "^13.6.30" }, diff --git a/packages/hono-party/package.json b/packages/hono-party/package.json index b85e97a8..bf1746ad 100644 --- a/packages/hono-party/package.json +++ b/packages/hono-party/package.json @@ -37,6 +37,6 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", "hono": "^4.12.15", - "partyserver": "^0.5.5" + "partyserver": "^0.5.6" } } diff --git a/packages/partyfn/package.json b/packages/partyfn/package.json index 7a38bb5d..2228d895 100644 --- a/packages/partyfn/package.json +++ b/packages/partyfn/package.json @@ -19,7 +19,7 @@ ], "dependencies": { "nanoid": "^5.1.9", - "partysocket": "^1.1.18" + "partysocket": "^1.1.19" }, "scripts": { "build": "tsx scripts/build.ts" diff --git a/packages/partysub/package.json b/packages/partysub/package.json index 26905362..a52508b9 100644 --- a/packages/partysub/package.json +++ b/packages/partysub/package.json @@ -46,7 +46,7 @@ }, "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", - "partyserver": "^0.5.5", - "partysocket": "^1.1.18" + "partyserver": "^0.5.6", + "partysocket": "^1.1.19" } } diff --git a/packages/partysync/package.json b/packages/partysync/package.json index a2cf2c51..96053c18 100644 --- a/packages/partysync/package.json +++ b/packages/partysync/package.json @@ -51,8 +51,8 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260426.1", "partyfn": "^0.1.0", - "partyserver": "^0.5.5", - "partysocket": "^1.1.18" + "partyserver": "^0.5.6", + "partysocket": "^1.1.19" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20260424.1", diff --git a/packages/partywhen/package.json b/packages/partywhen/package.json index 9e1099cb..b10be359 100644 --- a/packages/partywhen/package.json +++ b/packages/partywhen/package.json @@ -29,6 +29,6 @@ "description": "A library for scheduling and running tasks in Cloudflare Workers", "dependencies": { "cron-parser": "^5.5.0", - "partyserver": "^0.5.5" + "partyserver": "^0.5.6" } } diff --git a/packages/y-partyserver/package.json b/packages/y-partyserver/package.json index 45f863e7..10234118 100644 --- a/packages/y-partyserver/package.json +++ b/packages/y-partyserver/package.json @@ -65,7 +65,7 @@ "@cloudflare/workers-types": "^4.20260426.1", "@types/lodash.debounce": "^4.0.9", "@types/node": "25.6.0", - "partyserver": "^0.5.5", + "partyserver": "^0.5.6", "ws": "^8.20.0", "yjs": "^13.6.30" }, From 4e99943c00f4a3814153790e2c1be73b4bb23564 Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Fri, 12 Jun 2026 12:18:23 +0100 Subject: [PATCH 3/3] format --- packages/partyserver/CHANGELOG.md | 7 ------- packages/partysocket/CHANGELOG.md | 2 -- 2 files changed, 9 deletions(-) diff --git a/packages/partyserver/CHANGELOG.md b/packages/partyserver/CHANGELOG.md index a8328462..29305a94 100644 --- a/packages/partyserver/CHANGELOG.md +++ b/packages/partyserver/CHANGELOG.md @@ -31,7 +31,6 @@ The fix is at the call site, not in PartyServer: pass `id: someBoundDONamespace.idFromName(facetName)` to `ctx.facets.get(...)`. The facet then gets its own native `ctx.id.name === facetName` and PartyServer's `name` getter does the right thing automatically. No `setName()` is required, no `__ps_name` storage record is written, and cold-wake recovery happens for free because the factory re-runs and `idFromName` is deterministic. This release adds: - - **A "Using PartyServer with Durable Object Facets" section in the README** that walks through the recommended pattern with a code example, calls out the implicit-id footgun explicitly, and documents that plain-string `id` values are not a substitute for `idFromName(facetName)` (workerd treats string ids as `idFromString`-like, so the resulting facet has no `ctx.id.name`). - **`setName()` docstring updated** to clarify that facets are NOT a `setName()` use case — point to the explicit-`id` pattern instead. The original `setName()` `ctx.id.name` mismatch throw is preserved as a typo guard for the `idFromName` happy path. - **End-to-end facet test coverage** against the real workerd `ctx.facets.get(...)` API. A `FacetParent` / `FacetChild` fixture exercises both the implicit-id path (pinning the runtime contract that `this.name` returns the parent's name in that flow — i.e., behavior-as-documentation so framework authors are unsurprised) and the explicit-id path (recommended; verifies that all reasonable id-construction strategies work and that cold wake recovers without any storage record). Plain-string `id` is also tested; the test asserts it does NOT carry a name, pinning the contract so callers don't get tempted by the type signature. @@ -60,7 +59,6 @@ ``` Backward compatible: - - For DOs addressed via `idFromName()` / `getByName()` (the happy path), `setName()` continues to NOT write storage — `ctx.id.name` is the source of truth and `setName()` is just a no-op-plus-onStart. - The pre-existing direct-storage-write pattern keeps working — the storage write becomes idempotent with what `setName()` would do. @@ -75,7 +73,6 @@ 0.5.0 moved the legacy storage hydrate into `alarm()` only, breaking Cloudflare Agents facets and any other framework that writes `__ps_name` directly before calling `__unsafe_ensureInitialized()`. Facet DOs are spawned via `ctx.facets.get(...)` rather than `idFromName()` and therefore have `ctx.id.name === undefined`; they relied on PartyServer reading the storage record back to populate `this.name` before `onStart()`. Changes: - - Move the legacy `__ps_name` hydrate from `alarm()` into `#ensureInitialized()`, still gated on `!ctx.id.name && !#_name` so it costs nothing on the happy path (normal `idFromName()`/`getByName()` DOs skip the storage read entirely). - `Server.fetch()` now delegates to `#ensureInitialized()` for the hydrate instead of doing its own. The `x-partykit-room` header fallback remains as a last resort when neither `ctx.id.name` nor a legacy storage record is available. - `Server.alarm()` is simplified — it no longer needs its own hydrate call since `#ensureInitialized()` handles it. @@ -90,7 +87,6 @@ Durable Objects now expose `ctx.id.name` on every entry point (constructor, fetch, alarm, hibernating websocket handlers) when the DO is addressed via `idFromName()`/`getByName()`. PartyServer now uses this as the primary source of `this.name`, which simplifies routing, eliminates storage writes, and makes `this.name` available inside the constructor. Changes in `partyserver`: - - `this.name` resolves from `this.ctx.id.name`. The apologetic `workerd#2240` error message is gone. - `this.name` is now available **inside the constructor** and from class field initializers, not just after `setName()`/`fetch()` has run. - `routePartykitRequest` no longer issues a `setName()`/`_initAndFetch()` RPC before `fetch()`. The WebSocket path goes from 2 RPCs to 1; the HTTP path remains 1 RPC. Props, when supplied, are delivered to the DO via the `x-partykit-props` request header, set after `onBeforeConnect`/`onBeforeRequest` hooks run. @@ -102,7 +98,6 @@ - When reading `this.name` throws, it is because `ctx.id.name` is undefined and no legacy fallback has populated the name: the DO was addressed via `idFromString()` or `newUniqueId()` (both unsupported), the runtime is too old to expose `ctx.id.name`, or a pre-2026-03-15 alarm fired before the legacy storage fallback ran. Changes in all affected packages (`partyserver`, `partysub`, `partysync`, `y-partyserver`, `hono-party`): - - `@cloudflare/workers-types` peer dependency bumped from `^4.20240729.0` to `^4.20260424.1`. The old range predates `ctx.id.name` in the type surface. Not supported: addressing PartyServer DOs via `idFromString()` or `newUniqueId()`. These paths return `ctx.id.name === undefined` inside the DO and will surface as a clear error from `this.name`. PartyServer has always assumed name-based addressing via `getServerByName` / `routePartykitRequest`; this release makes that assumption explicit. @@ -423,14 +418,12 @@ ### Patch Changes - [`528adea`](https://github.com/threepointone/partyserver/commit/528adeaced6dce6e888d2f54cc75c3569bf2c277) Thanks [@threepointone](https://github.com/threepointone)! - some fixes and tweaks - - getServerByName was throwing on all requests - `Env` is now an optional arg when defining `Server` - `y-partyserver/provider` can now take an optional `prefix` arg to use a custom url to connect - `routePartyKitRequest`/`getServerByName` now accepts `jurisdiction` bonus: - - added a bunch of fixtures - added stubs for docs diff --git a/packages/partysocket/CHANGELOG.md b/packages/partysocket/CHANGELOG.md index 353fc10e..847360eb 100644 --- a/packages/partysocket/CHANGELOG.md +++ b/packages/partysocket/CHANGELOG.md @@ -337,7 +337,6 @@ ### Patch Changes - [#251](https://github.com/partykit/partykit/pull/251) [`049bcac`](https://github.com/partykit/partykit/commit/049bcac42aa49e4bddec975c63b7d7984112e450) Thanks [@threepointone](https://github.com/threepointone)! - small tweaks to `init` - - replace `process.env.PARTYKIT_HOST` with just `PARTYKIT_HOST` - add a `tsconfig.json` - add partykit to devDependencies in `init` @@ -358,7 +357,6 @@ - [#211](https://github.com/partykit/partykit/pull/211) [`fffe721`](https://github.com/partykit/partykit/commit/fffe72148e5cc425e80c90b6bf180192df410080) Thanks [@threepointone](https://github.com/threepointone)! - update dependencies - [#191](https://github.com/partykit/partykit/pull/191) [`39cf5ce`](https://github.com/partykit/partykit/commit/39cf5cebf5e699bc50ace8b6d25cd82c807e863a) Thanks [@jevakallio](https://github.com/jevakallio)! - Improve PartySocket types and React hooks API: - - Add websocket lifecycle event handlers to usePartyKit options to reduce need for effects in userland - Allow usePartySocket to provide startClosed option to initialize without opening connection - Fix types for PartySocket#removeEventListener