Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fatal LoggingThread error backout strategy #253

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions kobo/worker/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import threading
import time
import os

import six

Expand Down Expand Up @@ -31,6 +32,7 @@ def __init__(self, hub, task_id, *args, **kwargs):
self._running = True
self._send_time = 0
self._send_data = b""
self._timeout = int(os.environ.get("KOBO_LOGGING_THREAD_TIMEOUT", 600))

def read_queue(self):
out = self._queue.get_nowait()
Expand Down Expand Up @@ -68,15 +70,26 @@ def run(self):
self._send_time = now
self._send_data = b""
except Exception:
# Log all caught exceptions.
# Any exception other than an XML-RPC fault may be fatal. It is
# possible that we've encountered a retryable error, such as a
# temporary network disruption between worker and hub. Attempt
# to retry for a bit.
if now - self._send_time <= self._timeout:
continue

# If the timeout has been exceeded, we can assume we've
# encountered a non-temporary, fatal exception.
#
# Since upload_task_log is apparently not working, we can't get
# this into the task logs, but it should at least be possible
# for this to get into the worker's local log file.
if self._logger:
msg = "\n".join([
"Exception in LoggingThread:",
"Fatal error in LoggingThread",
kobo.tback.Traceback().get_traceback(),
])
self._logger.log_error(msg)

continue
self._logger.log_critical(msg)
raise

def write(self, data):
"""Add data to the queue and set the event for sending queue content."""
Expand Down
27 changes: 27 additions & 0 deletions tests/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,33 @@ def test_upload_task_log_after_some_time(self):
self.assertFalse(thread.is_alive())
self.assertFalse(thread._running)

# Following test intentionally kills a thread with an exception.
# The filterwarnings mark silences pytest's PytestUnhandledThreadExceptionWarning,
# which is expected here because the thread is meant to die from an exception.
@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
def test_logs_on_fatal_error(self):
# Purpose: when upload_task_log keeps failing, the LoggingThread should stop
# on its own and leave a "Fatal error" message (with the exception text) in
# the logger it was given.
#
# Set up a logger whose output we'll be able to inspect.
logs = StringIO()
logger = logging.getLogger('TestLoggingThread')
# Everything this logger emits is captured in the in-memory `logs` buffer.
logger.addHandler(logging.StreamHandler(logs))
kobo_logger = LoggingBase(logger)

# Hub double whose upload_task_log raises on every call, simulating a hub
# that can never accept log uploads.
mock_hub = Mock()
mock_hub.upload_task_log.side_effect = RuntimeError("Simulated error")

# 9999 is just an arbitrary task id for the test.
thread = LoggingThread(mock_hub, 9999, logger=kobo_logger)
# Daemon thread, so a thread that unexpectedly keeps running should not keep
# the test process alive.
thread.daemon = True
thread.start()

# Writing data gives the thread something to upload, which triggers the
# simulated failure.
thread.write('This is a log message!')
# Since we set up a fatal error, we expect the thread to die soon
# despite not calling stop().
# NOTE(review): this relies on the first failure already counting as past the
# retry timeout (presumably because _send_time starts at 0) -- confirm against
# LoggingThread.run.
# join() waits at most 10 seconds; the assertion below fails if the thread is
# still running after that.
thread.join(10.0)
self.assertFalse(thread.is_alive())

# Before dying, it should have written something useful to the logs.
captured = logs.getvalue()
# Both the fixed headline and the traceback text of the simulated exception
# must be present.
self.assertIn('Fatal error in LoggingThread', captured)
self.assertIn('RuntimeError: Simulated error', captured)

def test_logs_during_temporary_outage(self):
# Messages written to the logging thread during a temporary
# outage should be uploaded (and not discarded) after the hub
Expand Down
Loading