Improve engine health monitoring and wakeup scheduling #4645
Pull request overview
This PR tunes engine health monitoring and scheduling progress reporting to reduce false “unhealthy” states during long/overlapping probes and long prefills, and makes the /wakeup endpoint non-blocking for the FastAPI event loop.
Changes:
- Add environment-variable overrides and updated defaults for health monitor polling/timeout/staleness behavior, including “pending” probe handling.
- Make AsyncEngine.wakeup() asynchronous by offloading the blocking backend wakeup to a worker thread; update the OpenAI-compatible /wakeup route to await it.
- Advance scheduler_tick once per forward_async() dispatch (moved to inputs dispatch path) so health logic sees progress even during long prefills.
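
The "pending" probe handling listed above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the lmdeploy code: `Snapshot`, `ProbeRunner`, and `apply_result` are hypothetical names, and the probe is a stand-in coroutine. The idea is that an overlapping probe reports `pending` instead of starting a second probe, and the monitor keeps its last completed state when it sees that result.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Snapshot:
    """Hypothetical record of the last completed probe."""
    status: str = 'unknown'
    updates: int = 0


class ProbeRunner:
    """Hypothetical wrapper that refuses to start overlapping probes."""

    def __init__(self, probe):
        self._probe = probe
        self._lock = asyncio.Lock()

    async def run(self) -> str:
        if self._lock.locked():
            # A previous probe is still in flight: report 'pending'
            # instead of queueing another probe behind it.
            return 'pending'
        async with self._lock:
            return await self._probe()


def apply_result(snapshot: Snapshot, result: str) -> Snapshot:
    """Keep the last completed state when the probe is still pending."""
    if result == 'pending':
        return snapshot
    return Snapshot(status=result, updates=snapshot.updates + 1)


async def demo() -> list:
    async def slow_probe() -> str:
        await asyncio.sleep(0.05)  # simulate a probe stuck behind a long prefill
        return 'healthy'

    runner = ProbeRunner(slow_probe)
    first = asyncio.create_task(runner.run())
    await asyncio.sleep(0)        # let the first probe take the lock
    second = await runner.run()   # overlaps with the first probe
    return [second, await first]
```

With this shape, a slow probe delays the next snapshot update but does not by itself flip the state to unhealthy; that decision stays with the staleness threshold.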
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| lmdeploy/serve/openai/api_server.py | Await the now-async engine wakeup() in the /wakeup handler. |
| lmdeploy/serve/core/health.py | Add env overrides and adjust probe/poll/staleness logic, including skipping snapshot updates on pending probes. |
| lmdeploy/serve/core/async_engine.py | Return pending when probes overlap; make wakeup() async via asyncio.to_thread(). |
| lmdeploy/pytorch/paging/scheduler.py | Clarify tick() semantics as one step per forward dispatch. |
| lmdeploy/pytorch/engine/inputs_maker.py | Call scheduler.tick() after each forward_async() dispatch. |
| lmdeploy/pytorch/engine/engine_loop.py | Remove the old scheduler.tick() location to avoid double-counting / wrong timing. |
    def _env_override_float(env_var: str, value: float) -> float:
        """Return ``value`` unless ``env_var`` is set, then parse and return it."""
        env_value = os.getenv(env_var)
        if env_value is None:
            return value
        try:
            return float(env_value)
        except ValueError:
            return value
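
The fallback behaviour of this helper can be exercised as below. The function body is copied from the snippet above with its `os` import added; the default of `5.0` is a hypothetical value chosen for the example, not the default used in the PR.

```python
import os


def _env_override_float(env_var: str, value: float) -> float:
    """Return ``value`` unless ``env_var`` is set, then parse and return it."""
    env_value = os.getenv(env_var)
    if env_value is None:
        return value
    try:
        return float(env_value)
    except ValueError:
        # Unparseable input silently falls back to the default.
        return value


DEFAULT = 5.0  # hypothetical default, not taken from the PR
NAME = 'LMDEPLOY_HEALTH_POLL_INTERVAL'

os.environ.pop(NAME, None)
unset = _env_override_float(NAME, DEFAULT)     # variable not set

os.environ[NAME] = '2.5'
parsed = _env_override_float(NAME, DEFAULT)    # valid override

os.environ[NAME] = 'fast'
fallback = _env_override_float(NAME, DEFAULT)  # invalid override

os.environ.pop(NAME, None)  # leave the environment as we found it
```

Note that `float()` also accepts strings such as `-1`, `inf`, and `nan`, so the helper as written passes those through unchanged.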
             await self.engine.sleep(level)

    -    def wakeup(self, tags: list[str] | None = None):
    +    async def wakeup(self, tags: list[str] | None = None):

                 logger.warning(f'some tag in {tags} not in sleeping tags {self.sleeping_tags}')
                 return
    -        self.engine.wakeup(tags)
    +        await asyncio.to_thread(self.engine.wakeup, tags)
await asyncio.to_thread(self.engine.wakeup, tags) runs PyTorch Engine.wakeup() off the event-loop thread. That path calls EngineLoop.resume_from_sleep() (lmdeploy/pytorch/engine/engine.py:553), which sets loop-owned asyncio.Event objects (lmdeploy/pytorch/engine/engine_loop.py:181-188). Setting them from another thread is not thread-safe and can leave the engine loop stuck after /wakeup, or fail under asyncio debug mode. I’d split the blocking backend wakeup/warmup from the event-loop resume, or resume via loop.call_soon_threadsafe.
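
The second option in the comment above, resuming via `loop.call_soon_threadsafe`, might look roughly like this. It is a minimal sketch rather than a patch for lmdeploy: `blocking_wakeup`, `resume_event`, and `engine_loop` are hypothetical stand-ins for the backend wakeup, the loop-owned event, and the engine loop that waits on it.

```python
import asyncio
import threading
import time


def blocking_wakeup(loop: asyncio.AbstractEventLoop,
                    resume_event: asyncio.Event,
                    log: list) -> None:
    """Hypothetical stand-in for the blocking backend wakeup."""
    time.sleep(0.05)  # simulate slow weight reload / warmup
    # Record whether this ran on the main thread (it should not).
    log.append(threading.current_thread() is threading.main_thread())
    # asyncio.Event is not thread-safe: schedule set() on the owning loop
    # instead of calling resume_event.set() from this worker thread.
    loop.call_soon_threadsafe(resume_event.set)


async def wakeup(resume_event: asyncio.Event, log: list) -> None:
    loop = asyncio.get_running_loop()
    # The blocking part runs in a worker thread; the event loop stays free.
    await asyncio.to_thread(blocking_wakeup, loop, resume_event, log)


async def demo() -> tuple:
    resume_event = asyncio.Event()
    log: list = []

    async def engine_loop() -> str:
        await resume_event.wait()  # parked while the engine sleeps
        return 'resumed'

    waiter = asyncio.create_task(engine_loop())
    await wakeup(resume_event, log)
    result = await asyncio.wait_for(waiter, timeout=1.0)
    return result, log
```

The first option (splitting the work) has the same effect from the other direction: only the blocking part goes through `asyncio.to_thread`, and the event is set after the `await` returns, back on the loop thread.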
Motivation
Modification
- lmdeploy/serve/core/health.py
  - Environment-variable overrides: LMDEPLOY_HEALTH_POLL_INTERVAL, LMDEPLOY_HEALTH_PROBE_TIMEOUT, LMDEPLOY_HEALTH_UNHEALTHY_AFTER.
  - … if poll_interval <= probe_timeout.
  - pending probe result: log and skip snapshot update (keep last successful state).
- lmdeploy/serve/core/async_engine.py
  - Make wakeup() async and run engine.wakeup() via asyncio.to_thread() to avoid blocking the API event loop.
- lmdeploy/serve/openai/api_server.py
  - Await the async wakeup() in the /wakeup handler.
- lmdeploy/pytorch/engine/inputs_maker.py, lmdeploy/pytorch/engine/engine_loop.py, lmdeploy/pytorch/paging/scheduler.py
  - Call scheduler.tick() after each forward_async() (one tick per forward dispatch).
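
Why one tick per forward dispatch helps can be shown with a small model. The sketch below is an assumption about how a tick-based staleness check could work, not the logic in health.py: `Scheduler`, `StalenessTracker`, and `dispatch_forward` are hypothetical, and the 30-second threshold is an example value standing in for LMDEPLOY_HEALTH_UNHEALTHY_AFTER.

```python
from dataclasses import dataclass


@dataclass
class Scheduler:
    """Hypothetical scheduler exposing a monotonically increasing tick."""
    scheduler_tick: int = 0

    def tick(self) -> None:
        # One step per forward dispatch.
        self.scheduler_tick += 1


@dataclass
class StalenessTracker:
    """Hypothetical health-side view: when did the tick last change?"""
    unhealthy_after: float
    last_tick: int = -1
    last_progress_time: float = 0.0

    def observe(self, tick: int, now: float) -> bool:
        """Return True while the engine still counts as healthy."""
        if tick != self.last_tick:
            self.last_tick = tick
            self.last_progress_time = now
        return (now - self.last_progress_time) < self.unhealthy_after


def dispatch_forward(scheduler: Scheduler) -> None:
    """Stand-in for one forward_async() dispatch followed by a tick."""
    # ... forward_async(inputs) would be dispatched here ...
    scheduler.tick()


scheduler = Scheduler()
tracker = StalenessTracker(unhealthy_after=30.0)  # example threshold

states = []
for now in (0.0, 20.0, 40.0):  # a long prefill split into several dispatches
    dispatch_forward(scheduler)
    states.append(tracker.observe(scheduler.scheduler_tick, now))

# No dispatch since t=40, so the tick is unchanged 35 seconds later.
stalled = tracker.observe(scheduler.scheduler_tick, 75.0)
```

In this model the prefill spans 40 seconds but each dispatch advances the tick, so the tracker never sees more than 20 seconds without progress; only a genuine stall crosses the threshold.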