Skip to content

Commit b8e1509

Browse files
rgarcia and claude committed
feat: control-plane fallback for direct-VM browser routing
Add a transparent control-plane fallback to the existing direct-to-VM routing. When a GET/HEAD that was routed to a browser VM (allowlisted subresource + cached route) fails because the VM is unreachable or the session is gone, the original request is re-issued once to the control plane (original URL, Authorization restored, no jwt param). Triggers: connection/transport error, HTTP 502/503/504, or the clean gone signal (404 + X-Kernel-Upstream: gone + body code browser_gone). Never falls back for POST/PUT/PATCH/DELETE, ordinary live-VM 4xx, or non-routed requests. Exactly one fallback attempt; the SDK's own retry, cache eviction, and stream handling are left intact. Implemented by overriding request() in Kernel/AsyncKernel to wrap the base loop: eligibility is computed from the pre-rewrite options, and a ContextVar suppresses re-routing during the single fallback re-issue. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 4c5dde5 commit b8e1509

3 files changed

Lines changed: 430 additions & 3 deletions

File tree

src/kernel/_client.py

Lines changed: 189 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
import os
66
from typing import TYPE_CHECKING, Any, Dict, Type, Mapping, cast
7-
from typing_extensions import Self, Literal, override
7+
from contextvars import ContextVar
8+
from typing_extensions import Self, Literal, overload, override
89

910
import httpx
1011

@@ -25,22 +26,26 @@
2526
is_mapping_t,
2627
get_async_library,
2728
)
28-
from ._compat import cached_property
29+
from ._compat import model_copy, cached_property
2930
from ._models import FinalRequestOptions
3031
from ._version import __version__
3132
from ._streaming import Stream as Stream, AsyncStream as AsyncStream
32-
from ._exceptions import KernelError, APIStatusError
33+
from ._exceptions import KernelError, APIStatusError, APIConnectionError
3334
from ._base_client import (
3435
DEFAULT_MAX_RETRIES,
3536
SyncAPIClient,
3637
AsyncAPIClient,
38+
_StreamT,
39+
_AsyncStreamT,
3740
)
3841
from .lib.browser_routing.routing import (
3942
BrowserRouteCache,
4043
BrowserRoutingConfig,
4144
strip_direct_vm_auth,
4245
rewrite_direct_vm_options,
46+
is_vm_unreachable_response,
4347
browser_routing_config_from_env,
48+
should_fallback_to_control_plane,
4449
maybe_evict_browser_route_from_response,
4550
maybe_populate_browser_route_cache_from_response,
4651
)
@@ -92,6 +97,13 @@
9297
"development": "https://localhost:3001/",
9398
}
9499

100+
# Set (per thread / per async task) only during a control-plane fallback attempt
101+
# so `_prepare_options` skips direct-VM rewriting for that single re-issued request.
102+
# A ContextVar (rather than an instance attribute) keeps the flag local to the
103+
# in-flight request and avoids interfering with other concurrent requests sharing
104+
# the same client.
105+
_disable_browser_routing: ContextVar[bool] = ContextVar("kernel_disable_browser_routing", default=False)
106+
95107

96108
class Kernel(SyncAPIClient):
97109
# client options
@@ -307,12 +319,98 @@ def default_headers(self) -> dict[str, str | Omit]:
307319
@override
308320
def _prepare_options(self, options: Any) -> Any:
    """Run the base option preparation, then apply direct-VM rewriting.

    When the module-level ``_disable_browser_routing`` flag is set (it is
    set only while a control-plane fallback is being re-issued), the
    options prepared by the base class are returned without rewriting.
    """
    prepared = cast(Any, super()._prepare_options(options))
    if not _disable_browser_routing.get():
        return rewrite_direct_vm_options(
            prepared,
            cache=self.browser_route_cache,
            config=self._browser_routing,
        )
    # Fallback attempt in progress: leave the request aimed at the control plane.
    return prepared
311327

312328
@override
313329
def _prepare_request(self, request: httpx.Request) -> None:
314330
strip_direct_vm_auth(request, cache=self.browser_route_cache)
315331

332+
@overload
333+
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: Literal[True],
    stream_cls: Type[_StreamT],
) -> _StreamT: ...

# Non-streaming call: returns the parsed response type.
@overload
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: Literal[False] = False,
) -> ResponseT: ...

# Catch-all signature for callers passing `stream` as a plain bool.
@overload
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: bool = False,
    stream_cls: Type[_StreamT] | None = None,
) -> ResponseT | _StreamT: ...

