From 0984bcaa61b12d52bcea5695b27ac7b6065ba53e Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 18 May 2026 19:16:57 +0200 Subject: [PATCH 1/2] http2: emit session close before stream close PR-URL: https://github.com/nodejs/node/pull/63414 Signed-off-by: Matteo Collina --- lib/internal/http2/core.js | 37 ++++++++++--- ...lient-session-close-before-stream-close.js | 54 +++++++++++++++++++ 2 files changed, 84 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-http2-client-session-close-before-stream-close.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 273ddd15414b51..a4b4afe1e3a13c 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -343,6 +343,8 @@ const SESSION_FLAGS_PENDING = 0x0; const SESSION_FLAGS_READY = 0x1; const SESSION_FLAGS_CLOSED = 0x2; const SESSION_FLAGS_DESTROYED = 0x4; +const SESSION_FLAGS_CLOSE_QUEUED = 0x8; +const SESSION_FLAGS_CLOSE_EMITTED = 0x10; // Top level to avoid creating a closure function emit(self, ...args) { @@ -1153,14 +1155,29 @@ function setupHandle(socket, type, options) { process.nextTick(emit, this, 'connect', this, socket); } -// Emits a close event followed by an error event if err is truthy. Used +// Emits an error event followed by a close event if err is truthy. Used // by Http2Session.prototype.destroy() function emitClose(self, error) { + const state = self[kState]; + if (state.flags & SESSION_FLAGS_CLOSE_EMITTED) + return; + + state.flags |= SESSION_FLAGS_CLOSE_EMITTED; + if (error) self.emit('error', error); self.emit('close'); } +function emitCloseNextTick(self, error) { + const state = self[kState]; + if (state.flags & (SESSION_FLAGS_CLOSE_QUEUED | SESSION_FLAGS_CLOSE_EMITTED)) + return; + + state.flags |= SESSION_FLAGS_CLOSE_QUEUED; + process.nextTick(emitClose, self, error); +} + function cleanupSession(session) { const socket = session[kSocket]; const handle = session[kHandle]; @@ -1209,7 +1226,7 @@ function finishSessionClose(session, error) { } }); } else { - process.nextTick(emitClose, session, error); + emitCloseNextTick(session, error); } } @@ -1224,6 +1241,16 @@ function closeSession(session, code, error) { session.setTimeout(0); session.removeAllListeners('timeout'); + const socket = session[kSocket]; + const handle = session[kHandle]; + const socketDestroyed = socket?.destroyed === true; + + // If the transport has already closed, queue the session close event before + // stream callbacks are scheduled so clients can invalidate cached sessions + // before associated streams finish closing. + if (socketDestroyed) + emitCloseNextTick(session, error); + // Destroy any pending and open streams if (state.pendingStreams.size > 0 || state.streams.size > 0) { const cancel = new ERR_HTTP2_STREAM_CANCEL(error); @@ -1231,14 +1258,10 @@ function closeSession(session, code, error) { state.streams.forEach((stream) => stream.destroy(error)); } - // Disassociate from the socket and server. - const socket = session[kSocket]; - const handle = session[kHandle]; - // Destroy the handle if it exists at this point. if (handle !== undefined) { handle.ondone = finishSessionClose.bind(null, session, error); - handle.destroy(code, socket.destroyed); + handle.destroy(code, socketDestroyed); } else { finishSessionClose(session, error); } diff --git a/test/parallel/test-http2-client-session-close-before-stream-close.js b/test/parallel/test-http2-client-session-close-before-stream-close.js new file mode 100644 index 00000000000000..35deb385551713 --- /dev/null +++ b/test/parallel/test-http2-client-session-close-before-stream-close.js @@ -0,0 +1,54 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +const assert = require('assert'); +const http2 = require('http2'); + +const server = http2.createServer(); +let serverSocket; + +server.on('connection', common.mustCall((socket) => { + serverSocket = socket; + socket.on('error', () => {}); +})); + +server.on('sessionError', () => {}); +server.on('stream', common.mustCall((stream, headers) => { + if (headers[':path'] === '/close') { + stream.respond({ ':status': 200 }); + stream.write('partial', common.mustCall(() => { + setImmediate(() => serverSocket.destroy()); + })); + return; + } + + stream.respond({ ':status': 200 }); + stream.end('ok'); +})); + +server.listen(0, common.mustCall(() => { + const session = http2.connect(`http://localhost:${server.address().port}`); + let cachedSession = session; + const events = []; + + session.on('error', common.mustNotCall()); + session.on('close', common.mustCall(() => { + events.push('session-close'); + cachedSession = undefined; + server.close(); + })); + + const req = session.request({ ':path': '/close' }); + req.on('response', common.mustCall()); + req.on('close', common.mustCall(() => { + events.push('stream-close'); + assert.strictEqual(session.closed, true); + assert.strictEqual(session.destroyed, true); + assert.strictEqual(cachedSession, undefined); + assert.deepStrictEqual(events, ['session-close', 'stream-close']); + })); + req.resume(); +})); From 5fc94f750570ffcc3d53e687cd8b0355e8007eae Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sun, 24 May 2026 13:32:10 +0200 Subject: [PATCH 2/2] http2: make request() invalid-state errors async PR-URL: https://github.com/nodejs/node/pull/63414 Signed-off-by: Matteo Collina --- doc/api/http2.md | 8 +- lib/internal/http2/core.js | 73 ++++++++----------- test/parallel/test-http2-client-destroy.js | 15 +++- ...lient-session-close-before-stream-close.js | 21 ++++-- 4 files changed, 62 insertions(+), 55 deletions(-) diff --git a/doc/api/http2.md b/doc/api/http2.md index b179743d85a121..3f810168e6ff85 100644 --- a/doc/api/http2.md +++ b/doc/api/http2.md @@ -1113,10 +1113,12 @@ creates and returns an `Http2Stream` instance that can be used to send an HTTP/2 request to the connected server. When a `ClientHttp2Session` is first created, the socket may not yet be -connected. if `clienthttp2session.request()` is called during this time, the +connected. If `clienthttp2session.request()` is called during this time, the actual request will be deferred until the socket is ready to go. -If the `session` is closed before the actual request be executed, an -`ERR_HTTP2_GOAWAY_SESSION` is thrown. + +If the session becomes unavailable before the request can be created, the +returned stream will emit `ERR_HTTP2_GOAWAY_SESSION` or +`ERR_HTTP2_INVALID_SESSION` asynchronously. This method is only available if `http2session.type` is equal to `http2.constants.NGHTTP2_SESSION_CLIENT`. diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index a4b4afe1e3a13c..1c6edd65cae8f0 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -343,8 +343,6 @@ const SESSION_FLAGS_PENDING = 0x0; const SESSION_FLAGS_READY = 0x1; const SESSION_FLAGS_CLOSED = 0x2; const SESSION_FLAGS_DESTROYED = 0x4; -const SESSION_FLAGS_CLOSE_QUEUED = 0x8; -const SESSION_FLAGS_CLOSE_EMITTED = 0x10; // Top level to avoid creating a closure function emit(self, ...args) { @@ -840,6 +838,10 @@ function requestOnConnect(headersList, options) { } } +function requestOnError(error) { + this.destroy(error); +} + // Validates that priority options are correct, specifically: // 1. options.weight must be a number // 2. options.parent must be a positive number @@ -1158,26 +1160,11 @@ function setupHandle(socket, type, options) { // Emits an error event followed by a close event if err is truthy. Used // by Http2Session.prototype.destroy() function emitClose(self, error) { - const state = self[kState]; - if (state.flags & SESSION_FLAGS_CLOSE_EMITTED) - return; - - state.flags |= SESSION_FLAGS_CLOSE_EMITTED; - if (error) self.emit('error', error); self.emit('close'); } -function emitCloseNextTick(self, error) { - const state = self[kState]; - if (state.flags & (SESSION_FLAGS_CLOSE_QUEUED | SESSION_FLAGS_CLOSE_EMITTED)) - return; - - state.flags |= SESSION_FLAGS_CLOSE_QUEUED; - process.nextTick(emitClose, self, error); -} - function cleanupSession(session) { const socket = session[kSocket]; const handle = session[kHandle]; @@ -1226,7 +1213,7 @@ function finishSessionClose(session, error) { } }); } else { - emitCloseNextTick(session, error); + process.nextTick(emitClose, session, error); } } @@ -1243,13 +1230,6 @@ function closeSession(session, code, error) { const socket = session[kSocket]; const handle = session[kHandle]; - const socketDestroyed = socket?.destroyed === true; - - // If the transport has already closed, queue the session close event before - // stream callbacks are scheduled so clients can invalidate cached sessions - // before associated streams finish closing. - if (socketDestroyed) - emitCloseNextTick(session, error); // Destroy any pending and open streams if (state.pendingStreams.size > 0 || state.streams.size > 0) { @@ -1261,7 +1241,7 @@ function closeSession(session, code, error) { // Destroy the handle if it exists at this point. if (handle !== undefined) { handle.ondone = finishSessionClose.bind(null, session, error); - handle.destroy(code, socketDestroyed); + handle.destroy(code, socket.destroyed); } else { finishSessionClose(session, error); } @@ -1832,11 +1812,15 @@ class ClientHttp2Session extends Http2Session { request(headersParam, options) { debugSessionObj(this, 'initiating request'); - if (this.destroyed) - throw new ERR_HTTP2_INVALID_SESSION(); - - if (this.closed) - throw new ERR_HTTP2_GOAWAY_SESSION(); + // Keep argument validation synchronous, but defer session-state failures + // to the returned stream so request retries from stream callbacks do not + // throw before session lifecycle handlers run. + let requestError; + if (this.destroyed) { + requestError = new ERR_HTTP2_INVALID_SESSION(); + } else if (this.closed) { + requestError = new ERR_HTTP2_GOAWAY_SESSION(); + } this[kUpdateTimer](); @@ -1922,19 +1906,24 @@ class ClientHttp2Session extends Http2Session { } } - const onConnect = reqAsync.bind(requestOnConnect.bind(stream, headersList, options)); - if (this.connecting) { - if (this[kPendingRequestCalls] !== null) { - this[kPendingRequestCalls].push(onConnect); + if (requestError) { + process.nextTick(reqAsync.bind(requestOnError.bind(stream, requestError))); + } else { + const onConnect = reqAsync.bind( + requestOnConnect.bind(stream, headersList, options)); + if (this.connecting) { + if (this[kPendingRequestCalls] !== null) { + this[kPendingRequestCalls].push(onConnect); + } else { + this[kPendingRequestCalls] = [onConnect]; + this.once('connect', () => { + this[kPendingRequestCalls].forEach((f) => f()); + this[kPendingRequestCalls] = null; + }); + } } else { - this[kPendingRequestCalls] = [onConnect]; - this.once('connect', () => { - this[kPendingRequestCalls].forEach((f) => f()); - this[kPendingRequestCalls] = null; - }); + onConnect(); } - } else { - onConnect(); } if (onClientStreamCreatedChannel.hasSubscribers) { diff --git a/test/parallel/test-http2-client-destroy.js b/test/parallel/test-http2-client-destroy.js index ff98c23e864f74..7034f98abb6836 100644 --- a/test/parallel/test-http2-client-destroy.js +++ b/test/parallel/test-http2-client-destroy.js @@ -81,7 +81,19 @@ const { listenerCount } = require('events'); assert.throws(() => client.ping(), sessionError); assert.throws(() => client.settings({}), sessionError); assert.throws(() => client.goaway(), sessionError); - assert.throws(() => client.request(), sessionError); + + const pendingReq = client.request(); + pendingReq.on('response', common.mustNotCall()); + pendingReq.on('error', common.expectsError(sessionError)); + pendingReq.on('close', common.mustCall()); + + client.on('close', common.mustCall(() => { + const postCloseReq = client.request(); + postCloseReq.on('response', common.mustNotCall()); + postCloseReq.on('error', common.expectsError(sessionError)); + postCloseReq.on('close', common.mustCall()); + })); + client.close(); // Should be a non-op at this point // Wait for setImmediate call from destroy() to complete @@ -92,7 +104,6 @@ const { listenerCount } = require('events'); assert.throws(() => client.ping(), sessionError); assert.throws(() => client.settings({}), sessionError); assert.throws(() => client.goaway(), sessionError); - assert.throws(() => client.request(), sessionError); client.close(); // Should be a non-op at this point })); diff --git a/test/parallel/test-http2-client-session-close-before-stream-close.js b/test/parallel/test-http2-client-session-close-before-stream-close.js index 35deb385551713..5ebcd8522dfb41 100644 --- a/test/parallel/test-http2-client-session-close-before-stream-close.js +++ b/test/parallel/test-http2-client-session-close-before-stream-close.js @@ -32,23 +32,28 @@ server.on('stream', common.mustCall((stream, headers) => { server.listen(0, common.mustCall(() => { const session = http2.connect(`http://localhost:${server.address().port}`); let cachedSession = session; - const events = []; - session.on('error', common.mustNotCall()); + session.on('error', () => {}); session.on('close', common.mustCall(() => { - events.push('session-close'); cachedSession = undefined; server.close(); })); const req = session.request({ ':path': '/close' }); req.on('response', common.mustCall()); + req.on('error', () => {}); req.on('close', common.mustCall(() => { - events.push('stream-close'); - assert.strictEqual(session.closed, true); - assert.strictEqual(session.destroyed, true); - assert.strictEqual(cachedSession, undefined); - assert.deepStrictEqual(events, ['session-close', 'stream-close']); + // This must not throw synchronously even though the session is no longer + // usable. Depending on teardown timing, the returned stream may report a + // closed session before the destroy state is fully observable here. + const req2 = session.request({ ':path': '/again' }); + + req2.on('error', common.mustCall((err) => { + assert.ok( + err.code === 'ERR_HTTP2_INVALID_SESSION' || + err.code === 'ERR_HTTP2_GOAWAY_SESSION'); + assert.strictEqual(cachedSession, undefined); + })); })); req.resume(); }));