Skip to content

Commit

Permalink
Merge pull request #253 from crungehottman/logging-thread-retries
Browse files Browse the repository at this point in the history
Add fatal LoggingThread error backout strategy
  • Loading branch information
crungehottman authored Mar 18, 2024
2 parents 869eb93 + 1fed1be commit 8c7fef3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
23 changes: 18 additions & 5 deletions kobo/worker/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import threading
import time
import os

import six

Expand Down Expand Up @@ -31,6 +32,7 @@ def __init__(self, hub, task_id, *args, **kwargs):
self._running = True
self._send_time = 0
self._send_data = b""
self._timeout = int(os.environ.get("KOBO_LOGGING_THREAD_TIMEOUT", 600))

def read_queue(self):
out = self._queue.get_nowait()
Expand Down Expand Up @@ -68,15 +70,26 @@ def run(self):
self._send_time = now
self._send_data = b""
except Exception:
# Log all caught exceptions.
# Any exception other than an XML-RPC fault may be fatal. It is
# possible that we've encountered a retryable error, such as a
# temporary network disruption between worker and hub. Attempt
# to retry for a bit.
if now - self._send_time <= self._timeout:
continue

# If the timemout has been exceeded, we can assume we've
# encountered a non-temporary, fatal exception.
#
# Since upload_task_log is apparently not working, we can't get
# this into the task logs, but it should at least be possible
# for this to get into the worker's local log file.
if self._logger:
msg = "\n".join([
"Exception in LoggingThread:",
"Fatal error in LoggingThread",
kobo.tback.Traceback().get_traceback(),
])
self._logger.log_error(msg)

continue
self._logger.log_critical(msg)
raise

def write(self, data):
"""Add data to the queue and set the event for sending queue content."""
Expand Down
27 changes: 27 additions & 0 deletions tests/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,33 @@ def test_upload_task_log_after_some_time(self):
self.assertFalse(thread.is_alive())
self.assertFalse(thread._running)

# Following test intentionally kills a thread with an exception.
@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
def test_logs_on_fatal_error(self):
# Set up a logger whose output we'll be able to inspect.
logs = StringIO()
logger = logging.getLogger('TestLoggingThread')
logger.addHandler(logging.StreamHandler(logs))
kobo_logger = LoggingBase(logger)

mock_hub = Mock()
mock_hub.upload_task_log.side_effect = RuntimeError("Simulated error")

thread = LoggingThread(mock_hub, 9999, logger=kobo_logger)
thread.daemon = True
thread.start()

thread.write('This is a log message!')
# Since we set up a fatal error, we expect the thread to die soon
# despite not calling stop().
thread.join(10.0)
self.assertFalse(thread.is_alive())

# Before dying, it should have written something useful to the logs.
captured = logs.getvalue()
self.assertIn('Fatal error in LoggingThread', captured)
self.assertIn('RuntimeError: Simulated error', captured)

def test_logs_during_temporary_outage(self):
# Messages written to the logging thread during a temporary
# outage should be uploaded (and not discarded) after the hub
Expand Down

0 comments on commit 8c7fef3

Please sign in to comment.