From 1fed1be555ffff7a371d09034b14daa97d503b57 Mon Sep 17 00:00:00 2001 From: Caleigh Runge-Hottman Date: Thu, 14 Mar 2024 11:34:50 -0400 Subject: [PATCH] Add fatal LoggingThread error backout strategy In https://github.com/release-engineering/kobo/pull/169, fatal LoggingThread errors were logged to the worker's local log file before exiting. In https://github.com/release-engineering/kobo/pull/248, a more drastic measure was taken: all exceptions were indefinitely retried and the ability to write exceptions to the worker's local log file was removed. This approach could prevent the LoggingThread from terminating when encountering a fatal error. This commit combines the two approaches, backing out and exiting only after determining we've identified a persistent fatal error. The means by which we identify a fatal (vs a temporary/non-fatal) LoggingThread error is by simply retrying the `upload_task_log` method during a defined interval (defined by the LoggingThread's `_timeout` attribute). If the method continues to fail for the duration of the interval (i.e., does not succeed by the time the timeout is reached), we can consider the error to be fatal. At this point, we attempt to instead write the error to the worker's local log file, and raise the exception. Note that the timeout can be toggled using the `KOBO_LOGGING_THREAD_TIMEOUT` environment variable. --- kobo/worker/logger.py | 23 ++++++++++++++++++----- tests/test_logger.py | 27 +++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/kobo/worker/logger.py b/kobo/worker/logger.py index 5cad5c4..a5c3961 100644 --- a/kobo/worker/logger.py +++ b/kobo/worker/logger.py @@ -2,6 +2,7 @@ import threading import time +import os import six @@ -31,6 +32,7 @@ def __init__(self, hub, task_id, *args, **kwargs): self._running = True self._send_time = 0 self._send_data = b"" + self._timeout = int(os.environ.get("KOBO_LOGGING_THREAD_TIMEOUT", 600)) def read_queue(self): out = self._queue.get_nowait() @@ -68,15 +70,26 @@ def run(self): self._send_time = now self._send_data = b"" except Exception: - # Log all caught exceptions. + # Any exception other than an XML-RPC fault may be fatal. It is + # possible that we've encountered a retryable error, such as a + # temporary network disruption between worker and hub. Attempt + # to retry for a bit. + if now - self._send_time <= self._timeout: + continue + + # If the timemout has been exceeded, we can assume we've + # encountered a non-temporary, fatal exception. + # + # Since upload_task_log is apparently not working, we can't get + # this into the task logs, but it should at least be possible + # for this to get into the worker's local log file. if self._logger: msg = "\n".join([ - "Exception in LoggingThread:", + "Fatal error in LoggingThread", kobo.tback.Traceback().get_traceback(), ]) - self._logger.log_error(msg) - - continue + self._logger.log_critical(msg) + raise def write(self, data): """Add data to the queue and set the event for sending queue content.""" diff --git a/tests/test_logger.py b/tests/test_logger.py index d2a85e6..9ef6404 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -56,6 +56,33 @@ def test_upload_task_log_after_some_time(self): self.assertFalse(thread.is_alive()) self.assertFalse(thread._running) + # Following test intentionally kills a thread with an exception. + @pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning") + def test_logs_on_fatal_error(self): + # Set up a logger whose output we'll be able to inspect. + logs = StringIO() + logger = logging.getLogger('TestLoggingThread') + logger.addHandler(logging.StreamHandler(logs)) + kobo_logger = LoggingBase(logger) + + mock_hub = Mock() + mock_hub.upload_task_log.side_effect = RuntimeError("Simulated error") + + thread = LoggingThread(mock_hub, 9999, logger=kobo_logger) + thread.daemon = True + thread.start() + + thread.write('This is a log message!') + # Since we set up a fatal error, we expect the thread to die soon + # despite not calling stop(). + thread.join(10.0) + self.assertFalse(thread.is_alive()) + + # Before dying, it should have written something useful to the logs. + captured = logs.getvalue() + self.assertIn('Fatal error in LoggingThread', captured) + self.assertIn('RuntimeError: Simulated error', captured) + def test_logs_during_temporary_outage(self): # Messages written to the logging thread during a temporary # outage should be uploaded (and not discarded) after the hub