refactor(proxy): split monolithic proxy into modular serve/proxy package#4647
Open
lvhan028 wants to merge 10 commits into
Open
refactor(proxy): split monolithic proxy into modular serve/proxy package#4647lvhan028 wants to merge 10 commits into
lvhan028 wants to merge 10 commits into
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR refactors the proxy server implementation by splitting the former monolithic lmdeploy/serve/proxy/proxy.py into a modular lmdeploy/serve/proxy package with separated concerns (app wiring, routing/selection, replica registry, dispatchers, upstream forwarding), and updates CLI/docs accordingly.
Changes:
- Introduces a
create_app()-based FastAPI proxy application with a wiredProxyRuntimeand modular endpoints/dispatchers. - Adds modular routing (
ReplicaSelector), replica registry (ReplicaPool+ heartbeat), and upstream forwarding (UpstreamForwarder) components. - Updates CLI entrypoints and documentation to use the new proxy app factory, and adds a basic selector unit test.
Reviewed changes
Copilot reviewed 33 out of 34 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_lmdeploy/test_proxy/test_selector.py | Adds a unit test for replica selection behavior. |
| lmdeploy/serve/proxy/utils.py | Removes env-based aiohttp timeout setup from utils (moved into app wiring). |
| lmdeploy/serve/proxy/upstream/forwarder.py | Adds shared-session upstream request forwarding utilities. |
| lmdeploy/serve/proxy/upstream/init.py | Exposes upstream forwarder in package API. |
| lmdeploy/serve/proxy/runtime.py | Wires proxy dependencies (pool/selector/forwarder/dispatchers). |
| lmdeploy/serve/proxy/routing/selector.py | Implements routing strategies for replica selection. |
| lmdeploy/serve/proxy/routing/init.py | Exposes routing selector in package API. |
| lmdeploy/serve/proxy/registry/pool.py | Implements in-memory replica registry and inflight tracking integration. |
| lmdeploy/serve/proxy/registry/heartbeat.py | Adds background heartbeat eviction loop. |
| lmdeploy/serve/proxy/registry/init.py | Exposes replica pool in package API. |
| lmdeploy/serve/proxy/proxy.py | Removes the legacy monolithic proxy implementation. |
| lmdeploy/serve/proxy/metrics/load_tracker.py | Adds a small adapter for inflight tracking calls. |
| lmdeploy/serve/proxy/metrics/init.py | Exposes inflight tracker in package API. |
| lmdeploy/serve/proxy/endpoint/openai.py | Implements OpenAI-compatible endpoints using the new runtime/dispatchers. |
| lmdeploy/serve/proxy/endpoint/distserve.py | Adds distserve management endpoints to new app. |
| lmdeploy/serve/proxy/endpoint/admin.py | Adds replica admin endpoints (/nodes/*) to new app. |
| lmdeploy/serve/proxy/endpoint/init.py | Creates endpoint package init. |
| lmdeploy/serve/proxy/dispatch/hybrid.py | Implements hybrid dispatch path via selector/forwarder/tracker. |
| lmdeploy/serve/proxy/dispatch/distserve.py | Implements distserve dispatch path incl. PD connection setup. |
| lmdeploy/serve/proxy/dispatch/base.py | Adds shared dispatch context + safe JSON parsing helpers. |
| lmdeploy/serve/proxy/dispatch/init.py | Exposes dispatchers in package API. |
| lmdeploy/serve/proxy/core/replica.py | Adds replica load/registration models. |
| lmdeploy/serve/proxy/core/errors.py | Adds proxy error codes/messages. |
| lmdeploy/serve/proxy/core/config.py | Adds proxy runtime config dataclass. |
| lmdeploy/serve/proxy/core/init.py | Exposes core proxy types. |
| lmdeploy/serve/proxy/cli.py | Adds new proxy CLI entrypoint that builds config + app. |
| lmdeploy/serve/proxy/app.py | Adds FastAPI app factory + shared aiohttp session management. |
| lmdeploy/serve/proxy/init.py | Exposes create_app and proxy from the new proxy package. |
| lmdeploy/cli/serve.py | Updates CLI integration to import proxy from the new module. |
| docs/en/conf.py | Updates docs OpenAPI app import to use create_app(ProxyConfig()). |
| docs/zh_cn/conf.py | Updates docs OpenAPI app import to use create_app(ProxyConfig()). |
| docs/en/advance/metrics.md | Updates proxy status guidance to use GET /nodes/status. |
| docs/zh_cn/advance/metrics.md | Updates proxy status guidance to use GET /nodes/status. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+57
to
+60
| async with self._session.post(target_url, json=payload) as response: | ||
| if response.status != HTTPStatus.OK: | ||
| return self.api_timeout_bytes(replica_url) | ||
| return await response.text() |
Comment on lines
+87
to
+90
| async with self._session.post(target_url, headers=headers, data=body_bytes) as response: | ||
| if response.status != HTTPStatus.OK: | ||
| return self.api_timeout_bytes(replica_url) | ||
| return await response.text() |
Comment on lines
+61
to
+63
| except (Exception, GeneratorExit, aiohttp.ClientError, asyncio.CancelledError) as e: | ||
| logger.error(f'caught an exception: {e}') | ||
| return self.api_timeout_bytes(replica_url) |
Comment on lines
+78
to
+80
| except (Exception, GeneratorExit, aiohttp.ClientError) as e: | ||
| logger.error(f'caught an exception: {e}') | ||
| yield self.api_timeout_bytes(replica_url) |
Comment on lines
+110
to
+112
| except (Exception, GeneratorExit, aiohttp.ClientError) as e: | ||
| logger.error(f'caught an exception: {e}') | ||
| yield self.api_timeout_bytes(replica_url) |
Comment on lines
+44
to
+51
| observed = [load.speed for load in replicas.values() if load.speed and load.speed > 0] | ||
| if not observed: | ||
| observed = [1.0] | ||
| average_speed = sum(observed) / len(observed) | ||
| return candidates, [ | ||
| speed if replicas[url].speed else average_speed | ||
| for url, speed in zip(candidates, speeds) | ||
| ] |
Comment on lines
+3
to
+23
| from dataclasses import dataclass | ||
|
|
||
| from lmdeploy.pytorch.disagg.config import DistServeRDMAConfig, ServingStrategy | ||
| from lmdeploy.pytorch.disagg.conn.protocol import MigrationProtocol | ||
| from lmdeploy.serve.proxy.utils import RoutingStrategy | ||
|
|
||
|
|
||
| @dataclass | ||
| class ProxyConfig: | ||
| """Runtime configuration for the proxy server.""" | ||
|
|
||
| serving_strategy: ServingStrategy = ServingStrategy.Hybrid | ||
| routing_strategy: RoutingStrategy = RoutingStrategy.MIN_EXPECTED_LATENCY | ||
| migration_protocol: MigrationProtocol = MigrationProtocol.RDMA | ||
| rdma_config: DistServeRDMAConfig | None = None | ||
| dummy_prefill: bool = False | ||
| server_name: str = '0.0.0.0' | ||
| server_port: int = 8000 | ||
| api_keys: list[str] | None = None | ||
| ssl: bool = False | ||
| log_level: str = 'INFO' |
1b835b9 to
9f69121
Compare
5b7b7a3 to
a0a343a
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
LMDeploy Proxy Server Refactor
Motivation
The legacy proxy lived in a single ~950-line module (
lmdeploy/serve/proxy/proxy.py) that mixed HTTP routing, replica registry, health checking, load balancing, request forwarding, DistServe orchestration, and error handling. That layout made the code hard to test, extend, and reason about.Concrete problems on
mainincluded:api_servercould register with the proxy before its HTTP listener was ready;/nodes/adddid not validate replicas by probing/v1/models.api_server/healthJSON semantics (healthy,sleeping,initializing, etc.).api_serverconnection open and leave inflight counters elevated.The refactor preserves existing user-facing CLI entry points (
lmdeploy serve proxy) and the external API surface (/v1/*,/nodes/*, DistServe admin routes) while improving modularity, HTTP correctness, and operability.Design
Package layout
The proxy is split into a small package under
lmdeploy/serve/proxy/:Runtime wiring
ProxyRuntimeis constructed once per process inside the FastAPI lifespan and stored onapp.state.runtime. It owns:ReplicaPoolHealthCheckerGET {replica}/health, eviction after consecutive failuresReplicaSelectorEngineRoleUpstreamForwarderaiohttp.ClientSessionPOST to replicasInflightTrackerunfinished, rolling latency)HybridDispatcher/DistServeDispatcherRequest flow (Hybrid)
DistServe follows the same pattern with an optional prefill phase, PD connection warmup, and
migration_requestinjection before decode.Registration and health
Registration (
POST /nodes/add)GET {url}/v1/modelsand require a non-empty model list (502 if unreachable or empty).status.models, validate they exist in the probe result (400 on mismatch).ReplicaLoadand upsert intoReplicaPool.Auto-registration (
api_server --proxy-url)Registration is deferred until the local HTTP server accepts connections (
_wait_until_listeningon/health), then posted asynchronously viaasyncio.create_taskso startup is non-blocking.Health checking
GET {url}/healthand parse JSONstatus/message.healthyandsleepingas serving;initializingas transient (do not evict immediately)._FAILURE_THRESHOLD(default 2) consecutive non-serving results.LMDEPLOY_HEALTH_CHECK_INTERVAL/LMDEPLOY_HEALTH_CHECK_TIMEOUT.Error model
ErrorResponseJSON viacreate_error_responseHTTPExceptionwith appropriate status (400, 404, 502)APIServerExceptioncarries status/body/headers through the forwarder; converted at the dispatcher or inProxyStreamingResponsefor streamsProxy-specific numeric error codes are removed from the inference path.
Client disconnect handling
Downstream disconnect must release upstream resources without calling
abort_request(production cannot enableenable_abort_handling).Non-streaming (buffer)
race_awaitable_with_disconnect()runs the upstreampost + readcoroutine in parallel withis_disconnected()polling. On disconnect, the upstream task is cancelled (closing the aiohttp connection) and the forwarder returnsNone. Dispatchers finish inflight tracking infinallyand return without building a response.Streaming
ProxyStreamingResponseuses a hybrid strategy for performance on high chunk rates:__anext__()against disconnect (prefill / time-to-first-token is the longest wait).is_disconnected()check between chunks, then plain__anext__()(no per-chunk task allocation).CancelledError—aclose()the upstream async generator to close the replica connection;on_completealways runs infinallyto decrementunfinished.Shared upstream HTTP session
A single
aiohttp.ClientSessionis created in the app lifespan with configurableAIOHTTP_TIMEOUT,AIOHTTP_LIMIT, andAIOHTTP_LIMIT_PER_HOST. All replicas share this pool.Authentication
When
--api-keysis set,AuthenticationMiddlewareprotects paths under/v1/*only. Admin routes under/nodes/*remain open so replicas can register without knowing the inference API key.Configuration
ProxyConfigcentralizes:serving_strategy—HybridorDistServerouting_strategy—random,min_expected_latency,min_observed_latencymigration_protocol,rdma_config,dummy_prefillRoutingStrategylives incore/config.py(moved from the removedutils.pybarrel module).Modifications (vs
main)Structural
proxy.pyandutils.py.ProxyRuntimedependency injection.RoutingStrategy→core/config.py;APIServerException→upstream/exceptions.py.ReplicaLoad(_LATENCY_DEQUE_LEN = 15).Registry and admin API
ReplicaPool.add()upserts by URL (internal_discardbefore insert).ReplicaPool.remove()raisesReplicaNotFoundError→ HTTP 404.POST /nodes/addprobes/v1/models; unreachable → 502, model mismatch → 400.POST /nodes/terminatecalls replicaGET /terminatethen removes from pool; missing replica → 404.Inference and forwarding
UpstreamForwarderimplementsforward_{json,raw}_{buffer,stream}with shared session.ProxyStreamingResponsehandles upstream HTTP errors on the first stream chunk and client-disconnect cleanup.response_from_api_exception()for non-stream upstream failures.BackgroundTasksfor stream inflight completion; replaced withon_completecallback.api_serverintegrationDocumentation and metrics
Tests
Added focused unit tests under
tests/test_lmdeploy/serve/proxy/:test_admin.py/nodes/addsuccess and failure paths, remove, terminatetest_pool.pytest_health_checker.pytest_selector.pytest_dispatch.pysafe_json_load, upstream error mapping, inflight finishtest_forwarder.pytest_streaming_response.pyFunctional tests for the full proxy +
api_serverstack live inworkspace/proxy/run_functional_tests.py(F01–F20), including client-disconnect end-to-end validation (F20).