diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 0cad654a73..7188c9a470 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -583,6 +583,12 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm } return commandResult; + } catch (MongoInternalException e) { + if (!commandSuccessful) { + commandEventSender.sendFailedEvent(e); + } + close(); + throw e; } catch (Exception e) { if (!commandSuccessful) { commandEventSender.sendFailedEvent(e); @@ -746,6 +752,10 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d commandEventSender.sendSucceededEvent(responseBuffers); commandResult = getCommandResult(decoder, responseBuffers, messageId, operationContext.getTimeoutContext()); + } catch (MongoInternalException localThrowable) { + close(); + callback.onResult(null, localThrowable); + return; } catch (Throwable localThrowable) { callback.onResult(null, localThrowable); return; diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy index 3cdabf31da..9a68e91b64 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy @@ -685,6 +685,56 @@ class InternalStreamConnectionSpecification extends Specification { !connection.isClosed() } + def 'should close the connection on a responseTo mismatch'() { + given: + def connection = getOpenedConnection() + def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) + def commandMessage = new CommandMessage(database, pingCommandDocument, fieldNameValidator, primary(), messageSettings, MULTIPLE, + null) + def response = '{ok : 1}' + stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } + // responseTo mismatches the actual requestId, simulating stream desynchronization + stream.read(16, _) >> helper.messageHeader(commandMessage.getId() + 1, response) + stream.read(_, _) >> helper.reply(response) + + when: + connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), OPERATION_CONTEXT) + + then: + thrown(MongoInternalException) + connection.isClosed() + } + + def 'should close the connection on an asynchronous responseTo mismatch'() { + given: + def connection = getOpenedConnection() + def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) + def commandMessage = new CommandMessage(database, pingCommandDocument, fieldNameValidator, primary(), messageSettings, MULTIPLE, + null) + def callback = new FutureResultCallback() + def response = '{ok : 1}' + + stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } + stream.writeAsync(_, _, _) >> { buffers, operationContext, handler -> + handler.completed(null) + } + stream.readAsync(16, _, _) >> { numBytes, operationContext, handler -> + // responseTo mismatches the actual requestId, simulating stream desynchronization + handler.completed(helper.defaultMessageHeader(commandMessage.getId() + 1)) + } + stream.readAsync(_, _, _) >> { numBytes, operationContext, handler -> + handler.completed(helper.defaultReply()) + } + + when: + connection.sendAndReceiveAsync(commandMessage, new BsonDocumentCodec(), OPERATION_CONTEXT, callback) + callback.get() + + then: + thrown(MongoInternalException) + connection.isClosed() + } + def 'should notify all asynchronous writers of an exception'() { given: int numberOfOperations = 3