diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 634f59a4..9d975419 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -261,7 +261,10 @@ async def _handle_activation( # TODO(cretz): Should we build a complex mechanism to continually # try the eviction until it succeeds? if cache_remove_job: - logger.exception("Failed running eviction job, not evicting") + logger.exception( + "Failed running eviction job, not evicting. " + + "Since eviction could not be processed, this worker cannot complete and the slot will remain forever used." + ) self._could_not_evict_count += 1 return diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 34c997ad..fecbecaf 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -357,7 +357,7 @@ def activate( # have a different workflow/event-loop going. if self._deleting and self._tasks: raise RuntimeError( - f"Cache removal processed, but {len(self._tasks)} tasks remain. " + f"Eviction processed, but {len(self._tasks)} tasks remain. " + f"Stack traces below:\n\n{self._stack_trace()}" ) @@ -1776,7 +1776,7 @@ async def _signal_external_workflow( def _stack_trace(self) -> str: stacks = [] - for task in self._tasks: + for task in list(self._tasks): # TODO(cretz): These stacks are not very clean currently frames = [] for frame in task.get_stack(): diff --git a/temporalio/worker/workflow_sandbox/_runner.py b/temporalio/worker/workflow_sandbox/_runner.py index aa882b9e..87dbab2c 100644 --- a/temporalio/worker/workflow_sandbox/_runner.py +++ b/temporalio/worker/workflow_sandbox/_runner.py @@ -173,4 +173,4 @@ def _run_code(self, code: str, **extra_globals: Any) -> None: finally: temporalio.workflow.unsafe._set_in_sandbox(False) for k, v in extra_globals.items(): - del self.globals_and_locals[k] + self.globals_and_locals.pop(k, None) diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py index 9199cb25..251ef12b 100644 --- a/tests/helpers/__init__.py +++ b/tests/helpers/__init__.py @@ -59,7 +59,7 @@ async def assert_eq_eventually( await asyncio.sleep(interval.total_seconds()) assert ( expected == last_value - ), "timed out waiting for equal, asserted against last value" + ), f"timed out waiting for equal, asserted against last value of {last_value}" async def worker_versioning_enabled(client: Client) -> bool: diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index dd59be3f..6950899a 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -2467,6 +2467,8 @@ async def test_workflow_deadlock(client: Client): async with new_worker( client, DeadlockedWorkflow, disable_safe_workflow_eviction=True ) as worker: + if worker._workflow_worker: + worker._workflow_worker._deadlock_timeout_seconds = 1 deadlock_thread_event.clear() handle = await client.start_workflow( DeadlockedWorkflow.run, @@ -2488,7 +2490,7 @@ async def last_history_task_failure() -> str: try: await assert_eq_eventually( - "[TMPRL1101] Potential deadlock detected, workflow didn't yield within 2 second(s)", + "[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)", last_history_task_failure, timeout=timedelta(seconds=5), interval=timedelta(seconds=1), @@ -2497,6 +2499,64 @@ async def last_history_task_failure() -> str: deadlock_thread_event.set() +@workflow.defn +class EvictionDeadlockWorkflow: + def __init__(self) -> None: + self.val = 1 + + async def wait_until_positive(self): + while True: + await workflow.wait_condition(lambda: self.val > 0) + self.val = -self.val + + async def wait_until_negative(self): + while True: + await workflow.wait_condition(lambda: self.val < 0) + self.val = -self.val + + @workflow.run + async def run(self): + await asyncio.gather(self.wait_until_negative(), self.wait_until_positive()) + + +async def test_workflow_eviction_deadlock(client: Client): + # We are running the worker, but we can't ever shut it down on eviction + # error so we send shutdown in the background and leave this worker dangling + worker = new_worker(client, EvictionDeadlockWorkflow) + if worker._workflow_worker: + worker._workflow_worker._deadlock_timeout_seconds = 1 + worker_task = asyncio.create_task(worker.run()) + + # Run workflow that deadlocks + handle = await client.start_workflow( + EvictionDeadlockWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + async def last_history_task_failure() -> str: + resp = await client.workflow_service.get_workflow_execution_history( + GetWorkflowExecutionHistoryRequest( + namespace=client.namespace, + execution=WorkflowExecution(workflow_id=handle.id), + ), + ) + for event in reversed(resp.history.events): + if event.event_type == EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED: + return event.workflow_task_failed_event_attributes.failure.message + return "" + + await assert_eq_eventually( + "[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)", + last_history_task_failure, + timeout=timedelta(seconds=5), + interval=timedelta(seconds=1), + ) + + # Send cancel but don't wait + worker_task.cancel() + + class PatchWorkflowBase: def __init__(self) -> None: self._result = ""