From e1c742a31fdc793e2ae99d9e71a1c07278d5645d Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 19 May 2026 12:35:22 +0100 Subject: [PATCH] Close connection on responseTo mismatch to prevent pool reuse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When ReplyMessage detects that the responseTo field in a server response does not match the expected requestId, close the underlying connection. Previously, MongoInternalException propagated as a regular error without closing the stream. Since PooledConnection.close() only prunes connections that are closed or match shouldPrune() criteria (generation, idle time, lifetime), the desynchronized connection was returned to the pool and could be handed to subsequent operations — risking further data corruption. A responseTo mismatch means the TCP stream is definitively desynchronized. The connection is unusable and must not be recycled. JAVA-6210 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../connection/InternalStreamConnection.java | 10 ++++ ...ternalStreamConnectionSpecification.groovy | 50 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 0cad654a73a..7188c9a4707 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -583,6 +583,12 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm } return commandResult; + } catch (MongoInternalException e) { + if (!commandSuccessful) { + commandEventSender.sendFailedEvent(e); + } + close(); + throw e; } catch (Exception e) { if (!commandSuccessful) { commandEventSender.sendFailedEvent(e); @@ -746,6 +752,10 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d commandEventSender.sendSucceededEvent(responseBuffers); commandResult = getCommandResult(decoder, responseBuffers, messageId, operationContext.getTimeoutContext()); + } catch (MongoInternalException localThrowable) { + close(); + callback.onResult(null, localThrowable); + return; } catch (Throwable localThrowable) { callback.onResult(null, localThrowable); return; diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy index 3cdabf31da3..9a68e91b646 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy @@ -685,6 +685,56 @@ class InternalStreamConnectionSpecification extends Specification { !connection.isClosed() } + def 'should close the connection on a responseTo mismatch'() { + given: + def connection = getOpenedConnection() + def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) + def commandMessage = new CommandMessage(database, pingCommandDocument, fieldNameValidator, primary(), messageSettings, MULTIPLE, + null) + def response = '{ok : 1}' + stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } + // responseTo mismatches the actual requestId, simulating stream desynchronization + stream.read(16, _) >> helper.messageHeader(commandMessage.getId() + 1, response) + stream.read(_, _) >> helper.reply(response) + + when: + connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), OPERATION_CONTEXT) + + then: + thrown(MongoInternalException) + connection.isClosed() + } + + def 'should close the connection on an asynchronous responseTo mismatch'() { + given: + def connection = getOpenedConnection() + def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) + def commandMessage = new CommandMessage(database, pingCommandDocument, fieldNameValidator, primary(), messageSettings, MULTIPLE, + null) + def callback = new FutureResultCallback() + def response = '{ok : 1}' + + stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } + stream.writeAsync(_, _, _) >> { buffers, operationContext, handler -> + handler.completed(null) + } + stream.readAsync(16, _, _) >> { numBytes, operationContext, handler -> + // responseTo mismatches the actual requestId, simulating stream desynchronization + handler.completed(helper.defaultMessageHeader(commandMessage.getId() + 1)) + } + stream.readAsync(_, _, _) >> { numBytes, operationContext, handler -> + handler.completed(helper.defaultReply()) + } + + when: + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), OPERATION_CONTEXT, callback) + callback.get() + + then: + thrown(MongoInternalException) + connection.isClosed() + } + def 'should notify all asynchronous writers of an exception'() { given: int numberOfOperations = 3