Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/mcp/client/stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import logging
import os
import subprocess
import sys
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager, suppress
Expand Down Expand Up @@ -181,6 +182,21 @@ async def stdin_writer() -> None:
finally:
writer_done.set()

async def stderr_reader() -> None:
stderr = getattr(process, "stderr", None)
if stderr is None:
return

try:
async for chunk in TextReceiveStream(
stderr, encoding=server.encoding, errors=server.encoding_error_handler
):
errlog.write(chunk)
errlog.flush()
except (anyio.ClosedResourceError, anyio.BrokenResourceError, ConnectionError, OSError):
if not shutting_down: # pragma: no branch
logger.exception("Reading from the MCP server's stderr failed mid-session")

async def shutdown() -> None:
"""Winds the transport down: stop traffic, flush, stop the server, release the streams."""
# Unblock the reader into its drain: a server stuck writing stdout cannot
Expand All @@ -200,6 +216,7 @@ async def shutdown() -> None:
async with anyio.create_task_group() as tg:
tg.start_soon(stdout_reader)
tg.start_soon(stdin_writer)
tg.start_soon(stderr_reader)
try:
yield read_stream, write_stream
finally:
Expand Down Expand Up @@ -264,6 +281,9 @@ async def _stop_server_process(process: ServerProcess) -> None:
close_process_job(process)
# A kill survivor can hold the stdout pipe open; poison the reader anyway.
await _close_pipe(process.stdout)
stderr = getattr(process, "stderr", None)
if stderr is not None:
await _close_pipe(stderr)
_close_subprocess_transport(process)


Expand Down Expand Up @@ -342,7 +362,7 @@ async def _create_platform_compatible_process(
return await anyio.open_process(
[command, *args],
env=env,
stderr=errlog,
stderr=subprocess.PIPE,
cwd=cwd,
start_new_session=True,
)
Expand Down
62 changes: 61 additions & 1 deletion tests/client/test_stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import errno
import gc
import io
import logging
import math
import os
Expand All @@ -25,7 +26,7 @@
import pytest
import trio
import trio.testing
from anyio.streams.memory import MemoryObjectReceiveStream
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

from mcp.client import stdio
from mcp.client._transport import ReadStream
Expand Down Expand Up @@ -136,7 +137,9 @@ def __init__(
stdin_send_gate: anyio.Event | None = None,
stdout_eof_error: Exception | None = None,
stdout_aclose_error: Exception | None = None,
stderr_eof_error: Exception | None = None,
on_stdout_receive: Callable[[], None] | None = None,
with_stderr: bool = False,
) -> None:
self._stdout_send, stdout_receive = anyio.create_memory_object_stream[bytes](math.inf)
self.stdout = _FakeStdout(
Expand All @@ -145,6 +148,16 @@ def __init__(
aclose_error=stdout_aclose_error,
on_receive=self._dispatch_stdout_receive,
)
self._stderr_send: MemoryObjectSendStream[bytes] | None = None
self.stderr: _FakeStdout | None = None
if with_stderr:
self._stderr_send, stderr_receive = anyio.create_memory_object_stream[bytes](math.inf)
self.stderr = _FakeStdout(
stderr_receive,
eof_error=stderr_eof_error,
aclose_error=None,
on_receive=lambda: None,
)
self.pid = 424242
self.written: list[bytes] = []
self.stdin_closed = anyio.Event()
Expand All @@ -170,10 +183,21 @@ def close_stdout(self) -> None:
"""End the fake process's stdout, as the kernel does when it dies."""
self._stdout_send.close()

async def feed_stderr(self, data: bytes) -> None:
"""Make `data` readable on the fake process's stderr."""
assert self._stderr_send is not None
await self._stderr_send.send(data)

def close_stderr(self) -> None:
"""End the fake process's stderr, as the kernel does when it dies."""
if self._stderr_send is not None:
self._stderr_send.close()

def exit(self, code: int = 0) -> None:
"""Die: set the exit code and EOF stdout, as the kernel does."""
self.returncode = code
self.close_stdout()
self.close_stderr()

def pending_stdout_chunks(self) -> int:
"""How many fed chunks the client has not yet pulled off the fake stdout."""
Expand Down Expand Up @@ -225,6 +249,42 @@ async def _next_message(read_stream: ReadStream[SessionMessage | Exception]) ->
return received.message


@pytest.mark.anyio
async def test_server_stderr_is_forwarded_to_errlog(monkeypatch: pytest.MonkeyPatch) -> None:
"""Server stderr is piped and forwarded, so notebook-like errlogs still see it."""
process = FakeProcess(with_stderr=True)
install_fake_process(monkeypatch, process)
errlog = io.StringIO()

async with stdio_client(FAKE_PARAMS, errlog=errlog):
await process.feed_stderr(b"starting server\n")
with anyio.fail_after(1):
while errlog.getvalue() != "starting server\n":
await anyio.sleep(0)
process.exit()


@pytest.mark.anyio
async def test_mid_session_stderr_failure_is_logged(
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
) -> None:
"""A broken stderr pipe is logged, but does not escape the client context."""
process = FakeProcess(
on_stdin_close=lambda: process.exit(0),
stderr_eof_error=ConnectionError("stderr pipe failed"),
with_stderr=True,
)
install_fake_process(monkeypatch, process)

with anyio.fail_after(5):
async with stdio_client(FAKE_PARAMS):
process.close_stderr()
with anyio.fail_after(1): # pragma: no branch
while "stderr failed mid-session" not in caplog.text:
await anyio.sleep(0)


@pytest.mark.anyio
async def test_messages_split_and_packed_across_chunks_are_reframed(monkeypatch: pytest.MonkeyPatch) -> None:
"""Framing survives arbitrary chunk boundaries.
Expand Down
Loading