@override
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: bool = False,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    """Issue a request, falling back to the control plane once if eligible.

    The request is delegated to the base client. If that raises an
    ``APIStatusError`` whose response satisfies
    ``is_vm_unreachable_response``, or an ``APIConnectionError``, and the
    request was judged eligible up front, the original options are
    re-issued through ``_control_plane_fallback``. Every other error is
    re-raised unchanged.
    """
    # Snapshot the caller's options before the base client touches them;
    # eligibility and the fallback re-issue both work from this copy.
    pristine = model_copy(options)
    # Short-circuits: while a fallback is already in flight the flag is set
    # and the eligibility helper is not consulted at all.
    can_fall_back = not _disable_browser_routing.get() and should_fallback_to_control_plane(
        pristine, cache=self.browser_route_cache, config=self._browser_routing
    )

    try:
        return super().request(cast_to, options, stream=stream, stream_cls=stream_cls)
    except APIStatusError as err:
        # A status error only triggers fallback when the response signals an
        # unreachable/gone VM. (Original author's note: the SDK's own retry
        # logic has already run and given up by this point.)
        if not (can_fall_back and is_vm_unreachable_response(err.response)):
            raise
        return self._control_plane_fallback(
            cast_to, pristine, stream=stream, stream_cls=stream_cls
        )
    except APIConnectionError:
        # Connection/network/transport failure (the original note says this
        # includes timeouts) while talking to the VM.
        if not can_fall_back:
            raise
        return self._control_plane_fallback(
            cast_to, pristine, stream=stream, stream_cls=stream_cls
        )
395+
396+
def _control_plane_fallback(
    self,
    cast_to: Type[ResponseT],
    original_options: FinalRequestOptions,
    *,
    stream: bool,
    stream_cls: type[_StreamT] | None,
) -> ResponseT | _StreamT:
    """Re-issue the original request once with direct-VM routing disabled.

    Sets ``_disable_browser_routing`` for the duration of a single base
    ``request`` call made with a copy of ``original_options``, then
    restores the previous flag value whether the call succeeds or raises.
    Per the original author's note, the re-issued request targets the
    original URL with Authorization restored and no jwt param.
    """
    reset_token = _disable_browser_routing.set(True)
    try:
        # Copy again so the caller's snapshot is not handed to the base client.
        replay_options = model_copy(original_options)
        return super().request(cast_to, replay_options, stream=stream, stream_cls=stream_cls)
    finally:
        _disable_browser_routing.reset(reset_token)
413+
316414
@override
317415
def _process_response(
318416
self,
@@ -638,12 +736,100 @@ def default_headers(self) -> dict[str, str | Omit]:
638736
@override
639737
async def _prepare_options(self, options: Any) -> Any:
    """Await the base option preparation, then apply direct-VM rewriting.

    When the module-level ``_disable_browser_routing`` flag is set (it is
    set only while a control-plane fallback is being re-issued), the
    options prepared by the base class are returned without rewriting.
    """
    prepared = cast(Any, await super()._prepare_options(options))
    if not _disable_browser_routing.get():
        return rewrite_direct_vm_options(
            prepared,
            cache=self.browser_route_cache,
            config=self._browser_routing,
        )
    # Fallback attempt in progress: leave the request aimed at the control plane.
    return prepared
642744

643745
@override
644746
async def _prepare_request(self, request: httpx.Request) -> None:
645747
strip_direct_vm_auth(request, cache=self.browser_route_cache)
646748

749+
@overload
750+
async def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: Literal[False] = False,
) -> ResponseT: ...

# Streaming call: the stream class is required and is the return type.
@overload
async def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: Literal[True],
    stream_cls: type[_AsyncStreamT],
) -> _AsyncStreamT: ...

# Catch-all signature for callers passing `stream` as a plain bool.
@overload
async def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: bool = False,
    stream_cls: type[_AsyncStreamT] | None = None,
) -> ResponseT | _AsyncStreamT: ...

@override
async def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: bool = False,
    stream_cls: type[_AsyncStreamT] | None = None,
) -> ResponseT | _AsyncStreamT:
    """Issue a request, falling back to the control plane once if eligible.

    The request is delegated to the base client. If that raises an
    ``APIStatusError`` whose response satisfies
    ``is_vm_unreachable_response``, or an ``APIConnectionError``, and the
    request was judged eligible up front, the original options are
    re-issued through ``_control_plane_fallback``. Every other error is
    re-raised unchanged.
    """
    # Snapshot the caller's options before the base client touches them;
    # eligibility and the fallback re-issue both work from this copy.
    pristine = model_copy(options)
    # Short-circuits: while a fallback is already in flight the flag is set
    # and the eligibility helper is not consulted at all.
    can_fall_back = not _disable_browser_routing.get() and should_fallback_to_control_plane(
        pristine, cache=self.browser_route_cache, config=self._browser_routing
    )

    try:
        return await super().request(cast_to, options, stream=stream, stream_cls=stream_cls)
    except APIStatusError as err:
        # A status error only triggers fallback when the response signals an
        # unreachable/gone VM. (Original author's note: the SDK's own retry
        # logic has already run and given up by this point.)
        if not (can_fall_back and is_vm_unreachable_response(err.response)):
            raise
        return await self._control_plane_fallback(
            cast_to, pristine, stream=stream, stream_cls=stream_cls
        )
    except APIConnectionError:
        # Connection/network/transport failure (the original note says this
        # includes timeouts) while talking to the VM.
        if not can_fall_back:
            raise
        return await self._control_plane_fallback(
            cast_to, pristine, stream=stream, stream_cls=stream_cls
        )
