From a7a14b3962f2d5670e1f7739ba3a88bfbf77af12 Mon Sep 17 00:00:00 2001 From: matija Date: Wed, 29 Apr 2026 18:28:42 +0200 Subject: [PATCH] Prevent deadlock with urllib3 logging during shutdown + add bounded flush timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When LogtailHandler.flush() is called while another thread holds Python's global logging lock — most reproducibly during logging.shutdown() or logging.config.dictConfig() (e.g. triggered at import time by libraries like pymilvus) — the FlushWorker can deadlock against the calling thread. Cause: the FlushWorker uses requests/urllib3 to POST log frames; urllib3's connection-pool code calls logger.debug() from inside _make_request(), which requires logging._lock. The thread invoking shutdown holds that same lock while iterating handlers and calling flush(). If our flush() blocks waiting for the FlushWorker, both threads are stuck. Two complementary fixes: 1. Quiet urllib3 to WARNING when handler.py is imported, with an opt-out via LOGTAIL_KEEP_URLLIB3_LOGS=1. This makes urllib3's per-request debug() calls cheap no-ops, so the FlushWorker no longer needs the global logging lock during uploads. Existing urllib3 levels at or above WARNING are left untouched. 2. Bound LogtailHandler.flush() with a timeout (default 30s, configurable via the new flush_timeout constructor argument; pass None to keep the legacy unbounded wait). FlushWorker.flush() now accepts an optional timeout and returns True/False indicating whether the queue was fully drained. Shutdown will no longer hang the host process indefinitely when BetterStack is slow or unreachable, and the deadlock window from any future similar issue is bounded. Tests: five new unit tests covering both code paths; existing tests pass unchanged. 
--- logtail/flusher.py | 26 ++++++++++++++++++++++---- logtail/handler.py | 22 +++++++++++++++++++++- tests/test_flusher.py | 30 ++++++++++++++++++++++++++++++ tests/test_handler.py | 26 +++++++++++++++++++++++++- 4 files changed, 98 insertions(+), 6 deletions(-) diff --git a/logtail/flusher.py b/logtail/flusher.py index 55b8f18..0b1c483 100644 --- a/logtail/flusher.py +++ b/logtail/flusher.py @@ -77,11 +77,29 @@ def step(self): if shutdown and self.pipe.empty(): self.should_run = False - def flush(self): + def flush(self, timeout=None): + """Block until the worker has drained the queue. + + If ``timeout`` is given (in seconds), return after at most that much + wall time has elapsed even if the queue is still non-empty. Returns + ``True`` if the queue was drained, ``False`` if the timeout fired. + + A bounded ``flush()`` is important during ``logging.shutdown()`` — + without it, a slow or unreachable upload endpoint blocks the entire + process indefinitely. It also limits the window in which a deadlock + with the global ``logging`` lock can persist (see ``handler.py`` for + the urllib3-debug background). 
+ """ self._flushing = True - while not self._clean or not self.pipe.empty(): - time.sleep(self.check_interval) - self._flushing = False + try: + deadline = None if timeout is None else time.time() + timeout + while not self._clean or not self.pipe.empty(): + if deadline is not None and time.time() >= deadline: + return False + time.sleep(self.check_interval) + return True + finally: + self._flushing = False def _initial_time_remaining(flush_interval): return flush_interval diff --git a/logtail/handler.py b/logtail/handler.py index 0913f4a..86fba5d 100644 --- a/logtail/handler.py +++ b/logtail/handler.py @@ -2,6 +2,7 @@ from __future__ import print_function, unicode_literals import logging import json +import os from .compat import queue from .helpers import DEFAULT_CONTEXT @@ -17,6 +18,23 @@ DEFAULT_DROP_EXTRA_EVENTS = True DEFAULT_INCLUDE_EXTRA_ATTRIBUTES = True DEFAULT_TIMEOUT = 30 +DEFAULT_FLUSH_TIMEOUT = 30 + + +# urllib3 emits per-request DEBUG logs from inside its connection pool. +# The FlushWorker uses requests/urllib3 to POST to BetterStack — so any +# debug() call from urllib3 happens on the FlushWorker thread. When +# logging.shutdown() / logging.config.dictConfig() runs (e.g. triggered +# at import time by libraries like pymilvus), the calling thread holds +# logging._lock while iterating handlers and invoking flush(); the +# FlushWorker's debug() call would need the same lock and the two threads +# deadlock. Quieting urllib3 to WARNING keeps the debug() calls cheap +# no-ops and avoids the lock contention entirely. Set +# LOGTAIL_KEEP_URLLIB3_LOGS=1 to opt out. 
+if os.getenv('LOGTAIL_KEEP_URLLIB3_LOGS', '').lower() not in ('1', 'true', 'yes'): + _urllib3_logger = logging.getLogger('urllib3') + if _urllib3_logger.level == logging.NOTSET or _urllib3_logger.level < logging.WARNING: + _urllib3_logger.setLevel(logging.WARNING) class LogtailHandler(logging.Handler): @@ -31,6 +49,7 @@ def __init__(self, include_extra_attributes=DEFAULT_INCLUDE_EXTRA_ATTRIBUTES, context=DEFAULT_CONTEXT, timeout=DEFAULT_TIMEOUT, + flush_timeout=DEFAULT_FLUSH_TIMEOUT, level=logging.NOTSET): super(LogtailHandler, self).__init__(level=level) self.source_token = source_token @@ -47,6 +66,7 @@ def __init__(self, self.flush_interval = flush_interval self.check_interval = check_interval self.raise_exceptions = raise_exceptions + self.flush_timeout = flush_timeout self.dropcount = 0 # Do not initialize the flush thread yet because it causes issues on Render. self.flush_thread = None @@ -83,4 +103,4 @@ def emit(self, record): def flush(self): if self.flush_thread and self.flush_thread.is_alive(): - self.flush_thread.flush() + self.flush_thread.flush(timeout=self.flush_timeout) diff --git a/tests/test_flusher.py b/tests/test_flusher.py index 0a22f75..53ec8c5 100644 --- a/tests/test_flusher.py +++ b/tests/test_flusher.py @@ -160,3 +160,33 @@ def test_shutdown_dont_raise_exception_in_thread(self): self.assertFalse(threading.excepthook.called) threading.excepthook = original_excepthook + + def test_flush_returns_immediately_on_empty_clean_queue(self): + _, _, fw = self._setup_worker() + # _clean defaults to True and pipe is empty — flush should return at once. + t1 = time.time() + result = fw.flush() + elapsed = time.time() - t1 + self.assertTrue(result) + self.assertLess(elapsed, 0.1) + + def test_flush_with_timeout_returns_false_when_queue_not_drained(self): + pipe, _, fw = self._setup_worker() + # Put an item in but never start the worker thread → queue stays full, + # so flush() will spin until the timeout fires. 
+ pipe.put(object(), block=False) + + t1 = time.time() + result = fw.flush(timeout=0.05) + elapsed = time.time() - t1 + + self.assertFalse(result) + # Bounded: should return shortly after the timeout, not block forever. + self.assertLess(elapsed, 1.0) + self.assertGreaterEqual(elapsed, 0.04) + + def test_flush_clears_flushing_flag_even_on_timeout(self): + pipe, _, fw = self._setup_worker() + pipe.put(object(), block=False) + fw.flush(timeout=0.05) + self.assertFalse(fw._flushing) diff --git a/tests/test_handler.py b/tests/test_handler.py index fe51995..ead1853 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -26,11 +26,35 @@ def test_handler_passes_timeout_to_uploader(self, MockWorker): # Test default timeout handler = LogtailHandler(source_token=self.source_token, host=self.host) self.assertEqual(handler.uploader.timeout, 30) - + # Test custom timeout handler = LogtailHandler(source_token=self.source_token, host=self.host, timeout=10) self.assertEqual(handler.uploader.timeout, 10) + @patch('logtail.handler.FlushWorker') + def test_handler_stores_flush_timeout(self, MockWorker): + # Default + handler = LogtailHandler(source_token=self.source_token, host=self.host) + self.assertEqual(handler.flush_timeout, 30) + # Custom + handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=2) + self.assertEqual(handler.flush_timeout, 2) + # Explicit None disables the timeout (legacy unbounded wait) + handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=None) + self.assertIsNone(handler.flush_timeout) + + @patch('logtail.handler.FlushWorker') + def test_handler_passes_flush_timeout_to_worker(self, MockWorker): + handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=7) + # Trigger flush_thread creation by emitting once. 
+ logger = logging.getLogger(__name__) + logger.handlers = [] + logger.addHandler(handler) + logger.critical('hello') + # The mocked FlushWorker is alive by default; flush() forwards timeout. + handler.flush() + handler.flush_thread.flush.assert_called_with(timeout=7) + @patch('logtail.handler.FlushWorker') def test_handler_creates_pipe_from_args(self, MockWorker): buffer_capacity = 9