diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py
index e856e0f8..05ca56f1 100644
--- a/temporalio/worker/_replayer.py
+++ b/temporalio/worker/_replayer.py
@@ -45,6 +45,7 @@ def __init__(
         identity: Optional[str] = None,
         debug_mode: bool = False,
         runtime: Optional[temporalio.runtime.Runtime] = None,
+        disable_safe_workflow_eviction: bool = False,
     ) -> None:
         """Create a replayer to replay workflows from history.

@@ -67,6 +68,7 @@ def __init__(
             identity=identity,
             debug_mode=debug_mode,
             runtime=runtime,
+            disable_safe_workflow_eviction=disable_safe_workflow_eviction,
         )

     def config(self) -> ReplayerConfig:
@@ -228,6 +230,9 @@ def on_eviction_hook(
                     metric_meter=runtime.metric_meter,
                     on_eviction_hook=on_eviction_hook,
                     disable_eager_activity_execution=False,
+                    disable_safe_eviction=self._config[
+                        "disable_safe_workflow_eviction"
+                    ],
                 ).run()
             )

@@ -298,6 +303,7 @@ class ReplayerConfig(TypedDict, total=False):
     identity: Optional[str]
     debug_mode: bool
     runtime: Optional[temporalio.runtime.Runtime]
+    disable_safe_workflow_eviction: bool


 @dataclass(frozen=True)
diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py
index 06a31220..7d40a28e 100644
--- a/temporalio/worker/_worker.py
+++ b/temporalio/worker/_worker.py
@@ -78,6 +78,7 @@ def __init__(
         disable_eager_activity_execution: bool = False,
         on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
         use_worker_versioning: bool = False,
+        disable_safe_workflow_eviction: bool = False,
     ) -> None:
         """Create a worker to process workflows and/or activities.

@@ -183,11 +184,19 @@ def __init__(
             on_fatal_error: An async function that can handle a failure before
                 the worker shutdown commences. This cannot stop the shutdown and
                 any exception raised is logged and ignored.
-            use_worker_versioning: If true, the `build_id` argument must be specified, and this
-                worker opts into the worker versioning feature. This ensures it only receives
-                workflow tasks for workflows which it claims to be compatible with.
-
-                For more information, see https://docs.temporal.io/workers#worker-versioning
+            use_worker_versioning: If true, the `build_id` argument must be
+                specified, and this worker opts into the worker versioning
+                feature. This ensures it only receives workflow tasks for
+                workflows which it claims to be compatible with. For more
+                information, see
+                https://docs.temporal.io/workers#worker-versioning.
+            disable_safe_workflow_eviction: If true, instead of letting the
+                workflow collect its tasks properly, the worker will simply let
+                the Python garbage collector collect the tasks. WARNING: Users
+                should not set this value to true. The garbage collector will
+                throw ``GeneratorExit`` in coroutines causing them to wake up
+                in different threads and run ``finally`` and other code in the
+                wrong workflow environment.
""" if not activities and not workflows: raise ValueError("At least one activity or workflow must be specified") @@ -254,6 +263,7 @@ def __init__( disable_eager_activity_execution=disable_eager_activity_execution, on_fatal_error=on_fatal_error, use_worker_versioning=use_worker_versioning, + disable_safe_workflow_eviction=disable_safe_workflow_eviction, ) self._started = False self._shutdown_event = asyncio.Event() @@ -303,6 +313,7 @@ def __init__( disable_eager_activity_execution=disable_eager_activity_execution, metric_meter=runtime.metric_meter, on_eviction_hook=None, + disable_safe_eviction=disable_safe_workflow_eviction, ) # We need an already connected client @@ -473,9 +484,11 @@ async def raise_on_shutdown(): assert self._workflow_worker tasks[2] = asyncio.create_task(self._workflow_worker.drain_poll_queue()) - # Set worker-shutdown event + # Notify shutdown occurring if self._activity_worker: self._activity_worker.notify_shutdown() + if self._workflow_worker: + self._workflow_worker.notify_shutdown() # Wait for all tasks to complete (i.e. for poller loops to stop) await asyncio.wait(tasks) @@ -597,6 +610,7 @@ class WorkerConfig(TypedDict, total=False): disable_eager_activity_execution: bool on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] use_worker_versioning: bool + disable_safe_workflow_eviction: bool _default_build_id: Optional[str] = None diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 89091688..e4ffd4d7 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -60,6 +60,7 @@ def __init__( [str, temporalio.bridge.proto.workflow_activation.RemoveFromCache], None ] ], + disable_safe_eviction: bool, ) -> None: self._bridge_worker = bridge_worker self._namespace = namespace @@ -91,6 +92,7 @@ def __init__( self._running_workflows: Dict[str, WorkflowInstance] = {} self._disable_eager_activity_execution = disable_eager_activity_execution self._on_eviction_hook = on_eviction_hook + self._disable_safe_eviction = disable_safe_eviction self._throw_after_activation: Optional[Exception] = None # If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable @@ -99,6 +101,9 @@ def __init__( None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2 ) + # Keep track of workflows that could not be evicted + self._could_not_evict_count = 0 + # Validate and build workflow dict self._workflows: Dict[str, temporalio.workflow._Definition] = {} self._dynamic_workflow: Optional[temporalio.workflow._Definition] = None @@ -155,6 +160,13 @@ async def run(self) -> None: if self._throw_after_activation: raise self._throw_after_activation + def notify_shutdown(self) -> None: + if self._could_not_evict_count: + logger.warn( + f"Shutting down workflow worker, but {self._could_not_evict_count} " + + "workflow(s) could not be evicted previously, so the shutdown will hang" + ) + # Only call this if run() raised an error async def drain_poll_queue(self) -> None: while True: @@ -182,7 +194,6 @@ async def _handle_activation( cache_remove_job = job.remove_from_cache elif job.HasField("start_workflow"): start_job = job.start_workflow - cache_remove_only_activation = len(act.jobs) == 1 and cache_remove_job # Build default success completion (e.g. 
         # Build default success completion (e.g. remove-job-only activations)
         completion = (
@@ -190,9 +201,8 @@ async def _handle_activation(
         )
         completion.successful.SetInParent()
         try:
-            # Decode the activation if there's a codec and it's not a
-            # cache-remove-only activation
-            if self._data_converter.payload_codec and not cache_remove_only_activation:
+            # Decode the activation if there's a codec and not cache remove job
+            if self._data_converter.payload_codec and not cache_remove_job:
                 await temporalio.bridge.worker.decode_activation(
                     act, self._data_converter.payload_codec
                 )
@@ -200,24 +210,27 @@ async def _handle_activation(

             if LOG_PROTOS:
                 logger.debug("Received workflow activation:\n%s", act)
-            # We only have to run if there are any non-remove-from-cache jobs
-            if not cache_remove_only_activation:
-                # If the workflow is not running yet, create it
+            # If the workflow is not running yet and this isn't a cache remove
+            # job, create it. We do not even fetch a workflow if it's a cache
+            # remove job and safe evictions are enabled
+            workflow = None
+            if not cache_remove_job or not self._disable_safe_eviction:
                 workflow = self._running_workflows.get(act.run_id)
-                if not workflow:
-                    # Must have a start job to create instance
-                    if not start_job:
-                        raise RuntimeError(
-                            "Missing start workflow, workflow could have unexpectedly been removed from cache"
-                        )
-                    workflow = self._create_workflow_instance(act, start_job)
-                    self._running_workflows[act.run_id] = workflow
-                elif start_job:
-                    # This should never happen
-                    logger.warn("Cache already exists for activation with start job")
-
-                # Run activation in separate thread so we can check if it's
-                # deadlocked
+                if not workflow and not cache_remove_job:
+                    # Must have a start job to create instance
+                    if not start_job:
+                        raise RuntimeError(
+                            "Missing start workflow, workflow could have unexpectedly been removed from cache"
+                        )
+                    workflow = self._create_workflow_instance(act, start_job)
+                    self._running_workflows[act.run_id] = workflow
+                elif start_job:
+                    # This should never happen
+                    logger.warning("Cache already exists for activation with start job")
+
+            # Run activation in separate thread so we can check if it's
+            # deadlocked
+            if workflow:
                 activate_task = asyncio.get_running_loop().run_in_executor(
                     self._workflow_task_executor,
                     workflow.activate,
@@ -234,6 +247,17 @@ async def _handle_activation(
                     f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within {self._deadlock_timeout_seconds} second(s)"
                 )
         except Exception as err:
+            # We cannot fail a cache eviction, we must just log and not complete
+            # the activation (failed or otherwise). This should only happen in
+            # cases of deadlock or tasks not properly completing, and yes this
+            # means that a slot is forever taken.
+            # TODO(cretz): Should we build a complex mechanism to continually
+            # try the eviction until it succeeds?
+            if cache_remove_job:
+                logger.exception("Failed running eviction job, not evicting")
+                self._could_not_evict_count += 1
+                return
+
             logger.exception(
                 "Failed handling activation on workflow with run ID %s", act.run_id
             )
@@ -257,7 +281,9 @@ async def _handle_activation(
         # Always set the run ID on the completion
         completion.run_id = act.run_id

-        # If there is a remove-from-cache job, do so
+        # If there is a remove-from-cache job, do so. We don't need to log a
+        # warning if there's not, because create workflow failing for
+        # unregistered workflow still triggers cache remove job
         if cache_remove_job:
             if act.run_id in self._running_workflows:
                 logger.debug(
@@ -264,18 +290,11 @@ async def _handle_activation(
                     "Evicting workflow with run ID %s, message: %s",
                     act.run_id,
                     cache_remove_job.message,
                 )
                 del self._running_workflows[act.run_id]
-            else:
-                logger.warn(
-                    "Eviction request on unknown workflow with run ID %s, message: %s",
-                    act.run_id,
-                    cache_remove_job.message,
-                )

-        # Encode the completion if there's a codec and it's not a
-        # cache-remove-only activation
-        if self._data_converter.payload_codec and not cache_remove_only_activation:
+        # Encode the completion if there's a codec and not cache remove job
+        if self._data_converter.payload_codec and not cache_remove_job:
             try:
                 await temporalio.bridge.worker.encode_completion(
                     completion, self._data_converter.payload_codec
diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py
index 62d06d9c..fd9a65ce 100644
--- a/temporalio/worker/_workflow_instance.py
+++ b/temporalio/worker/_workflow_instance.py
@@ -70,6 +70,9 @@

 logger = logging.getLogger(__name__)

+# Set to true to log all cases where we're ignoring things during delete
+LOG_IGNORE_DURING_DELETE = False
+

 class WorkflowRunner(ABC):
     """Abstract runner for workflows that creates workflow instances to run.
@@ -254,19 +257,13 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
         # Set ourselves on our own loop
         temporalio.workflow._Runtime.set_on_loop(self, self)

-        # After GC, Python raises GeneratorExit calls from all awaiting tasks.
-        # Then in a finally of such an await, another exception can swallow
-        # these causing even more issues. We will set ourselves as deleted so we
-        # can check in some places to swallow these errors on tear down.
+        # When we evict, we have to mark the workflow as deleting so we don't
+        # add any commands and we swallow exceptions on tear down
         self._deleting = False

         # We only create the metric meter lazily
         self._metric_meter: Optional[_ReplaySafeMetricMeter] = None

-    def __del__(self) -> None:
-        # We have confirmed there are no super() versions of __del__
-        self._deleting = True
-
     #### Activation functions ####
     # These are in alphabetical order and besides "activate", all other calls
     # are "_apply_" + the job field name.
@@ -316,16 +313,38 @@ def activate(
                 # be checked in patch jobs (first index) or query jobs (last
                 # index).
                 self._run_once(check_conditions=index == 1 or index == 2)
-        except temporalio.exceptions.FailureError as err:
+        except Exception as err:
             # We want failure errors during activation, like those that can
             # happen during payload conversion, to fail the workflow not the
             # task
-            try:
-                self._set_workflow_failure(err)
-            except Exception as inner_err:
-                activation_err = inner_err
-        except Exception as err:
-            activation_err = err
+            if isinstance(err, temporalio.exceptions.FailureError):
+                try:
+                    self._set_workflow_failure(err)
+                except Exception as inner_err:
+                    activation_err = inner_err
+            else:
+                # Otherwise all exceptions are activation errors
+                activation_err = err
+            # If we're deleting, swallow any activation error
+            if self._deleting:
+                if LOG_IGNORE_DURING_DELETE:
+                    logger.debug(
+                        "Ignoring exception while deleting workflow", exc_info=True
+                    )
+                activation_err = None
+
+        # If we're deleting, there better be no more tasks. It is important for
+        # the integrity of the system that we check this. If there are tasks
+        # remaining, they and any associated coroutines will get garbage
+        # collected which can trigger a GeneratorExit exception thrown in the
+        # coroutine which can cause it to wake up on a different thread which
+        # may have a different workflow/event-loop going.
+        if self._deleting and self._tasks:
+            raise RuntimeError(
+                f"Cache removal processed, but {len(self._tasks)} tasks remain. "
+                + f"Stack traces below:\n\n{self._stack_trace()}"
+            )
+
         if activation_err:
             logger.warning(
                 f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
@@ -381,8 +400,7 @@ def _apply(
         elif job.HasField("notify_has_patch"):
             self._apply_notify_has_patch(job.notify_has_patch)
         elif job.HasField("remove_from_cache"):
-            # Ignore, handled externally
-            pass
+            self._apply_remove_from_cache(job.remove_from_cache)
         elif job.HasField("resolve_activity"):
             self._apply_resolve_activity(job.resolve_activity)
         elif job.HasField("resolve_child_workflow_execution"):
@@ -516,19 +534,13 @@ async def run_update() -> None:
                     self._current_activation_error = err
                     return
             except BaseException as err:
-                # During tear down, generator exit and no-runtime exceptions can appear
-                if not self._deleting:
-                    raise
-                if not isinstance(
-                    err,
-                    (
-                        GeneratorExit,
-                        temporalio.workflow._NotInWorkflowEventLoopError,
-                    ),
-                ):
-                    logger.debug(
-                        "Ignoring exception while deleting workflow", exc_info=True
-                    )
+                if self._deleting:
+                    if LOG_IGNORE_DURING_DELETE:
+                        logger.debug(
+                            "Ignoring exception while deleting workflow", exc_info=True
+                        )
+                    return
+                raise

         self.create_task(
             run_update(),
@@ -605,6 +617,15 @@ def _apply_notify_has_patch(
     ) -> None:
         self._patches_notified.add(job.patch_id)

+    def _apply_remove_from_cache(
+        self, job: temporalio.bridge.proto.workflow_activation.RemoveFromCache
+    ) -> None:
+        self._deleting = True
+        self._cancel_requested = True
+        # Cancel everything
+        for task in self._tasks:
+            task.cancel()
+
     def _apply_resolve_activity(
         self, job: temporalio.bridge.proto.workflow_activation.ResolveActivity
     ) -> None:
@@ -774,17 +795,14 @@ async def run_workflow(input: ExecuteWorkflowInput) -> None:
                 )
                 command = self._add_command()
                 command.complete_workflow_execution.result.CopyFrom(result_payloads[0])
-            except BaseException as err:
-                # During tear down, generator exit and event loop exceptions can occur
-                if not self._deleting:
-                    raise
-                if not isinstance(
-                    err,
-                    (GeneratorExit, temporalio.workflow._NotInWorkflowEventLoopError),
-                ):
-                    logger.debug(
-                        "Ignoring exception while deleting workflow", exc_info=True
-                    )
+            except Exception:
+                if self._deleting:
+                    if LOG_IGNORE_DURING_DELETE:
+                        logger.debug(
+                            "Ignoring exception while deleting workflow", exc_info=True
+                        )
+                    return
+                raise

         # Set arg types, using raw values for dynamic
         arg_types = self._defn.arg_types
@@ -1477,7 +1495,11 @@ def _as_read_only(self) -> Iterator[None]:
         finally:
             self._read_only = prev_val

-    def _assert_not_read_only(self, action_attempted: str) -> None:
+    def _assert_not_read_only(
+        self, action_attempted: str, *, allow_during_delete: bool = False
+    ) -> None:
+        if self._deleting and not allow_during_delete:
+            raise RuntimeError(f"Ignoring {action_attempted} while deleting")
         if self._read_only:
             raise temporalio.workflow.ReadOnlyContextError(
                 f"While in read-only function, action attempted: {action_attempted}"
@@ -1620,9 +1642,9 @@ def _run_once(self, *, check_conditions: bool) -> None:
                 handle = self._ready.popleft()
                 handle._run()

-                # Must throw here. Only really set inside
+                # Must throw here if not deleting. Only really set inside
                 # _run_top_level_workflow_function.
-                if self._current_activation_error:
+                if self._current_activation_error and not self._deleting:
                     raise self._current_activation_error

             # Check conditions which may add to the ready list. Also remove
@@ -1645,12 +1667,23 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
         except _ContinueAsNewError as err:
             logger.debug("Workflow requested continue as new")
             err._apply_command(self._add_command())
-
-        # Note in some Python versions, cancelled error does not extend
-        # exception
-        # TODO(cretz): Should I fail the task on BaseException too (e.g.
-        # KeyboardInterrupt)?
         except (Exception, asyncio.CancelledError) as err:
+            # During tear down we can ignore exceptions. Technically the
+            # command-adding done later would throw a not-in-workflow exception
+            # we'd ignore later, but it's better to preempt it
+            if self._deleting:
+                if LOG_IGNORE_DURING_DELETE:
+                    logger.debug(
+                        "Ignoring exception while deleting workflow", exc_info=True
+                    )
+                return
+
+            # Handle continue as new
+            if isinstance(err, _ContinueAsNewError):
+                logger.debug("Workflow requested continue as new")
+                err._apply_command(self._add_command())
+                return
+
             logger.debug(
                 f"Workflow raised failure with run ID {self._info.run_id}",
                 exc_info=True,
@@ -1676,16 +1709,6 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
             else:
                 # All other exceptions fail the task
                 self._current_activation_error = err
-        except BaseException as err:
-            # During tear down, generator exit and no-runtime exceptions can appear
-            if not self._deleting:
-                raise
-            if not isinstance(
-                err, (GeneratorExit, temporalio.workflow._NotInWorkflowEventLoopError)
-            ):
-                logger.debug(
-                    "Ignoring exception while deleting workflow", exc_info=True
-                )

     def _set_workflow_failure(self, err: temporalio.exceptions.FailureError) -> None:
         # All other failure errors fail the workflow
@@ -1752,7 +1775,9 @@ def call_soon(
         *args: Any,
         context: Optional[contextvars.Context] = None,
     ) -> asyncio.Handle:
-        self._assert_not_read_only("schedule task")
+        # We need to allow this during delete because this is how tasks
+        # schedule their cancellation callbacks
+        self._assert_not_read_only("schedule task", allow_during_delete=True)
         handle = asyncio.Handle(callback, args, self, context)
         self._ready.append(handle)
         return handle
@@ -1856,6 +1881,9 @@ def default_exception_handler(self, context: _Context) -> None:
         logger.error("\n".join(log_lines), exc_info=exc_info)

     def call_exception_handler(self, context: _Context) -> None:
+        # Do nothing with any uncaught exceptions while deleting
+        if self._deleting:
+            return
         # Copied and slightly modified from
         # asyncio.BaseEventLoop.call_exception_handler
         if self._exception_handler is None:
@@ -2045,12 +2073,15 @@ def __init__(
         instance._register_task(self, name=f"activity: {input.activity}")

     def cancel(self, msg: Optional[Any] = None) -> bool:
-        self._instance._assert_not_read_only("cancel activity handle")
-        # We override this because if it's not yet started and not done, we need
-        # to send a cancel command because the async function won't run to trap
-        # the cancel (i.e. cancelled before started)
-        if not self._started and not self.done():
-            self._apply_cancel_command(self._instance._add_command())
+        # Allow the cancel to go through for the task even if we're deleting,
+        # just don't do any commands
+        if not self._instance._deleting:
+            self._instance._assert_not_read_only("cancel activity handle")
+            # We override this because if it's not yet started and not done, we
+            # need to send a cancel command because the async function won't run
+            # to trap the cancel (i.e. cancelled before started)
+            if not self._started and not self.done():
+                self._apply_cancel_command(self._instance._add_command())
         # Message not supported in older versions
         if sys.version_info < (3, 9):
             return super().cancel()
diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py
index b4bce90d..a47344bc 100644
--- a/tests/worker/test_workflow.py
+++ b/tests/worker/test_workflow.py
@@ -2362,7 +2362,10 @@ async def run(self) -> None:


 async def test_workflow_deadlock(client: Client):
-    async with new_worker(client, DeadlockedWorkflow) as worker:
+    # Disable safe eviction so the worker can complete
+    async with new_worker(
+        client, DeadlockedWorkflow, disable_safe_workflow_eviction=True
+    ) as worker:
         deadlock_thread_event.clear()
         handle = await client.start_workflow(
             DeadlockedWorkflow.run,
@@ -3217,21 +3220,60 @@ async def test_manual_result_type(client: Client):
     assert res4 == ManualResultType(some_string="from-query")


+@activity.defn
+async def wait_forever_activity() -> None:
+    await asyncio.Future()
+
+
+@workflow.defn
+class WaitForeverWorkflow:
+    @workflow.run
+    async def run(self) -> None:
+        await asyncio.Future()
+
+
 @workflow.defn
-class SwallowGeneratorExitWorkflow:
+class CacheEvictionTearDownWorkflow:
     def __init__(self) -> None:
         self._signal_count = 0

     @workflow.run
     async def run(self) -> None:
+        # Start several things in background. This is just to show that eviction
+        # can work even with these things running.
+        tasks = [
+            asyncio.create_task(
+                workflow.execute_activity(
+                    wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
+                )
+            ),
+            asyncio.create_task(
+                workflow.execute_child_workflow(WaitForeverWorkflow.run)
+            ),
+            asyncio.create_task(asyncio.sleep(1000)),
+            asyncio.shield(
+                workflow.execute_activity(
+                    wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
+                )
+            ),
+            asyncio.create_task(workflow.wait_condition(lambda: False)),
+        ]
+        gather_fut = asyncio.gather(*tasks, return_exceptions=True)
+        # Let's also start something in the background that we never wait on
+        asyncio.create_task(asyncio.sleep(1000))
         try:
             # Wait for signal count to reach 2
+            await asyncio.sleep(0.01)
             await workflow.wait_condition(lambda: self._signal_count > 1)
         finally:
-            # This finally, on eviction, is actually called because the above
-            # await raises GeneratorExit. Then this will raise a
-            # _NotInWorkflowEventLoopError swallowing that.
+            # This finally, on eviction, is actually called but the command
+            # should be ignored
+            await asyncio.sleep(0.01)
             await workflow.wait_condition(lambda: self._signal_count > 2)
+            # Cancel gather tasks and wait on them, but ignore the errors
+            for task in tasks:
+                task.cancel()
+            await gather_fut

     @workflow.signal
     async def signal(self) -> None:
@@ -3242,11 +3284,17 @@ def signal_count(self) -> int:
         return self._signal_count


-async def test_swallow_generator_exit(client: Client):
-    # This test simulates GeneratorExit and GC issues by forcing eviction on
-    # each step
+async def test_cache_eviction_tear_down(client: Client):
+    # This test simulates forcing eviction. This used to raise GeneratorExit on
+    # GC which triggered the finally which could run on any thread Python
+    # chooses, but now we expect eviction to properly tear down tasks and
+    # therefore we cancel them
     async with new_worker(
-        client, SwallowGeneratorExitWorkflow, max_cached_workflows=0
+        client,
+        CacheEvictionTearDownWorkflow,
+        WaitForeverWorkflow,
+        activities=[wait_forever_activity],
+        max_cached_workflows=0,
     ) as worker:
         # Put a hook to catch unraisable exceptions
         old_hook = sys.unraisablehook
@@ -3254,25 +3302,25 @@ async def test_swallow_generator_exit(client: Client):
         sys.unraisablehook = hook_calls.append
         try:
             handle = await client.start_workflow(
-                SwallowGeneratorExitWorkflow.run,
+                CacheEvictionTearDownWorkflow.run,
                 id=f"wf-{uuid.uuid4()}",
                 task_queue=worker.task_queue,
             )

             async def signal_count() -> int:
-                return await handle.query(SwallowGeneratorExitWorkflow.signal_count)
+                return await handle.query(CacheEvictionTearDownWorkflow.signal_count)

             # Confirm signal count as 0
             await assert_eq_eventually(0, signal_count)

             # Send signal and confirm it's at 1
-            await handle.signal(SwallowGeneratorExitWorkflow.signal)
+            await handle.signal(CacheEvictionTearDownWorkflow.signal)
             await assert_eq_eventually(1, signal_count)

-            await handle.signal(SwallowGeneratorExitWorkflow.signal)
+            await handle.signal(CacheEvictionTearDownWorkflow.signal)
             await assert_eq_eventually(2, signal_count)

-            await handle.signal(SwallowGeneratorExitWorkflow.signal)
+            await handle.signal(CacheEvictionTearDownWorkflow.signal)
             await assert_eq_eventually(3, signal_count)

             await handle.result()
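
For context, a minimal sketch of how the new option surfaces to users. Only the `disable_safe_workflow_eviction` parameter comes from this diff; the workflow class, task queue name, and server address below are illustrative assumptions, and the docstring above warns against ever setting the flag to true.

# Hypothetical usage sketch; MyWorkflow, the task queue, and the server
# address are placeholders and not part of the diff.
import asyncio

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Replayer, Worker


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        pass


async def main() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        # Leave at the default (False): on cache removal the worker cancels the
        # workflow's outstanding tasks and tears them down inside the
        # workflow's own event loop. True reverts to garbage-collector-driven
        # teardown, which the docstring above warns against.
        disable_safe_workflow_eviction=False,
    )
    await worker.run()


# Replayer accepts the same flag and forwards it to the internal
# _WorkflowWorker as disable_safe_eviction.
replayer = Replayer(workflows=[MyWorkflow], disable_safe_workflow_eviction=False)

if __name__ == "__main__":
    asyncio.run(main())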