Skip to content

Commit

Permalink
LogtailHandler: implement logging.Handler.flush
Browse files Browse the repository at this point in the history
  • Loading branch information
PetrHeinz committed Jun 19, 2024
1 parent 95ab74b commit 87c340e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
19 changes: 15 additions & 4 deletions logtail/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@


class FlushWorker(threading.Thread):
def __init__(self, upload, pipe, buffer_capacity, flush_interval):
def __init__(self, upload, pipe, buffer_capacity, flush_interval, check_interval):
threading.Thread.__init__(self)
self.parent_thread = threading.current_thread()
self.upload = upload
self.pipe = pipe
self.buffer_capacity = buffer_capacity
self.flush_interval = flush_interval
self.check_interval = check_interval
self.should_run = True
self._flushing = False
self._clean = True

def run(self):
while self.should_run:
Expand All @@ -27,6 +30,7 @@ def step(self):
last_flush = time.time()
time_remaining = _initial_time_remaining(self.flush_interval)
frame = []
self._clean = True

# If the parent thread has exited but there are still outstanding
# events, attempt to send them before exiting.
Expand All @@ -38,16 +42,17 @@ def step(self):
# `flush_interval` seconds have passed without sending any events.
while len(frame) < self.buffer_capacity and time_remaining > 0:
try:
# Blocks for up to 1.0 seconds for each item to prevent
# Blocks for up to `check_interval` seconds for each item to prevent
# spinning and burning CPU unnecessarily. Could block for the
# entire amount of `time_remaining` but then in the case that
# the parent thread has exited, that entire amount of time
# would be waited before this child worker thread exits.
entry = self.pipe.get(block=(not shutdown), timeout=1.0)
entry = self.pipe.get(block=(not shutdown), timeout=self.check_interval)
self._clean = False
frame.append(entry)
self.pipe.task_done()
except queue.Empty:
if shutdown:
if shutdown or self._flushing:
break
shutdown = not self.parent_thread.is_alive()
time_remaining = _calculate_time_remaining(last_flush, self.flush_interval)
Expand All @@ -68,9 +73,15 @@ def step(self):
if response.status_code == 500 and getattr(response, "exception") != None:
print('Failed to send logs to Better Stack after {} retries: {}'.format(len(RETRY_SCHEDULE), response.exception))

self._clean = True
if shutdown and self.pipe.empty():
self.should_run = False

def flush(self):
self._flushing = True
while not self._clean or not self.pipe.empty():
time.sleep(self.check_interval)
self._flushing = False

def _initial_time_remaining(flush_interval):
return flush_interval
Expand Down
10 changes: 9 additions & 1 deletion logtail/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DEFAULT_HOST = 'https://in.logs.betterstack.com'
DEFAULT_BUFFER_CAPACITY = 1000
DEFAULT_FLUSH_INTERVAL = 1
DEFAULT_CHECK_INTERVAL = 0.1
DEFAULT_RAISE_EXCEPTIONS = False
DEFAULT_DROP_EXTRA_EVENTS = True
DEFAULT_INCLUDE_EXTRA_ATTRIBUTES = True
Expand All @@ -23,6 +24,7 @@ def __init__(self,
host=DEFAULT_HOST,
buffer_capacity=DEFAULT_BUFFER_CAPACITY,
flush_interval=DEFAULT_FLUSH_INTERVAL,
check_interval=DEFAULT_CHECK_INTERVAL,
raise_exceptions=DEFAULT_RAISE_EXCEPTIONS,
drop_extra_events=DEFAULT_DROP_EXTRA_EVENTS,
include_extra_attributes=DEFAULT_INCLUDE_EXTRA_ATTRIBUTES,
Expand All @@ -38,6 +40,7 @@ def __init__(self,
self.include_extra_attributes = include_extra_attributes
self.buffer_capacity = buffer_capacity
self.flush_interval = flush_interval
self.check_interval = check_interval
self.raise_exceptions = raise_exceptions
self.dropcount = 0
# Do not initialize the flush thread yet because it causes issues on Render.
Expand All @@ -51,7 +54,8 @@ def ensure_flush_thread_alive(self):
self.uploader,
self.pipe,
self.buffer_capacity,
self.flush_interval
self.flush_interval,
self.check_interval,
)
self.flush_thread.start()

Expand All @@ -71,3 +75,7 @@ def emit(self, record):
except Exception as e:
if self.raise_exceptions:
raise e

def flush(self):
if self.flush_thread and self.flush_thread.is_alive():
self.flush_thread.flush()

0 comments on commit 87c340e

Please sign in to comment.