812+
813+
async def _control_plane_fallback(
    self,
    cast_to: Type[ResponseT],
    original_options: FinalRequestOptions,
    *,
    stream: bool,
    stream_cls: type[_AsyncStreamT] | None,
) -> ResponseT | _AsyncStreamT:
    """Re-issue the original request once with direct-VM routing disabled.

    Sets ``_disable_browser_routing`` for the duration of a single awaited
    base ``request`` call made with a copy of ``original_options``, then
    restores the previous flag value whether the call succeeds or raises.
    Per the original author's note, the re-issued request targets the
    original URL with Authorization restored and no jwt param.
    """
    reset_token = _disable_browser_routing.set(True)
    try:
        # Copy again so the caller's snapshot is not handed to the base client.
        replay_options = model_copy(original_options)
        return await super().request(cast_to, replay_options, stream=stream, stream_cls=stream_cls)
    finally:
        _disable_browser_routing.reset(reset_token)
832+
647833
@override
648834
async def _process_response(
649835
self,

src/kernel/lib/browser_routing/routing.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,73 @@ def rewrite_direct_vm_options(
216216
return rewritten
217217

218218

219+
# HTTP methods that are safe to transparently re-issue. Only idempotent reads may
220+
# fall back to the control plane; re-running a POST/PUT/PATCH/DELETE could
221+
# double-execute a side effect (e.g. a routed `curl`), so those never fall back.
222+
_FALLBACK_IDEMPOTENT_METHODS = frozenset({"GET", "HEAD"})
223+
224+
# The clean "browser is gone" signal emitted by the VM proxy: a 404 carrying this
225+
# header value plus a JSON body with this code. 404 mirrors the control plane's
226+
# not_found semantics, so falling back to the control plane is safe.
227+
_GONE_UPSTREAM_HEADER = "x-kernel-upstream"
228+
_GONE_UPSTREAM_VALUE = "gone"
229+
_GONE_BODY_CODE = "browser_gone"
230+
231+
# Transport-level / "VM unreachable" status codes. A live VM never returns these
232+
# for a real request, so they (along with the gone signal and connection errors)
233+
# indicate the session's VM is unreachable or gone.
234+
_UNREACHABLE_STATUS_CODES = frozenset({502, 503, 504})
235+
236+
237+
def should_fallback_to_control_plane(
    options: FinalRequestOptions,
    *,
    cache: BrowserRouteCache,
    config: BrowserRoutingConfig,
) -> bool:
    """Decide whether a request may be transparently replayed on the control plane.

    ``options`` must be the ORIGINAL, pre-rewrite control-plane options.
    The answer is True only when every condition holds:

    * the HTTP method (case-insensitive) is in ``_FALLBACK_IDEMPOTENT_METHODS``;
    * ``match_direct_vm_path`` recognises ``options.url``;
    * the matched subresource appears in ``config.subresources``;
    * ``cache`` holds an entry for the matched session id.
    """
    # Non-idempotent methods are never replayed.
    if options.method.upper() not in _FALLBACK_IDEMPOTENT_METHODS:
        return False

    parsed = match_direct_vm_path(options.url)
    if parsed is None:
        return False

    session_id, subresource, _unused_suffix = parsed
    allowed_subresources = set(config.subresources)
    # Short-circuit keeps the cache lookup from running for a
    # non-allowlisted subresource.
    return subresource in allowed_subresources and cache.get(session_id) is not None
257+
258+
259+
def is_vm_unreachable_response(response: httpx.Response) -> bool:
    """Tell whether a direct-VM response means the VM is unreachable or gone.

    Returns True for any status in ``_UNREACHABLE_STATUS_CODES``, or for the
    clean gone marker: a 404 whose ``_GONE_UPSTREAM_HEADER`` header equals
    ``_GONE_UPSTREAM_VALUE`` (after trimming and lower-casing) and whose
    JSON body is a mapping with ``"code"`` equal to ``_GONE_BODY_CODE``.
    Every other response, including a 404 without the marker or with an
    unparsable body, returns False.
    """
    status = response.status_code
    if status in _UNREACHABLE_STATUS_CODES:
        return True

    # Everything below concerns only the 404-based gone signal.
    if status != 404:
        return False

    # Require the header so a 404 lacking the marker is never treated as gone.
    upstream_marker = response.headers.get(_GONE_UPSTREAM_HEADER, "").strip().lower()
    if upstream_marker != _GONE_UPSTREAM_VALUE:
        return False

    try:
        payload = response.json()
    except Exception:
        # Best-effort: any failure to obtain a JSON body means "not the gone signal".
        return False

    return isinstance(payload, Mapping) and cast(Mapping[str, object], payload).get("code") == _GONE_BODY_CODE
284+
285+
219286
def strip_direct_vm_auth(request: httpx.Request, *, cache: BrowserRouteCache) -> None:
220287
raw = str(request.url)
221288
for route in cache.values():

0 commit comments

Comments
 (0)