Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
bc11725
increase uvicorn keep alive
nkitsaini Jun 11, 2026
c6e96b9
Fix async UDF event loop starvation under heavy load in Jupyter
kesmit13 May 14, 2026
16b9426
Ensure proper cancellation of async UDFs in dedicated loop
kesmit13 May 14, 2026
8bed545
Fix create_task expecting coroutine, use ensure_future for wrapped fu…
kesmit13 May 14, 2026
3027227
Call Application.shutdown() when replacing UDF server
kesmit13 May 14, 2026
6f2f8ab
Propagate cancellation to UDF loop and prevent thread leak on failure
kesmit13 May 14, 2026
6db2fc1
Fix udf_future NameError and lazily initialize UDF event loop
kesmit13 May 14, 2026
2b65cd0
Reset UDF loop state in shutdown() to allow safe reuse
kesmit13 May 14, 2026
1cffe17
Use thread-per-request for async UDFs instead of shared event loop
kesmit13 May 18, 2026
19d00da
Add cancellable wrapper for responsive async UDF cancellation
kesmit13 May 18, 2026
e260b19
Fix event loop closed error and add comprehensive UDF dispatch tests
kesmit13 May 19, 2026
c96113b
reuse event loop across requests on same thread
nkitsaini Jun 3, 2026
952ef2d
dedicated async thread
nkitsaini Jun 10, 2026
4448a69
more logs
nkitsaini Jun 10, 2026
b928ab3
pin ip
nkitsaini Jun 11, 2026
6933ff3
Revert "pin ip"
nkitsaini Jun 15, 2026
3e4695e
rewrite
nkitsaini Jun 15, 2026
0a7a0a9
rewrite 2
nkitsaini Jun 15, 2026
05077b2
improve comments
nkitsaini Jun 16, 2026
007e29f
improvements in race handling between cancel and task
nkitsaini Jun 17, 2026
77c75a6
fix lint
nkitsaini Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions singlestoredb/apps/_python_udfs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import os
import typing
import warnings

from ..functions.ext.asgi import Application
from ._config import AppConfig
Expand All @@ -16,6 +17,12 @@
# Maximum number of UDFs allowed
MAX_UDFS_LIMIT = 10

# Default uvicorn keep-alive timeout (seconds) for managed UDF servers; it can
# be overridden with the SINGLESTOREDB_UDF_KEEPALIVE_TIMEOUT environment
# variable. A high keep-alive timeout keeps uvicorn from closing idle
# connections too eagerly. Whichever side closes first holds the socket in
# TIME_WAIT (~60s on Linux), so server-initiated closes churn sockets under load.
DEFAULT_UDF_KEEPALIVE_TIMEOUT = 120


async def run_udf_app(
log_level: str = 'error',
Expand Down Expand Up @@ -66,6 +73,7 @@ async def run_udf_app(
host='0.0.0.0',
port=app_config.listen_port,
log_config=app.get_uvicorn_log_config(),
timeout_keep_alive=get_keep_alive_timeout(),
)

# Register the functions only if the app is running interactively.
Expand Down Expand Up @@ -98,3 +106,24 @@ def generate_base_url(app_config: AppConfig) -> str:
)

return f'{gateway_url}/pythonudfs/{app_config.notebook_server_id}/interactive/'


def get_keep_alive_timeout() -> int:
    """
    Resolve the uvicorn keep-alive timeout (seconds) for the UDF server.

    The value is read from the ``SINGLESTOREDB_UDF_KEEPALIVE_TIMEOUT``
    environment variable. ``DEFAULT_UDF_KEEPALIVE_TIMEOUT`` is returned when
    the variable is unset, and also (after a warning is emitted) when the
    variable does not parse as an integer or parses to a negative number.

    Returns
    -------
    int
        The timeout, in seconds, to hand to uvicorn.

    """
    fallback = DEFAULT_UDF_KEEPALIVE_TIMEOUT
    env_value = os.environ.get('SINGLESTOREDB_UDF_KEEPALIVE_TIMEOUT')
    if env_value is None:
        return fallback

    try:
        timeout = int(env_value)
    except (TypeError, ValueError):
        complaint = (
            f'Invalid SINGLESTOREDB_UDF_KEEPALIVE_TIMEOUT={env_value!r}; '
        )
    else:
        if timeout >= 0:
            return timeout
        complaint = (
            f'Negative SINGLESTOREDB_UDF_KEEPALIVE_TIMEOUT={timeout}; '
        )

    # Both rejection paths share the same tail: warn, then use the default.
    warnings.warn(complaint + f'falling back to {fallback}s.')
    return fallback
