diff --git a/logtail/flusher.py b/logtail/flusher.py index c248e02..55b8f18 100644 --- a/logtail/flusher.py +++ b/logtail/flusher.py @@ -10,14 +10,17 @@ class FlushWorker(threading.Thread): - def __init__(self, upload, pipe, buffer_capacity, flush_interval): + def __init__(self, upload, pipe, buffer_capacity, flush_interval, check_interval): threading.Thread.__init__(self) self.parent_thread = threading.current_thread() self.upload = upload self.pipe = pipe self.buffer_capacity = buffer_capacity self.flush_interval = flush_interval + self.check_interval = check_interval self.should_run = True + self._flushing = False + self._clean = True def run(self): while self.should_run: @@ -27,6 +30,7 @@ def step(self): last_flush = time.time() time_remaining = _initial_time_remaining(self.flush_interval) frame = [] + self._clean = True # If the parent thread has exited but there are still outstanding # events, attempt to send them before exiting. @@ -38,16 +42,17 @@ def step(self): # `flush_interval` seconds have passed without sending any events. while len(frame) < self.buffer_capacity and time_remaining > 0: try: - # Blocks for up to 1.0 seconds for each item to prevent + # Blocks for up to `check_interval` seconds for each item to prevent # spinning and burning CPU unnecessarily. Could block for the # entire amount of `time_remaining` but then in the case that # the parent thread has exited, that entire amount of time # would be waited before this child worker thread exits. - entry = self.pipe.get(block=(not shutdown), timeout=1.0) + entry = self.pipe.get(block=(not shutdown), timeout=self.check_interval) + self._clean = False frame.append(entry) self.pipe.task_done() except queue.Empty: - if shutdown: + if shutdown or self._flushing: break shutdown = not self.parent_thread.is_alive() time_remaining = _calculate_time_remaining(last_flush, self.flush_interval) @@ -68,9 +73,15 @@ def step(self): if response.status_code == 500 and getattr(response, "exception") != None: print('Failed to send logs to Better Stack after {} retries: {}'.format(len(RETRY_SCHEDULE), response.exception)) + self._clean = True if shutdown and self.pipe.empty(): self.should_run = False + def flush(self): + self._flushing = True + while not self._clean or not self.pipe.empty(): + time.sleep(self.check_interval) + self._flushing = False def _initial_time_remaining(flush_interval): return flush_interval diff --git a/logtail/handler.py b/logtail/handler.py index 89946c9..91cffaa 100644 --- a/logtail/handler.py +++ b/logtail/handler.py @@ -12,6 +12,7 @@ DEFAULT_HOST = 'https://in.logs.betterstack.com' DEFAULT_BUFFER_CAPACITY = 1000 DEFAULT_FLUSH_INTERVAL = 1 +DEFAULT_CHECK_INTERVAL = 0.1 DEFAULT_RAISE_EXCEPTIONS = False DEFAULT_DROP_EXTRA_EVENTS = True DEFAULT_INCLUDE_EXTRA_ATTRIBUTES = True @@ -23,6 +24,7 @@ def __init__(self, host=DEFAULT_HOST, buffer_capacity=DEFAULT_BUFFER_CAPACITY, flush_interval=DEFAULT_FLUSH_INTERVAL, + check_interval=DEFAULT_CHECK_INTERVAL, raise_exceptions=DEFAULT_RAISE_EXCEPTIONS, drop_extra_events=DEFAULT_DROP_EXTRA_EVENTS, include_extra_attributes=DEFAULT_INCLUDE_EXTRA_ATTRIBUTES, @@ -38,6 +40,7 @@ def __init__(self, self.include_extra_attributes = include_extra_attributes self.buffer_capacity = buffer_capacity self.flush_interval = flush_interval + self.check_interval = check_interval self.raise_exceptions = raise_exceptions self.dropcount = 0 # Do not initialize the flush thread yet because it causes issues on Render. @@ -51,7 +54,8 @@ def ensure_flush_thread_alive(self): self.uploader, self.pipe, self.buffer_capacity, - self.flush_interval + self.flush_interval, + self.check_interval, ) self.flush_thread.start() @@ -71,3 +75,7 @@ def emit(self, record): except Exception as e: if self.raise_exceptions: raise e + + def flush(self): + if self.flush_thread and self.flush_thread.is_alive(): + self.flush_thread.flush()