Skip to content

Commit

Permalink
Ensure extra data on task fail logs (temporalio#502)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Apr 12, 2024
1 parent 1001653 commit cf4c7cb
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 23 deletions.
1 change: 1 addition & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ def activate(
logger.warning(
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
exc_info=activation_err,
extra={"temporal_workflow": self._info._logger_details()},
)
# Set completion failure
self._current_completion.failed.failure.SetInParent()
Expand Down
121 changes: 98 additions & 23 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import typing
import uuid
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import (
Expand All @@ -30,6 +31,7 @@
from google.protobuf.timestamp_pb2 import Timestamp
from typing_extensions import Protocol, runtime_checkable

import temporalio.worker
from temporalio import activity, workflow
from temporalio.api.common.v1 import Payload, Payloads, WorkflowExecution
from temporalio.api.enums.v1 import EventType
Expand Down Expand Up @@ -1876,22 +1878,37 @@ def last_signal(self) -> str:
return self._last_signal


async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
# Use queue to capture log statements
log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
handler = logging.handlers.QueueHandler(log_queue)
workflow.logger.base_logger.addHandler(handler)
prev_level = workflow.logger.base_logger.level
workflow.logger.base_logger.setLevel(logging.INFO)
workflow.logger.full_workflow_info_on_extra = True
class LogCapturer:
    """Holds an in-memory queue of log records gathered during a test."""

    def __init__(self) -> None:
        """Start with an empty, unbounded record queue."""
        records: queue.Queue[logging.LogRecord] = queue.Queue()
        self.log_queue = records

def find_log(starts_with: str) -> Optional[logging.LogRecord]:
for record in cast(List[logging.LogRecord], log_queue.queue):
@contextmanager
def logs_captured(self, *loggers: logging.Logger):
    """Capture records from ``loggers`` into ``self.log_queue`` within a ``with`` block.

    On entry every logger is switched to ``INFO`` and given one shared
    ``QueueHandler`` that feeds ``self.log_queue``. On exit, including when
    the body raises, the handler is detached and each logger gets back the
    level it had on entry.

    Args:
        loggers: The loggers whose records should be captured.

    Yields:
        This capturer, so callers can query it inside the ``with`` body.
    """
    queue_handler = logging.handlers.QueueHandler(self.log_queue)

    # Snapshot every level before touching any logger so restoration is exact.
    original_levels = [target.level for target in loggers]
    for target in loggers:
        target.setLevel(logging.INFO)
        target.addHandler(queue_handler)
    try:
        yield self
    finally:
        for target, original_level in zip(loggers, original_levels):
            target.removeHandler(queue_handler)
            target.setLevel(original_level)

def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
    """Return the first captured record whose message begins with ``starts_with``.

    Records are scanned in the order they sit in ``self.log_queue``; nothing
    is removed from the queue. Each record is expected to already carry a
    ``message`` attribute.

    Args:
        starts_with: Prefix to match against each record's ``message``.

    Returns:
        The first matching record, or ``None`` when no record matches.
    """
    # ``Queue.queue`` is the underlying container; the cast is for type checkers only.
    captured = cast(List[logging.LogRecord], self.log_queue.queue)
    return next(
        (entry for entry in captured if entry.message.startswith(starts_with)),
        None,
    )

try:

async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
workflow.logger.full_workflow_info_on_extra = True
with LogCapturer().logs_captured(
workflow.logger.base_logger, activity.logger.base_logger
) as capturer:
# Log two signals and kill worker before completing. Need to disable
# workflow cache since we restart the worker and don't want to pay the
# sticky queue penalty.
Expand All @@ -1909,11 +1926,11 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)

# Confirm two logs happened
assert find_log("Signal: signal 1 ({'attempt':")
assert find_log("Signal: signal 2")
assert not find_log("Signal: signal 3")
assert capturer.find_log("Signal: signal 1 ({'attempt':")
assert capturer.find_log("Signal: signal 2")
assert not capturer.find_log("Signal: signal 3")
# Also make sure it has some workflow info and correct funcName
record = find_log("Signal: signal 1")
record = capturer.find_log("Signal: signal 1")
assert (
record
and record.__dict__["temporal_workflow"]["workflow_type"]
Expand All @@ -1924,7 +1941,7 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
assert isinstance(record.__dict__["workflow_info"], workflow.Info)

# Clear queue and start a new one with more signals
log_queue.queue.clear()
capturer.log_queue.queue.clear()
async with new_worker(
client,
LoggingWorkflow,
Expand All @@ -1937,13 +1954,71 @@ def find_log(starts_with: str) -> Optional[logging.LogRecord]:
await handle.result()

# Confirm replayed logs are not present but new ones are
assert not find_log("Signal: signal 1")
assert not find_log("Signal: signal 2")
assert find_log("Signal: signal 3")
assert find_log("Signal: finish")
finally:
workflow.logger.base_logger.removeHandler(handler)
workflow.logger.base_logger.setLevel(prev_level)
assert not capturer.find_log("Signal: signal 1")
assert not capturer.find_log("Signal: signal 2")
assert capturer.find_log("Signal: signal 3")
assert capturer.find_log("Signal: finish")


@activity.defn
async def task_fail_once_activity() -> None:
    """Raise on the first attempt of this activity; return normally on later attempts."""
    is_first_attempt = activity.info().attempt == 1
    if is_first_attempt:
        raise RuntimeError("Intentional activity task failure")


# Module-level one-shot flag: flipped to True the first time
# TaskFailOnceWorkflow.run raises, so later executions of run() skip the raise.
task_fail_once_workflow_has_failed = False


@workflow.defn(sandboxed=False)
class TaskFailOnceWorkflow:
    """Workflow that raises once, then runs an activity that also fails once.

    NOTE(review): ``sandboxed=False`` is presumably what lets the module-level
    flag persist between executions of ``run`` - confirm.
    """

    @workflow.run
    async def run(self) -> None:
        """Raise on the first execution, then run the fail-once activity."""
        global task_fail_once_workflow_has_failed

        # First execution in this process: record it and raise.
        already_failed = task_fail_once_workflow_has_failed
        if not already_failed:
            task_fail_once_workflow_has_failed = True
            raise RuntimeError("Intentional workflow task failure")

        # Two attempts with a 1ms constant backoff: the activity raises on
        # attempt 1 and the single retry follows almost immediately.
        fail_once_retry = RetryPolicy(
            initial_interval=timedelta(milliseconds=1),
            backoff_coefficient=1.0,
            maximum_attempts=2,
        )
        await workflow.execute_activity(
            task_fail_once_activity,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=fail_once_retry,
        )


async def test_workflow_logging_task_fail(client: Client):
    """Check that task-failure logs carry the Temporal ``extra`` details.

    Runs ``TaskFailOnceWorkflow`` while capturing the activity logger and the
    workflow-instance logger, then asserts that:

    * the "Failed activation on workflow" record mentions the intentional
      workflow error and has ``temporal_workflow["workflow_type"]`` set, and
    * the "Completing activity as failed" record mentions the intentional
      activity error and has ``temporal_activity["activity_type"]`` set.
    """
    # Re-arm the one-shot failure flag. It is module-level state that the
    # workflow flips to True and never resets, so without this a second run of
    # the test in the same process would not fail the workflow task and the
    # "Failed activation" assertion below would fail.
    global task_fail_once_workflow_has_failed
    task_fail_once_workflow_has_failed = False

    with LogCapturer().logs_captured(
        activity.logger.base_logger, temporalio.worker._workflow_instance.logger
    ) as capturer:
        async with new_worker(
            client, TaskFailOnceWorkflow, activities=[task_fail_once_activity]
        ) as worker:
            await client.execute_workflow(
                TaskFailOnceWorkflow.run,
                id=f"workflow-{uuid.uuid4()}",
                task_queue=worker.task_queue,
            )

        # Workflow task failure log must include the workflow details.
        wf_task_record = capturer.find_log("Failed activation on workflow")
        assert wf_task_record
        assert "Intentional workflow task failure" in wf_task_record.message
        assert (
            getattr(wf_task_record, "temporal_workflow")["workflow_type"]
            == "TaskFailOnceWorkflow"
        )

        # Activity task failure log must include the activity details.
        act_task_record = capturer.find_log("Completing activity as failed")
        assert act_task_record
        assert "Intentional activity task failure" in act_task_record.message
        assert (
            getattr(act_task_record, "temporal_activity")["activity_type"]
            == "task_fail_once_activity"
        )


@workflow.defn
Expand Down

0 comments on commit cf4c7cb

Please sign in to comment.