235 changes: 225 additions & 10 deletions singlestoredb/functions/ext/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"""
import argparse
import asyncio
import atexit
import contextlib
import contextvars
import dataclasses
import datetime
Expand Down Expand Up @@ -113,6 +115,203 @@ async def to_thread(
return await loop.run_in_executor(None, func_call)


async def _poll_cancel(cancel_event: threading.Event) -> None:
    """
    Wait, without blocking the running loop, until ``cancel_event`` is set.

    A ``threading.Event`` cannot be awaited, so the flag is sampled every
    0.1 seconds with an ``asyncio.sleep`` in between. The coroutine returns
    immediately when the event is already set on entry. ``_cancellable_run``
    runs it as the task that races the UDF coroutine.

    Parameters
    ----------
    cancel_event : threading.Event
        Flag that another thread sets to request cancellation.

    """
    while True:
        if cancel_event.is_set():
            return
        # Yield to the loop between samples so other tasks keep running.
        await asyncio.sleep(0.1)


async def _cancellable_run(
    cancel_event: threading.Event,
    coro: Any,
) -> Any:
    """
    Await ``coro``, giving up on it as soon as ``cancel_event`` is set.

    ``coro`` is wrapped in a task and raced against a ``_poll_cancel`` task
    watching ``cancel_event``. This lets a ``threading.Event`` set from
    another thread stop an async UDF that ordinary task cancellation on the
    request loop would not reach.

    Parameters
    ----------
    cancel_event : threading.Event
        Cross-thread cancellation flag.
    coro : Any
        Coroutine to execute.

    Returns
    -------
    Any
        Whatever ``coro`` returns.

    Raises
    ------
    asyncio.CancelledError
        If the cancel watcher finished and the work did not.
    Exception
        Any exception raised by ``coro`` is re-raised unchanged.

    """
    work = asyncio.create_task(coro)
    watcher = asyncio.create_task(_poll_cancel(cancel_event))
    try:
        finished, _ = await asyncio.wait(
            [work, watcher], return_when=asyncio.FIRST_COMPLETED,
        )
        if work not in finished:
            # Only the cancel watcher completed: abandon the work.
            raise asyncio.CancelledError()
        # A finished piece of work always wins, even if the cancel signal
        # landed in the same wakeup (both tasks may be in ``finished``), so
        # a racing timeout/disconnect cannot throw away a completed outcome.
        return work.result()
    finally:
        # Leave nothing pending behind: an un-awaited pending task triggers
        # "Task was destroyed but it is pending!" when the loop is torn down.
        for helper in (work, watcher):
            if not helper.done():
                helper.cancel()
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await helper


# Dedicated event loop used for ALL async UDF requests.
#
# Async UDFs commonly create resources bound to the event loop they are
# first used on (httpx pools, async DB clients, anyio streams, ...). Routing
# every async UDF onto one dedicated loop lets those resources be reused
# safely across requests and avoids the "bound to a different event loop"
# errors seen when requests land on different ad-hoc worker threads.
# ``run_coroutine_threadsafe`` schedules each coroutine immediately, so
# requests run concurrently rather than queuing behind in-flight ones.
#
# Sync UDFs instead run in a worker thread (one ``asyncio.run`` per call):
# a sync UDF would block this shared loop and starve other async requests.
# The singleton dispatch loop. ``None`` until ``_get_async_dispatch_loop``
# creates it, and reset to ``None`` by ``_shutdown_async_dispatch_loop``.
_async_dispatch_loop: 'Optional[asyncio.AbstractEventLoop]' = None
# The daemon thread that runs the dispatch loop; set and cleared together
# with ``_async_dispatch_loop``.
_async_dispatch_thread: 'Optional[threading.Thread]' = None
# Held while the loop/thread pair above is created or torn down.
_async_dispatch_lock = threading.Lock()


def _get_async_dispatch_loop() -> asyncio.AbstractEventLoop:
    """
    Return the singleton async-dispatch event loop, starting it on demand.

    The loop is created inside, and driven by, a daemon thread named
    ``singlestoredb-udf-async-dispatch`` that sits in ``run_forever`` until
    the loop is stopped. A previously created loop that has since been
    closed is treated as absent and replaced by a new loop/thread pair.

    Returns
    -------
    asyncio.AbstractEventLoop
        The dispatch loop.

    Raises
    ------
    RuntimeError
        If the dispatch thread failed to create its event loop.

    """
    global _async_dispatch_loop, _async_dispatch_thread

    # Lock-free fast path: the loop already exists and is still open.
    current = _async_dispatch_loop
    if current is not None and not current.is_closed():
        return current

    with _async_dispatch_lock:
        # Check again under the lock; another thread may have created the
        # loop while this one was waiting to acquire it.
        current = _async_dispatch_loop
        if current is not None and not current.is_closed():
            return current

        ready = threading.Event()
        started: List[asyncio.AbstractEventLoop] = []
        failures: List[BaseException] = []

        def _teardown(loop: asyncio.AbstractEventLoop) -> None:
            # Best-effort cleanup; every step is attempted even if an
            # earlier one fails. The attribute lookup stays inside the
            # ``suppress`` so a missing method is ignored as well.
            for step in ('shutdown_asyncgens', 'shutdown_default_executor'):
                with contextlib.suppress(Exception):
                    loop.run_until_complete(getattr(loop, step)())
            with contextlib.suppress(Exception):
                loop.close()

        def _serve() -> None:
            try:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                started.append(loop)
            except BaseException as exc:  # noqa: B902
                # Hand the failure to the waiting caller rather than
                # leaving it blocked on ``ready``.
                failures.append(exc)
                return
            finally:
                # Release the caller on success and on failure alike.
                ready.set()
            try:
                loop.run_forever()
            finally:
                _teardown(loop)

        worker = threading.Thread(
            target=_serve,
            name='singlestoredb-udf-async-dispatch',
            daemon=True,
        )
        worker.start()
        ready.wait()

        if failures:
            raise RuntimeError(
                'Failed to start the async UDF dispatch event loop',
            ) from failures[0]
        if not started:
            raise RuntimeError(
                'Async UDF dispatch event loop failed to start',
            )

        _async_dispatch_loop = started[0]
        _async_dispatch_thread = worker
        return _async_dispatch_loop
Comment thread
nkitsaini marked this conversation as resolved.


async def _dispatch_to_async_loop(coro: Any) -> Any:
    """
    Run ``coro`` on the dedicated async-dispatch loop and await its outcome.

    A snapshot of the caller's ``contextvars`` context is taken here and
    used as the context of the task created on the dispatch loop, so context
    variables set by the caller are visible to ``coro`` even though it
    executes on another thread.

    If the awaiting task is cancelled, the scheduled work is cancelled on a
    best-effort basis and the ``CancelledError`` is re-raised.

    Parameters
    ----------
    coro : Any
        Coroutine to execute on the dispatch loop.

    Returns
    -------
    Any
        The value returned by ``coro``.

    """
    dispatch_loop = _get_async_dispatch_loop()
    caller_ctx = contextvars.copy_context()

    async def _in_caller_context() -> Any:
        # A task copies whichever context is active when it is created, so
        # creating it via ``caller_ctx.run`` gives it the caller's variables.
        # (``create_task(..., context=)`` would do the same but needs 3.11+.)
        return await caller_ctx.run(asyncio.create_task, coro)

    scheduled = asyncio.run_coroutine_threadsafe(
        _in_caller_context(), dispatch_loop,
    )
    try:
        outcome = await asyncio.wrap_future(scheduled)
    except asyncio.CancelledError:
        scheduled.cancel()
        raise
    else:
        return outcome


def _shutdown_async_dispatch_loop() -> None:
    """
    Stop the dedicated async-dispatch loop and wait briefly for its thread.

    The module-level loop/thread references are cleared under the lock, the
    loop (if still open) is asked to stop from its own thread, and the
    thread is joined for at most 5 seconds. Errors while requesting the stop
    are ignored; this is best-effort cleanup registered with ``atexit``.
    """
    global _async_dispatch_loop, _async_dispatch_thread

    # Detach the globals first so they no longer point at the loop/thread
    # that is being torn down.
    with _async_dispatch_lock:
        loop, _async_dispatch_loop = _async_dispatch_loop, None
        thread, _async_dispatch_thread = _async_dispatch_thread, None

    if loop is not None and not loop.is_closed():
        # ``stop`` must run on the loop's own thread.
        with contextlib.suppress(Exception):
            loop.call_soon_threadsafe(loop.stop)

    if thread is None:
        return
    thread.join(timeout=5)


atexit.register(_shutdown_async_dispatch_loop)


# Use negative values to indicate unsigned ints / binary data / usec time precision
rowdat_1_type_map = {
'bool': ft.LONGLONG,
Expand Down Expand Up @@ -1195,15 +1394,26 @@ async def __call__(
func_info['colspec'], b''.join(data),
)

func_task = asyncio.create_task(
func(cancel_event, call_timer, *inputs)
if func_info['is_async']
else to_thread(
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
# Async UDFs run on the dedicated dispatch loop; sync UDFs run
# in a worker thread (one asyncio.run per call) so they cannot
# block that shared loop (see the module-level notes above).
if func_info.get('is_async'):
func_task = asyncio.create_task(
_dispatch_to_async_loop(
_cancellable_run(
cancel_event,
func(cancel_event, call_timer, *inputs),
),
),
),
)
)
else:
func_task = asyncio.create_task(
to_thread(
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
),
),
)
disconnect_task = asyncio.create_task(
asyncio.sleep(int(1e9))
if ignore_cancel else cancel_on_disconnect(receive),
Expand All @@ -1219,17 +1429,21 @@ async def __call__(
all_tasks, return_when=asyncio.FIRST_COMPLETED,
)

# Signal cancellation before awaiting: cancelling func_task
# only unwinds its asyncio wrapper on this loop, not the work
# running off-thread; cancel_event is what actually reaches it.
if func_task in pending:
cancel_event.set()
Comment thread
cursor[bot] marked this conversation as resolved.

await cancel_all_tasks(pending)

for task in done:
if task is disconnect_task:
cancel_event.set()
raise asyncio.CancelledError(
'Function call was cancelled by client disconnect',
)

elif task is timeout_task:
cancel_event.set()
raise asyncio.TimeoutError(
'Function call was cancelled due to timeout',
)
Expand Down Expand Up @@ -1292,6 +1506,7 @@ async def __call__(
await send(self.error_response_dict)

finally:
cancel_event.set()
await cancel_all_tasks(all_tasks)

# Handle api reflection
Expand Down
Loading
Loading