Skip to content

Commit

Permalink
Add fatal LoggingThread error backout strategy
Browse files Browse the repository at this point in the history
In #169,
fatal LoggingThread errors were logged to the worker's local
log file before exiting.

In #248, a more
drastic measure was taken: all exceptions were indefinitely
retried and the ability to write exceptions to the worker's
local log file was removed. This approach could prevent the
LoggingThread from terminating when encountering a fatal error.

This commit combines the two approaches, backing out and exiting
only after determining we've identified a persistent fatal error.

The means by which we identify a fatal (vs a temporary/non-fatal)
LoggingThread error is by simply retrying the `upload_task_log`
method during a defined interval (defined by the LoggingThread's
`_timeout` attribute).

If the method continues to fail for the duration of the interval
(i.e., does not succeed by the time the timeout is reached), we
can consider the error to be fatal. At this point, we attempt to
instead write the error to the worker's local log file, and raise
the exception.

Note that the timeout can be toggled using the
`KOBO_LOGGING_THREAD_TIMEOUT` environment variable.
  • Loading branch information
crungehottman committed Mar 15, 2024
1 parent 88237f2 commit f589853
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
23 changes: 22 additions & 1 deletion kobo/worker/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import threading
import time
import os

import six

Expand Down Expand Up @@ -30,6 +31,7 @@ def __init__(self, hub, task_id, *args, **kwargs):
self._running = True
self._send_time = 0
self._send_data = b""
self._timeout = int(os.environ.get("KOBO_LOGGING_THREAD_TIMEOUT", 120))

def read_queue(self):
out = self._queue.get_nowait()
Expand Down Expand Up @@ -67,7 +69,26 @@ def run(self):
self._send_time = now
self._send_data = b""
except Exception:
continue
# Any exception other than an XML-RPC fault may be fatal. It is
# possible that we've encountered a retryable error, such as a
# temporary network disruption between worker and hub. Attempt
# to retry for a bit.
if now - self._send_time <= self._timeout:
continue

# If the timemout has been exceeded, we can assume we've
# encountered a non-temporary, fatal exception.
#
# Since upload_task_log is apparently not working, we can't get
# this into the task logs, but it should at least be possible
# for this to get into the worker's local log file.
if self._logger:
msg = "\n".join([
"Fatal error in LoggingThread",
kobo.tback.Traceback().get_traceback(),
])
self._logger.log_critical(msg)
raise

def write(self, data):
"""Add data to the queue and set the event for sending queue content."""
Expand Down
27 changes: 27 additions & 0 deletions tests/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,33 @@ def test_upload_task_log_after_some_time(self):
self.assertFalse(thread.is_alive())
self.assertFalse(thread._running)

# Following test intentionally kills a thread with an exception.
@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
def test_logs_on_fatal_error(self):
# Set up a logger whose output we'll be able to inspect.
logs = StringIO()
logger = logging.getLogger('TestLoggingThread')
logger.addHandler(logging.StreamHandler(logs))
kobo_logger = LoggingBase(logger)

mock_hub = Mock()
mock_hub.upload_task_log.side_effect = RuntimeError("Simulated error")

thread = LoggingThread(mock_hub, 9999, logger=kobo_logger)
thread.daemon = True
thread.start()

thread.write('This is a log message!')
# Since we set up a fatal error, we expect the thread to die soon
# despite not calling stop().
thread.join(10.0)
self.assertFalse(thread.is_alive())

# Before dying, it should have written something useful to the logs.
captured = logs.getvalue()
self.assertIn('Fatal error in LoggingThread', captured)
self.assertIn('RuntimeError: Simulated error', captured)

def test_logs_during_temporary_outage(self):
# Messages written to the logging thread during a temporary
# outage should be uploaded (and not discarded) after the hub
Expand Down

0 comments on commit f589853

Please sign in to comment.