refactor(proxy): split monolithic proxy into modular serve/proxy package #4647

Open
lvhan028 wants to merge 10 commits into InternLM:main from lvhan028:refactor-proxy-v2

@lvhan028 commented Jun 4, 2026

LMDeploy Proxy Server Refactor

Motivation

The legacy proxy lived in a single ~950-line module (lmdeploy/serve/proxy/proxy.py) that mixed HTTP routing, replica registry, health checking, load balancing, request forwarding, DistServe orchestration, and error handling. That layout made the code hard to test, extend, and reason about.

Concrete problems on main included:

  • Monolithic structure: no clear boundaries between admin APIs, OpenAI inference APIs, routing, and upstream I/O.
  • Opaque error semantics: the inference path returned proxy-specific numeric error codes (e.g. 10400/10401/10402) instead of standard HTTP status codes and OpenAI-style error bodies.
  • Weak registration guarantees: api_server could register with the proxy before its HTTP listener was ready, and /nodes/add did not validate replicas by probing /v1/models.
  • Misaligned health checks: eviction logic did not consistently follow api_server /health JSON semantics (healthy, sleeping, initializing, etc.).
  • Per-request HTTP clients: upstream forwarding did not share a connection pool, increasing latency and socket churn under load.
  • No client-disconnect propagation: when a downstream client dropped during inference, the proxy could keep the upstream api_server connection open and leave inflight counters elevated.
  • Limited test coverage: almost no focused unit tests for registry, routing, health, or forwarding edge cases.

The refactor preserves existing user-facing CLI entry points (lmdeploy serve proxy) and the external API surface (/v1/*, /nodes/*, DistServe admin routes) while improving modularity, HTTP correctness, and operability.

Design

Package layout

The proxy is split into a small package under lmdeploy/serve/proxy/:

serve/proxy/
├── app.py                 # FastAPI app, middleware, shared aiohttp session (lifespan)
├── cli.py                 # uvicorn entry
├── runtime.py             # Dependency wiring (ProxyRuntime)
├── core/
│   ├── config.py          # ProxyConfig, RoutingStrategy
│   └── replica.py         # ReplicaLoad, ReplicaRegistration
├── endpoint/
│   ├── admin.py           # /nodes/*
│   ├── openai.py          # /v1/models, /v1/chat/completions, /v1/completions
│   └── distserve.py       # /distserve/*
├── registry/
│   ├── pool.py            # ReplicaPool (in-memory registry + inflight counters)
│   └── health_checker.py  # Background /health probes + eviction
├── routing/
│   └── selector.py        # Replica selection (random / latency-based)
├── dispatch/
│   ├── base.py            # ProxyContext, shared response helpers
│   ├── hybrid.py          # Single-replica pass-through
│   └── distserve.py       # Prefill → decode disaggregated path
├── upstream/
│   ├── forwarder.py       # aiohttp forwarding (buffer + stream generators)
│   ├── disconnect.py      # Client-disconnect racing helpers
│   └── exceptions.py      # APIServerException (upstream error carrier)
├── metrics/
│   └── load_tracker.py    # InflightTracker → pool inflight_start/finish
└── streaming_response.py  # ProxyStreamingResponse (stream errors + disconnect)

Runtime wiring

ProxyRuntime is constructed once per process inside the FastAPI lifespan and stored on app.state.runtime. It owns:

| Component | Responsibility |
| --- | --- |
| ReplicaPool | Thread-safe replica registry, model list aggregation, inflight/latency per URL |
| HealthChecker | Periodic GET {replica}/health, eviction after consecutive failures |
| ReplicaSelector | Pick a replica URL by model name and EngineRole |
| UpstreamForwarder | Shared aiohttp.ClientSession POST to replicas |
| InflightTracker | Bookkeeping for routing metrics (unfinished, rolling latency) |
| HybridDispatcher / DistServeDispatcher | Strategy-specific request orchestration |

Request flow (Hybrid)

Client → /v1/chat/completions
       → openai endpoint builds ProxyContext
       → HybridDispatcher.dispatch()
            → ReplicaSelector.select(model, Hybrid)
            → InflightTracker.start(url)
            → UpstreamForwarder.forward_raw_{buffer|stream}()
            → JSONResponse | ProxyStreamingResponse
            → InflightTracker.finish(url)  [buffer: finally; stream: on_complete]

DistServe follows the same pattern with an optional prefill phase, PD connection warmup, and migration_request injection before decode.

Registration and health

Registration (POST /nodes/add)

  1. Probe GET {url}/v1/models and require a non-empty model list (502 if unreachable or empty).
  2. If the client declares models in status.models, validate they exist in the probe result (400 on mismatch).
  3. Merge discovered models into ReplicaLoad and upsert into ReplicaPool.
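
The three registration steps reduce to a small decision function. The sketch below is illustrative only: `validate_registration` is a hypothetical name, and the probe itself (the HTTP GET) is assumed to have happened already, with `None` standing for an unreachable replica.

```python
def validate_registration(probed_models, declared_models=None):
    """Return (http_status, models) for a /nodes/add request.

    probed_models: model ids from GET {url}/v1/models, or None if unreachable.
    declared_models: optional list the client sent in status.models.
    """
    if not probed_models:
        return 502, []  # unreachable replica or empty model list
    if declared_models:
        missing = [m for m in declared_models if m not in probed_models]
        if missing:
            return 400, missing  # client declared models the replica does not serve
    # Merge declared and discovered names, keeping order and dropping duplicates.
    merged = list(dict.fromkeys([*(declared_models or []), *probed_models]))
    return 200, merged


print(validate_registration(['model-a', 'model-b'], ['model-b']))
```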

Auto-registration (api_server --proxy-url)

Registration is deferred until the local HTTP server accepts connections (_wait_until_listening on /health), then posted asynchronously via asyncio.create_task so startup is non-blocking.

Health checking

  • Probe GET {url}/health and parse JSON status / message.
  • Treat healthy and sleeping as serving; initializing as transient (do not evict immediately).
  • Evict after _FAILURE_THRESHOLD (default 2) consecutive non-serving results.
  • Interval and timeout configurable via LMDEPLOY_HEALTH_CHECK_INTERVAL / LMDEPLOY_HEALTH_CHECK_TIMEOUT.
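
The eviction rule can be sketched as a per-replica streak counter. `EvictionCounter` is a hypothetical class, and one detail is an assumption rather than something the description states: here an `initializing` result neither counts as a failure nor resets the streak.

```python
FAILURE_THRESHOLD = 2  # mirrors the default quoted above

SERVING = {'healthy', 'sleeping'}
TRANSIENT = {'initializing'}


class EvictionCounter:
    """Tracks consecutive non-serving probes per replica URL."""

    def __init__(self, threshold=FAILURE_THRESHOLD):
        self.threshold = threshold
        self.failures = {}

    def observe(self, url, status):
        """Record one probe result; return True if the replica should be evicted.

        `status` is the JSON status string, or None when the probe failed.
        """
        if status in SERVING:
            self.failures[url] = 0
            return False
        if status in TRANSIENT:
            # Assumption: transient states leave the failure streak untouched.
            return False
        self.failures[url] = self.failures.get(url, 0) + 1
        return self.failures[url] >= self.threshold


counter = EvictionCounter()
for status in (None, 'healthy', None, None):
    print(status, counter.observe('http://replica-a', status))
```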

Error model

| Layer | Behavior |
| --- | --- |
| Inference path | Standard HTTP status (404, 502, 503) + OpenAI ErrorResponse JSON via create_error_response |
| Admin path | FastAPI HTTPException with appropriate status (400, 404, 502) |
| Upstream errors | APIServerException carries status/body/headers through the forwarder; converted at the dispatcher or in ProxyStreamingResponse for streams |

Proxy-specific numeric error codes are removed from the inference path.

Client disconnect handling

A downstream disconnect must release upstream resources without calling abort_request, because production deployments cannot turn on enable_abort_handling.

Non-streaming (buffer)

race_awaitable_with_disconnect() runs the upstream post + read coroutine in parallel with is_disconnected() polling. On disconnect, the upstream task is cancelled (closing the aiohttp connection) and the forwarder returns None. Dispatchers finish inflight tracking in finally and return without building a response.

Streaming

ProxyStreamingResponse uses a hybrid strategy for performance on high chunk rates:

  1. First chunk: race __anext__() against disconnect (prefill / time-to-first-token is the longest wait).
  2. Subsequent chunks: lightweight is_disconnected() check between chunks, then plain __anext__() (no per-chunk task allocation).
  3. On disconnect or CancelledError: aclose() the upstream async generator to close the replica connection; on_complete always runs in finally to decrement unfinished.

Shared upstream HTTP session

A single aiohttp.ClientSession is created in the app lifespan with configurable AIOHTTP_TIMEOUT, AIOHTTP_LIMIT, and AIOHTTP_LIMIT_PER_HOST. All replicas share this pool.
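
As an illustration of how those three variables might be read, the sketch below parses them with the standard library only. `upstream_pool_settings` is a hypothetical helper, and the fallback values are aiohttp's own documented defaults, not necessarily the proxy's.

```python
import os


def upstream_pool_settings(env=None):
    """Read connection-pool settings from the environment.

    The fallback numbers are assumptions taken from aiohttp's defaults.
    """
    env = os.environ if env is None else env
    return {
        'timeout': float(env.get('AIOHTTP_TIMEOUT', 300)),
        'limit': int(env.get('AIOHTTP_LIMIT', 100)),
        'limit_per_host': int(env.get('AIOHTTP_LIMIT_PER_HOST', 0)),
    }


settings = upstream_pool_settings({'AIOHTTP_LIMIT': '1024'})
print(settings)
# With aiohttp installed, these values would feed one session for the whole process:
#   connector = aiohttp.TCPConnector(limit=settings['limit'],
#                                    limit_per_host=settings['limit_per_host'])
#   timeout = aiohttp.ClientTimeout(total=settings['timeout'])
#   session = aiohttp.ClientSession(connector=connector, timeout=timeout)
```

In aiohttp, `limit_per_host=0` means no per-host cap, so only the overall `limit` bounds the pool in that case.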

Authentication

When --api-keys is set, AuthenticationMiddleware protects paths under /v1/* only. Admin routes under /nodes/* remain open so replicas can register without knowing the inference API key.
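
The path rule can be sketched as a single predicate. `is_authorized` is a hypothetical function, and the `Authorization: Bearer <key>` header format is an assumption based on common OpenAI-style clients; the description does not spell out the header scheme.

```python
import hmac


def is_authorized(path, authorization, api_keys):
    """Decide whether a request may proceed when only /v1/* is protected."""
    if not api_keys or not path.startswith('/v1/'):
        return True  # no keys configured, or an open route such as /nodes/*
    scheme, _, token = (authorization or '').partition(' ')
    if scheme.lower() != 'bearer':
        return False
    # Constant-time comparison avoids leaking key prefixes through timing.
    return any(hmac.compare_digest(token.encode(), key.encode()) for key in api_keys)


print(is_authorized('/nodes/add', None, ['secret']))
print(is_authorized('/v1/models', 'Bearer secret', ['secret']))
```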

Configuration

ProxyConfig centralizes:

  • serving_strategy: Hybrid or DistServe
  • routing_strategy: random, min_expected_latency, min_observed_latency
  • DistServe options: migration_protocol, rdma_config, dummy_prefill
  • Server: host, port, SSL, log level, API keys

RoutingStrategy lives in core/config.py (moved from the removed utils.py barrel module).

Modifications (vs main)

Structural

  • Deleted monolithic proxy.py and utils.py.
  • Added modular package (see layout above) with ProxyRuntime dependency injection.
  • Moved RoutingStrategy to core/config.py and APIServerException to upstream/exceptions.py.
  • Colocated latency window constant on ReplicaLoad (_LATENCY_DEQUE_LEN = 15).
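
A fixed-length latency window of this kind is commonly built on `collections.deque`. The sketch below assumes that approach; `ReplicaLatency` and `pick_min_observed_latency` are hypothetical names, and treating an unmeasured replica as zero latency is an illustrative choice, not the selector's documented behavior.

```python
from collections import deque

LATENCY_DEQUE_LEN = 15  # same window length as quoted above


class ReplicaLatency:
    """Hypothetical rolling window of recent request latencies for one replica."""

    def __init__(self):
        self.samples = deque(maxlen=LATENCY_DEQUE_LEN)

    def record(self, seconds):
        self.samples.append(seconds)  # the oldest sample falls off automatically

    def mean(self):
        return sum(self.samples) / len(self.samples) if self.samples else None


def pick_min_observed_latency(windows):
    """Choose the URL with the lowest mean latency (unmeasured counts as 0.0)."""
    return min(windows, key=lambda url: windows[url].mean() or 0.0)


a, b = ReplicaLatency(), ReplicaLatency()
a.record(0.5)
b.record(0.2)
print(pick_min_observed_latency({'http://replica-a': a, 'http://replica-b': b}))
```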

Registry and admin API

  • ReplicaPool.add() upserts by URL (internal _discard before insert).
  • ReplicaPool.remove() raises ReplicaNotFoundError → HTTP 404.
  • POST /nodes/add probes /v1/models; unreachable → 502, model mismatch → 400.
  • POST /nodes/terminate calls replica GET /terminate then removes from pool; missing replica → 404.
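
The upsert and not-found semantics can be illustrated with a much smaller registry. This is a simplified sketch, not the PR's ReplicaPool: the stored fields and the `models()` helper are assumptions, and only the add/remove behavior described above is modeled.

```python
import threading


class ReplicaNotFoundError(KeyError):
    """Raised when a URL is not registered; an endpoint would map this to HTTP 404."""


class ReplicaPool:
    """Simplified in-memory registry keyed by replica URL."""

    def __init__(self):
        self._lock = threading.Lock()
        self._replicas = {}

    def add(self, url, models):
        with self._lock:
            self._replicas.pop(url, None)  # discard any stale entry first
            self._replicas[url] = {'models': list(models), 'unfinished': 0}

    def remove(self, url):
        with self._lock:
            if url not in self._replicas:
                raise ReplicaNotFoundError(url)
            del self._replicas[url]

    def models(self):
        """Aggregate the model names served by all registered replicas."""
        with self._lock:
            return sorted({m for r in self._replicas.values() for m in r['models']})


pool = ReplicaPool()
pool.add('http://replica-a', ['model-a'])
pool.add('http://replica-a', ['model-b'])  # same URL: replaces the first entry
print(pool.models())
```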

Inference and forwarding

  • UpstreamForwarder implements forward_{json,raw}_{buffer,stream} with shared session.
  • ProxyStreamingResponse handles upstream HTTP errors on the first stream chunk and client-disconnect cleanup.
  • Dispatchers use response_from_api_exception() for non-stream upstream failures.
  • Removed BackgroundTasks for stream inflight completion; replaced with on_complete callback.

api_server integration

  • Proxy registration waits until HTTP is listening.
  • Registration runs in a background task after the engine loop starts.

Documentation and metrics

  • Metrics docs updated to reference the new module paths.

Tests

Added focused unit tests under tests/test_lmdeploy/serve/proxy/:

| Module | Coverage |
| --- | --- |
| test_admin.py | /nodes/add success and failure paths, remove, terminate |
| test_pool.py | add, replace, remove |
| test_health_checker.py | probe states, eviction threshold, initializing tolerance, recovery |
| test_selector.py | model-based replica selection |
| test_dispatch.py | safe_json_load, upstream error mapping, inflight finish |
| test_forwarder.py | buffer disconnect cancels upstream aiohttp |
| test_streaming_response.py | first-chunk disconnect, mid-stream disconnect, upstream error on stream |

Functional tests for the full proxy + api_server stack live in workspace/proxy/run_functional_tests.py (F01–F20), including client-disconnect end-to-end validation (F20).

Copilot AI review requested due to automatic review settings June 4, 2026 12:52

Copilot AI left a comment

Pull request overview

This PR refactors the proxy server implementation by splitting the former monolithic lmdeploy/serve/proxy/proxy.py into a modular lmdeploy/serve/proxy package with separated concerns (app wiring, routing/selection, replica registry, dispatchers, upstream forwarding), and updates CLI/docs accordingly.

Changes:

  • Introduces a create_app()-based FastAPI proxy application with a wired ProxyRuntime and modular endpoints/dispatchers.
  • Adds modular routing (ReplicaSelector), replica registry (ReplicaPool + heartbeat), and upstream forwarding (UpstreamForwarder) components.
  • Updates CLI entrypoints and documentation to use the new proxy app factory, and adds a basic selector unit test.

Reviewed changes

Copilot reviewed 33 out of 34 changed files in this pull request and generated 7 comments.

Show a summary per file
| File | Description |
| --- | --- |
| tests/test_lmdeploy/test_proxy/test_selector.py | Adds a unit test for replica selection behavior. |
| lmdeploy/serve/proxy/utils.py | Removes env-based aiohttp timeout setup from utils (moved into app wiring). |
| lmdeploy/serve/proxy/upstream/forwarder.py | Adds shared-session upstream request forwarding utilities. |
| lmdeploy/serve/proxy/upstream/__init__.py | Exposes upstream forwarder in package API. |
| lmdeploy/serve/proxy/runtime.py | Wires proxy dependencies (pool/selector/forwarder/dispatchers). |
| lmdeploy/serve/proxy/routing/selector.py | Implements routing strategies for replica selection. |
| lmdeploy/serve/proxy/routing/__init__.py | Exposes routing selector in package API. |
| lmdeploy/serve/proxy/registry/pool.py | Implements in-memory replica registry and inflight tracking integration. |
| lmdeploy/serve/proxy/registry/heartbeat.py | Adds background heartbeat eviction loop. |
| lmdeploy/serve/proxy/registry/__init__.py | Exposes replica pool in package API. |
| lmdeploy/serve/proxy/proxy.py | Removes the legacy monolithic proxy implementation. |
| lmdeploy/serve/proxy/metrics/load_tracker.py | Adds a small adapter for inflight tracking calls. |
| lmdeploy/serve/proxy/metrics/__init__.py | Exposes inflight tracker in package API. |
| lmdeploy/serve/proxy/endpoint/openai.py | Implements OpenAI-compatible endpoints using the new runtime/dispatchers. |
| lmdeploy/serve/proxy/endpoint/distserve.py | Adds distserve management endpoints to new app. |
| lmdeploy/serve/proxy/endpoint/admin.py | Adds replica admin endpoints (/nodes/*) to new app. |
| lmdeploy/serve/proxy/endpoint/__init__.py | Creates endpoint package init. |
| lmdeploy/serve/proxy/dispatch/hybrid.py | Implements hybrid dispatch path via selector/forwarder/tracker. |
| lmdeploy/serve/proxy/dispatch/distserve.py | Implements distserve dispatch path incl. PD connection setup. |
| lmdeploy/serve/proxy/dispatch/base.py | Adds shared dispatch context + safe JSON parsing helpers. |
| lmdeploy/serve/proxy/dispatch/__init__.py | Exposes dispatchers in package API. |
| lmdeploy/serve/proxy/core/replica.py | Adds replica load/registration models. |
| lmdeploy/serve/proxy/core/errors.py | Adds proxy error codes/messages. |
| lmdeploy/serve/proxy/core/config.py | Adds proxy runtime config dataclass. |
| lmdeploy/serve/proxy/core/__init__.py | Exposes core proxy types. |
| lmdeploy/serve/proxy/cli.py | Adds new proxy CLI entrypoint that builds config + app. |
| lmdeploy/serve/proxy/app.py | Adds FastAPI app factory + shared aiohttp session management. |
| lmdeploy/serve/proxy/__init__.py | Exposes create_app and proxy from the new proxy package. |
| lmdeploy/cli/serve.py | Updates CLI integration to import proxy from the new module. |
| docs/en/conf.py | Updates docs OpenAPI app import to use create_app(ProxyConfig()). |
| docs/zh_cn/conf.py | Updates docs OpenAPI app import to use create_app(ProxyConfig()). |
| docs/en/advance/metrics.md | Updates proxy status guidance to use GET /nodes/status. |
| docs/zh_cn/advance/metrics.md | Updates proxy status guidance to use GET /nodes/status. |

Comment on lines +57 to +60

    async with self._session.post(target_url, json=payload) as response:
        if response.status != HTTPStatus.OK:
            return self.api_timeout_bytes(replica_url)
        return await response.text()

Comment on lines +87 to +90

    async with self._session.post(target_url, headers=headers, data=body_bytes) as response:
        if response.status != HTTPStatus.OK:
            return self.api_timeout_bytes(replica_url)
        return await response.text()

Comment on lines +61 to +63

    except (Exception, GeneratorExit, aiohttp.ClientError, asyncio.CancelledError) as e:
        logger.error(f'caught an exception: {e}')
        return self.api_timeout_bytes(replica_url)

Comment on lines +78 to +80

    except (Exception, GeneratorExit, aiohttp.ClientError) as e:
        logger.error(f'caught an exception: {e}')
        yield self.api_timeout_bytes(replica_url)

Comment on lines +110 to +112

    except (Exception, GeneratorExit, aiohttp.ClientError) as e:
        logger.error(f'caught an exception: {e}')
        yield self.api_timeout_bytes(replica_url)

Comment on lines +44 to +51

    observed = [load.speed for load in replicas.values() if load.speed and load.speed > 0]
    if not observed:
        observed = [1.0]
    average_speed = sum(observed) / len(observed)
    return candidates, [
        speed if replicas[url].speed else average_speed
        for url, speed in zip(candidates, speeds)
    ]

Comment on lines +3 to +23

    from dataclasses import dataclass

    from lmdeploy.pytorch.disagg.config import DistServeRDMAConfig, ServingStrategy
    from lmdeploy.pytorch.disagg.conn.protocol import MigrationProtocol
    from lmdeploy.serve.proxy.utils import RoutingStrategy


    @dataclass
    class ProxyConfig:
        """Runtime configuration for the proxy server."""

        serving_strategy: ServingStrategy = ServingStrategy.Hybrid
        routing_strategy: RoutingStrategy = RoutingStrategy.MIN_EXPECTED_LATENCY
        migration_protocol: MigrationProtocol = MigrationProtocol.RDMA
        rdma_config: DistServeRDMAConfig | None = None
        dummy_prefill: bool = False
        server_name: str = '0.0.0.0'
        server_port: int = 8000
        api_keys: list[str] | None = None
        ssl: bool = False
        log_level: str = 'INFO'

@lvhan028 marked this pull request as draft on June 5, 2026 06:41
@lvhan028 force-pushed the refactor-proxy-v2 branch from 1b835b9 to 9f69121 on June 8, 2026 08:14
@lvhan028 force-pushed the refactor-proxy-v2 branch from 5b7b7a3 to a0a343a on June 9, 2026 04:37
@lvhan028 marked this pull request as ready for review on June 9, 2026 14:35