Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions logtail/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,29 @@ def step(self):
if shutdown and self.pipe.empty():
self.should_run = False

def flush(self):
def flush(self, timeout=None):
"""Block until the worker has drained the queue.

If ``timeout`` is given (in seconds), return after at most that much
wall time has elapsed even if the queue is still non-empty. Returns
``True`` if the queue was drained, ``False`` if the timeout fired.

A bounded ``flush()`` is important during ``logging.shutdown()`` —
without it, a slow or unreachable upload endpoint blocks the entire
process indefinitely. It also limits the window in which a deadlock
with the global ``logging`` lock can persist (see ``handler.py`` for
the urllib3-debug background).
"""
self._flushing = True
while not self._clean or not self.pipe.empty():
time.sleep(self.check_interval)
self._flushing = False
try:
deadline = None if timeout is None else time.time() + timeout
while not self._clean or not self.pipe.empty():
if deadline is not None and time.time() >= deadline:
return False
time.sleep(self.check_interval)
return True
finally:
self._flushing = False

def _initial_time_remaining(flush_interval):
return flush_interval
Expand Down
22 changes: 21 additions & 1 deletion logtail/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import print_function, unicode_literals
import logging
import json
import os

from .compat import queue
from .helpers import DEFAULT_CONTEXT
Expand All @@ -17,6 +18,23 @@
DEFAULT_DROP_EXTRA_EVENTS = True
DEFAULT_INCLUDE_EXTRA_ATTRIBUTES = True
DEFAULT_TIMEOUT = 30
DEFAULT_FLUSH_TIMEOUT = 30


# urllib3 emits per-request DEBUG logs from inside its connection pool.
# The FlushWorker uses requests/urllib3 to POST to BetterStack — so any
# debug() call from urllib3 happens on the FlushWorker thread. When
# logging.shutdown() / logging.config.dictConfig() runs (e.g. triggered
# at import time by libraries like pymilvus), the calling thread holds
# logging._lock while iterating handlers and invoking flush(); the
# FlushWorker's debug() call would need the same lock and the two threads
# deadlock. Quieting urllib3 to WARNING keeps the debug() calls cheap
# no-ops and avoids the lock contention entirely. Set
# LOGTAIL_KEEP_URLLIB3_LOGS=1 to opt out.
if os.getenv('LOGTAIL_KEEP_URLLIB3_LOGS', '').lower() not in ('1', 'true', 'yes'):
_urllib3_logger = logging.getLogger('urllib3')
if _urllib3_logger.level == logging.NOTSET or _urllib3_logger.level < logging.WARNING:
_urllib3_logger.setLevel(logging.WARNING)


class LogtailHandler(logging.Handler):
Expand All @@ -31,6 +49,7 @@ def __init__(self,
include_extra_attributes=DEFAULT_INCLUDE_EXTRA_ATTRIBUTES,
context=DEFAULT_CONTEXT,
timeout=DEFAULT_TIMEOUT,
flush_timeout=DEFAULT_FLUSH_TIMEOUT,
level=logging.NOTSET):
super(LogtailHandler, self).__init__(level=level)
self.source_token = source_token
Expand All @@ -47,6 +66,7 @@ def __init__(self,
self.flush_interval = flush_interval
self.check_interval = check_interval
self.raise_exceptions = raise_exceptions
self.flush_timeout = flush_timeout
self.dropcount = 0
# Do not initialize the flush thread yet because it causes issues on Render.
self.flush_thread = None
Expand Down Expand Up @@ -83,4 +103,4 @@ def emit(self, record):

def flush(self):
if self.flush_thread and self.flush_thread.is_alive():
self.flush_thread.flush()
self.flush_thread.flush(timeout=self.flush_timeout)
30 changes: 30 additions & 0 deletions tests/test_flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,33 @@ def test_shutdown_dont_raise_exception_in_thread(self):
self.assertFalse(threading.excepthook.called)

threading.excepthook = original_excepthook

def test_flush_returns_immediately_on_empty_clean_queue(self):
_, _, fw = self._setup_worker()
# _clean defaults to True and pipe is empty — flush should return at once.
t1 = time.time()
result = fw.flush()
elapsed = time.time() - t1
self.assertTrue(result)
self.assertLess(elapsed, 0.1)

def test_flush_with_timeout_returns_false_when_queue_not_drained(self):
pipe, _, fw = self._setup_worker()
# Put an item in but never start the worker thread → queue stays full,
# so flush() will spin until the timeout fires.
pipe.put(object(), block=False)

t1 = time.time()
result = fw.flush(timeout=0.05)
elapsed = time.time() - t1

self.assertFalse(result)
# Bounded: should return shortly after the timeout, not block forever.
self.assertLess(elapsed, 1.0)
self.assertGreaterEqual(elapsed, 0.04)

def test_flush_clears_flushing_flag_even_on_timeout(self):
pipe, _, fw = self._setup_worker()
pipe.put(object(), block=False)
fw.flush(timeout=0.05)
self.assertFalse(fw._flushing)
26 changes: 25 additions & 1 deletion tests/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,35 @@ def test_handler_passes_timeout_to_uploader(self, MockWorker):
# Test default timeout
handler = LogtailHandler(source_token=self.source_token, host=self.host)
self.assertEqual(handler.uploader.timeout, 30)

# Test custom timeout
handler = LogtailHandler(source_token=self.source_token, host=self.host, timeout=10)
self.assertEqual(handler.uploader.timeout, 10)

@patch('logtail.handler.FlushWorker')
def test_handler_stores_flush_timeout(self, MockWorker):
# Default
handler = LogtailHandler(source_token=self.source_token, host=self.host)
self.assertEqual(handler.flush_timeout, 30)
# Custom
handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=2)
self.assertEqual(handler.flush_timeout, 2)
# Explicit None disables the timeout (legacy unbounded wait)
handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=None)
self.assertIsNone(handler.flush_timeout)

@patch('logtail.handler.FlushWorker')
def test_handler_passes_flush_timeout_to_worker(self, MockWorker):
handler = LogtailHandler(source_token=self.source_token, host=self.host, flush_timeout=7)
# Trigger flush_thread creation by emitting once.
logger = logging.getLogger(__name__)
logger.handlers = []
logger.addHandler(handler)
logger.critical('hello')
# The mocked FlushWorker is alive by default; flush() forwards timeout.
handler.flush()
handler.flush_thread.flush.assert_called_with(timeout=7)

@patch('logtail.handler.FlushWorker')
def test_handler_creates_pipe_from_args(self, MockWorker):
buffer_capacity = 9
Expand Down