From cf4c7cbff5553b3187226a7d92903d2094c59d31 Mon Sep 17 00:00:00 2001
From: Chad Retz
Date: Fri, 12 Apr 2024 09:52:16 -0500
Subject: [PATCH] Ensure extra data on task fail logs (#502)

Fixes #500
---
 temporalio/worker/_workflow_instance.py |   1 +
 tests/worker/test_workflow.py           | 121 +++++++++++++++++++-----
 2 files changed, 99 insertions(+), 23 deletions(-)

diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py
index fd9a65ce..afa742a2 100644
--- a/temporalio/worker/_workflow_instance.py
+++ b/temporalio/worker/_workflow_instance.py
@@ -349,6 +349,7 @@ def activate(
             logger.warning(
                 f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
                 exc_info=activation_err,
+                extra={"temporal_workflow": self._info._logger_details()},
             )
             # Set completion failure
             self._current_completion.failed.failure.SetInParent()
diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py
index a47344bc..3ab2c4bd 100644
--- a/tests/worker/test_workflow.py
+++ b/tests/worker/test_workflow.py
@@ -9,6 +9,7 @@
 import typing
 import uuid
 from abc import ABC, abstractmethod
+from contextlib import contextmanager
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from typing import (
@@ -30,6 +31,7 @@
 from google.protobuf.timestamp_pb2 import Timestamp
 from typing_extensions import Protocol, runtime_checkable
 
+import temporalio.worker
 from temporalio import activity, workflow
 from temporalio.api.common.v1 import Payload, Payloads, WorkflowExecution
 from temporalio.api.enums.v1 import EventType
@@ -1876,22 +1878,37 @@ def last_signal(self) -> str:
         return self._last_signal
 
 
-async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
-    # Use queue to capture log statements
-    log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
-    handler = logging.handlers.QueueHandler(log_queue)
-    workflow.logger.base_logger.addHandler(handler)
-    prev_level = workflow.logger.base_logger.level
-    workflow.logger.base_logger.setLevel(logging.INFO)
-    workflow.logger.full_workflow_info_on_extra = True
+class LogCapturer:
+    def __init__(self) -> None:
+        self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
 
-    def find_log(starts_with: str) -> Optional[logging.LogRecord]:
-        for record in cast(List[logging.LogRecord], log_queue.queue):
+    @contextmanager
+    def logs_captured(self, *loggers: logging.Logger):
+        handler = logging.handlers.QueueHandler(self.log_queue)
+
+        prev_levels = [l.level for l in loggers]
+        for l in loggers:
+            l.setLevel(logging.INFO)
+            l.addHandler(handler)
+        try:
+            yield self
+        finally:
+            for i, l in enumerate(loggers):
+                l.removeHandler(handler)
+                l.setLevel(prev_levels[i])
+
+    def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
+        for record in cast(List[logging.LogRecord], self.log_queue.queue):
             if record.message.startswith(starts_with):
                 return record
         return None
 
-    try:
+
+async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
+    workflow.logger.full_workflow_info_on_extra = True
+    with LogCapturer().logs_captured(
+        workflow.logger.base_logger, activity.logger.base_logger
+    ) as capturer:
         # Log two signals and kill worker before completing. Need to disable
         # workflow cache since we restart the worker and don't want to pay the
         # sticky queue penalty.
@@ -1909,11 +1926,11 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
             assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
 
         # Confirm two logs happened
-        assert find_log("Signal: signal 1 ({'attempt':")
-        assert find_log("Signal: signal 2")
-        assert not find_log("Signal: signal 3")
+        assert capturer.find_log("Signal: signal 1 ({'attempt':")
+        assert capturer.find_log("Signal: signal 2")
+        assert not capturer.find_log("Signal: signal 3")
         # Also make sure it has some workflow info and correct funcName
-        record = find_log("Signal: signal 1")
+        record = capturer.find_log("Signal: signal 1")
         assert (
             record
             and record.__dict__["temporal_workflow"]["workflow_type"]
@@ -1924,7 +1941,7 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
         assert isinstance(record.__dict__["workflow_info"], workflow.Info)
 
         # Clear queue and start a new one with more signals
-        log_queue.queue.clear()
+        capturer.log_queue.queue.clear()
         async with new_worker(
             client,
             LoggingWorkflow,
@@ -1937,13 +1954,71 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
             await handle.result()
 
         # Confirm replayed logs are not present but new ones are
-        assert not find_log("Signal: signal 1")
-        assert not find_log("Signal: signal 2")
-        assert find_log("Signal: signal 3")
-        assert find_log("Signal: finish")
-    finally:
-        workflow.logger.base_logger.removeHandler(handler)
-        workflow.logger.base_logger.setLevel(prev_level)
+        assert not capturer.find_log("Signal: signal 1")
+        assert not capturer.find_log("Signal: signal 2")
+        assert capturer.find_log("Signal: signal 3")
+        assert capturer.find_log("Signal: finish")
+
+
+@activity.defn
+async def task_fail_once_activity() -> None:
+    if activity.info().attempt == 1:
+        raise RuntimeError("Intentional activity task failure")
+
+
+task_fail_once_workflow_has_failed = False
+
+
+@workflow.defn(sandboxed=False)
+class TaskFailOnceWorkflow:
+    @workflow.run
+    async def run(self) -> None:
+        # Fail on first attempt
+        global task_fail_once_workflow_has_failed
+        if not task_fail_once_workflow_has_failed:
+            task_fail_once_workflow_has_failed = True
+            raise RuntimeError("Intentional workflow task failure")
+
+        # Execute activity that will fail once
+        await workflow.execute_activity(
+            task_fail_once_activity,
+            start_to_close_timeout=timedelta(seconds=30),
+            retry_policy=RetryPolicy(
+                initial_interval=timedelta(milliseconds=1),
+                backoff_coefficient=1.0,
+                maximum_attempts=2,
+            ),
+        )
+
+
+async def test_workflow_logging_task_fail(client: Client):
+    with LogCapturer().logs_captured(
+        activity.logger.base_logger, temporalio.worker._workflow_instance.logger
+    ) as capturer:
+        async with new_worker(
+            client, TaskFailOnceWorkflow, activities=[task_fail_once_activity]
+        ) as worker:
+            await client.execute_workflow(
+                TaskFailOnceWorkflow.run,
+                id=f"workflow-{uuid.uuid4()}",
+                task_queue=worker.task_queue,
+            )
+
+        wf_task_record = capturer.find_log("Failed activation on workflow")
+        assert wf_task_record
+        assert "Intentional workflow task failure" in wf_task_record.message
+        assert (
+            getattr(wf_task_record, "temporal_workflow")["workflow_type"]
+            == "TaskFailOnceWorkflow"
+        )
+
+        act_task_record = capturer.find_log("Completing activity as failed")
+        assert act_task_record
+        assert "Intentional activity task failure" in act_task_record.message
+        assert (
+            getattr(act_task_record, "temporal_activity")["activity_type"]
+            == "task_fail_once_activity"
+        )
 
 
 @workflow.defn