diff --git a/logtail/flusher.py b/logtail/flusher.py index 55b8f18..0b1c483 100644 --- a/logtail/flusher.py +++ b/logtail/flusher.py @@ -77,11 +77,29 @@ def step(self): if shutdown and self.pipe.empty(): self.should_run = False - def flush(self): + def flush(self, timeout=None): + """Block until the worker has drained the queue. + + If ``timeout`` is given (in seconds), return after at most that much + wall time has elapsed even if the queue is still non-empty. Returns + ``True`` if the queue was drained, ``False`` if the timeout fired. + + A bounded ``flush()`` is important during ``logging.shutdown()`` — + without it, a slow or unreachable upload endpoint blocks the entire + process indefinitely. It also limits the window in which a deadlock + with the global ``logging`` lock can persist (see ``handler.py`` for + the urllib3-debug background). + """ self._flushing = True - while not self._clean or not self.pipe.empty(): - time.sleep(self.check_interval) - self._flushing = False + try: + deadline = None if timeout is None else time.time() + timeout + while not self._clean or not self.pipe.empty(): + if deadline is not None and time.time() >= deadline: + return False + time.sleep(self.check_interval) + return True + finally: + self._flushing = False def _initial_time_remaining(flush_interval): return flush_interval diff --git a/logtail/handler.py b/logtail/handler.py index 0913f4a..86fba5d 100644 --- a/logtail/handler.py +++ b/logtail/handler.py @@ -2,6 +2,7 @@ from __future__ import print_function, unicode_literals import logging import json +import os from .compat import queue from .helpers import DEFAULT_CONTEXT @@ -17,6 +18,23 @@ DEFAULT_DROP_EXTRA_EVENTS = True DEFAULT_INCLUDE_EXTRA_ATTRIBUTES = True DEFAULT_TIMEOUT = 30 +DEFAULT_FLUSH_TIMEOUT = 30 + + +# urllib3 emits per-request DEBUG logs from inside its connection pool. +# The FlushWorker uses requests/urllib3 to POST to BetterStack — so any +# debug() call from urllib3 happens on the FlushWorker thread. When +# logging.shutdown() / logging.config.dictConfig() runs (e.g. triggered +# at import time by libraries like pymilvus), the calling thread holds +# logging._lock while iterating handlers and invoking flush(); the +# FlushWorker's debug() call would need the same lock and the two threads +# deadlock. Quieting urllib3 to WARNING keeps the debug() calls cheap +# no-ops and avoids the lock contention entirely. Set +# LOGTAIL_KEEP_URLLIB3_LOGS=1 to opt out. +if os.getenv('LOGTAIL_KEEP_URLLIB3_LOGS', '').lower() not in ('1', 'true', 'yes'): + _urllib3_logger = logging.getLogger('urllib3') + if _urllib3_logger.level == logging.NOTSET or _urllib3_logger.level < logging.WARNING: + _urllib3_logger.setLevel(logging.WARNING) class LogtailHandler(logging.Handler): @@ -31,6 +49,7 @@ def __init__(self, include_extra_attributes=DEFAULT_INCLUDE_EXTRA_ATTRIBUTES, context=DEFAULT_CONTEXT, timeout=DEFAULT_TIMEOUT, + flush_timeout=DEFAULT_FLUSH_TIMEOUT, level=logging.NOTSET): super(LogtailHandler, self).__init__(level=level) self.source_token = source_token @@ -47,6 +66,7 @@ def __init__(self, self.flush_interval = flush_interval self.check_interval = check_interval self.raise_exceptions = raise_exceptions + self.flush_timeout = flush_timeout self.dropcount = 0 # Do not initialize the flush thread yet because it causes issues on Render. self.flush_thread = None @@ -83,4 +103,4 @@ def emit(self, record): def flush(self): if self.flush_thread and self.flush_thread.is_alive(): - self.flush_thread.flush() + self.flush_thread.flush(timeout=self.flush_timeout) diff --git a/tests/test_flusher.py b/tests/test_flusher.py index 0a22f75..53ec8c5 100644 --- a/tests/test_flusher.py +++ b/tests/test_flusher.py @@ -160,3 +160,33 @@ def test_shutdown_dont_raise_exception_in_thread(self): self.assertFalse(threading.excepthook.called) threading.excepthook = original_excepthook + + def test_flush_returns_immediately_on_empty_clean_queue(self): + _, _, fw = self._setup_worker() + # _clean defaults to True and pipe is empty — flush should return at once. + t1 = time.time() + result = fw.flush() + elapsed = time.time() - t1 + self.assertTrue(result) + self.assertLess(elapsed, 0.1) + + def test_flush_with_timeout_returns_false_when_queue_not_drained(self): + pipe, _, fw = self._setup_worker() + # Put an item in but never start the worker thread → queue stays full, + # so flush() will spin until the timeout fires. + pipe.put(object(), block=False) + + t1 = time.time() + result = fw.flush(timeout=0.05) + elapsed = time.time() - t1 + + self.assertFalse(result) + # Bounded: should return shortly after the timeout, not block forever. + self.assertLess(elapsed, 1.0) + self.assertGreaterEqual(elapsed, 0.04) + + def test_flush_clears_flushing_flag_even_on_timeout(self): + pipe, _, fw = self._setup_worker() + pipe.put(object(), block=False) + fw.flush(timeout=0.05) + self.assertFalse(fw._flushing) diff --git a/tests/test_handler.py b/tests/test_handler.py index fe51995..ead1853 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -26,11 +26,35 @@ def test_handler_passes_timeout_to_uploader(self, MockWorker): # Test default timeout handler = LogtailHandler(source_token=self.source_token, host=self.host) self.assertEqual(handler.uploader.timeout, 30) - + # Test custom timeout handler = LogtailHandler(source_token=self.source_token, host=self.host, timeout=10) self.assertEqual(handler.uploader.timeout, 10) + @patch('logtail.handler.FlushWorker') + def test_handler_stores_flush_timeout(self, MockWorker): + # Default + handler = LogtailHandler(source_token=self.source_token, host=self.host) + self.assertEqual(handler.flush_timeout, 30) + # Custom + handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=2) + self.assertEqual(handler.flush_timeout, 2) + # Explicit None disables the timeout (legacy unbounded wait) + handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=None) + self.assertIsNone(handler.flush_timeout) + + @patch('logtail.handler.FlushWorker') + def test_handler_passes_flush_timeout_to_worker(self, MockWorker): + handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=7) + # Trigger flush_thread creation by emitting once. + logger = logging.getLogger(__name__) + logger.handlers = [] + logger.addHandler(handler) + logger.critical('hello') + # The mocked FlushWorker is alive by default; flush() forwards timeout. + handler.flush() + handler.flush_thread.flush.assert_called_with(timeout=7) + @patch('logtail.handler.FlushWorker') def test_handler_creates_pipe_from_args(self, MockWorker): buffer_capacity = 9