Safe Eviction (temporalio#499)
cretz authored Apr 5, 2024
1 parent b07e75e commit 466da16
Showing 5 changed files with 235 additions and 117 deletions.
6 changes: 6 additions & 0 deletions temporalio/worker/_replayer.py
@@ -45,6 +45,7 @@ def __init__(
identity: Optional[str] = None,
debug_mode: bool = False,
runtime: Optional[temporalio.runtime.Runtime] = None,
disable_safe_workflow_eviction: bool = False,
) -> None:
"""Create a replayer to replay workflows from history.
@@ -67,6 +68,7 @@ def __init__(
identity=identity,
debug_mode=debug_mode,
runtime=runtime,
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
)

def config(self) -> ReplayerConfig:
@@ -228,6 +230,9 @@ def on_eviction_hook(
metric_meter=runtime.metric_meter,
on_eviction_hook=on_eviction_hook,
disable_eager_activity_execution=False,
disable_safe_eviction=self._config[
"disable_safe_workflow_eviction"
],
).run()
)

@@ -298,6 +303,7 @@ class ReplayerConfig(TypedDict, total=False):
identity: Optional[str]
debug_mode: bool
runtime: Optional[temporalio.runtime.Runtime]
disable_safe_workflow_eviction: bool


@dataclass(frozen=True)
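For orientation, a minimal sketch (not part of this commit) of how the new replayer option might be passed; `MyWorkflow` and the history JSON are placeholders, and the flag defaults to False:

```python
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

from my_workflows import MyWorkflow  # hypothetical workflow definition


async def replay_one(history_json: str) -> None:
    # Sketch only: replay a single history with safe eviction left enabled
    # (the default). Setting the flag to True opts into GC-driven eviction.
    replayer = Replayer(
        workflows=[MyWorkflow],
        disable_safe_workflow_eviction=False,
    )
    await replayer.replay_workflow(
        WorkflowHistory.from_json("my-workflow-id", history_json)
    )
```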
26 changes: 20 additions & 6 deletions temporalio/worker/_worker.py
@@ -78,6 +78,7 @@ def __init__(
disable_eager_activity_execution: bool = False,
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
use_worker_versioning: bool = False,
disable_safe_workflow_eviction: bool = False,
) -> None:
"""Create a worker to process workflows and/or activities.
@@ -183,11 +184,19 @@ def __init__(
on_fatal_error: An async function that can handle a failure before
the worker shutdown commences. This cannot stop the shutdown and
any exception raised is logged and ignored.
use_worker_versioning: If true, the `build_id` argument must be specified, and this
worker opts into the worker versioning feature. This ensures it only receives
workflow tasks for workflows which it claims to be compatible with.
For more information, see https://docs.temporal.io/workers#worker-versioning
use_worker_versioning: If true, the `build_id` argument must be
specified, and this worker opts into the worker versioning
feature. This ensures it only receives workflow tasks for
workflows which it claims to be compatible with. For more
information, see
https://docs.temporal.io/workers#worker-versioning.
disable_safe_workflow_eviction: If true, instead of letting the
workflow collect its tasks properly, the worker will simply let
the Python garbage collector collect the tasks. WARNING: Users
should not set this value to true. The garbage collector will
throw ``GeneratorExit`` in coroutines, causing them to wake up
in different threads and run ``finally`` and other code in the
wrong workflow environment.
"""
if not activities and not workflows:
raise ValueError("At least one activity or workflow must be specified")
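The warning above is the heart of this change. A self-contained sketch (plain Python, not SDK code) of the underlying behavior: when the last reference to a suspended coroutine is dropped, the interpreter throws ``GeneratorExit`` into it during finalization, so its ``finally`` block runs right there rather than under the workflow's own event loop and thread:

```python
import gc


class _Suspend:
    """Awaitable that suspends once without needing an event loop."""

    def __await__(self):
        yield


async def workflow_like() -> None:
    try:
        await _Suspend()
    finally:
        # Runs here, during coroutine finalization, outside any event loop;
        # in a worker the garbage collector may do this on an arbitrary thread.
        print("finally ran during finalization")


coro = workflow_like()
coro.send(None)  # advance to the suspension point
del coro         # last reference dropped: GeneratorExit is thrown, finally runs
gc.collect()     # not strictly required here; shown for emphasis
```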
@@ -254,6 +263,7 @@ def __init__(
disable_eager_activity_execution=disable_eager_activity_execution,
on_fatal_error=on_fatal_error,
use_worker_versioning=use_worker_versioning,
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
)
self._started = False
self._shutdown_event = asyncio.Event()
@@ -303,6 +313,7 @@ def __init__(
disable_eager_activity_execution=disable_eager_activity_execution,
metric_meter=runtime.metric_meter,
on_eviction_hook=None,
disable_safe_eviction=disable_safe_workflow_eviction,
)

# We need an already connected client
@@ -473,9 +484,11 @@ async def raise_on_shutdown():
assert self._workflow_worker
tasks[2] = asyncio.create_task(self._workflow_worker.drain_poll_queue())

# Set worker-shutdown event
# Notify shutdown occurring
if self._activity_worker:
self._activity_worker.notify_shutdown()
if self._workflow_worker:
self._workflow_worker.notify_shutdown()

# Wait for all tasks to complete (i.e. for poller loops to stop)
await asyncio.wait(tasks)
@@ -597,6 +610,7 @@ class WorkerConfig(TypedDict, total=False):
disable_eager_activity_execution: bool
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]]
use_worker_versioning: bool
disable_safe_workflow_eviction: bool


_default_build_id: Optional[str] = None
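A similarly hedged sketch of the worker-side surface; the client target, task queue, and `MyWorkflow` are placeholders, and per the docstring the flag should normally be left at its False default:

```python
from temporalio.client import Client
from temporalio.worker import Worker

from my_workflows import MyWorkflow  # hypothetical workflow definition


async def run_worker() -> None:
    client = await Client.connect("localhost:7233")  # placeholder target
    worker = Worker(
        client,
        task_queue="my-task-queue",  # placeholder task queue
        workflows=[MyWorkflow],
        # New in this commit; defaults to False.
        disable_safe_workflow_eviction=False,
    )
    await worker.run()
```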
81 changes: 50 additions & 31 deletions temporalio/worker/_workflow.py
@@ -60,6 +60,7 @@ def __init__(
[str, temporalio.bridge.proto.workflow_activation.RemoveFromCache], None
]
],
disable_safe_eviction: bool,
) -> None:
self._bridge_worker = bridge_worker
self._namespace = namespace
@@ -91,6 +92,7 @@ def __init__(
self._running_workflows: Dict[str, WorkflowInstance] = {}
self._disable_eager_activity_execution = disable_eager_activity_execution
self._on_eviction_hook = on_eviction_hook
self._disable_safe_eviction = disable_safe_eviction
self._throw_after_activation: Optional[Exception] = None

# If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable
@@ -99,6 +101,9 @@ def __init__(
None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2
)

# Keep track of workflows that could not be evicted
self._could_not_evict_count = 0

# Validate and build workflow dict
self._workflows: Dict[str, temporalio.workflow._Definition] = {}
self._dynamic_workflow: Optional[temporalio.workflow._Definition] = None
@@ -155,6 +160,13 @@ async def run(self) -> None:
if self._throw_after_activation:
raise self._throw_after_activation

def notify_shutdown(self) -> None:
if self._could_not_evict_count:
logger.warn(
f"Shutting down workflow worker, but {self._could_not_evict_count} "
+ "workflow(s) could not be evicted previously, so the shutdown will hang"
)

# Only call this if run() raised an error
async def drain_poll_queue(self) -> None:
while True:
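Because `notify_shutdown` above only warns when workflows could not be evicted, a shutdown can now hang silently from the caller's point of view. A hedged, caller-side sketch of surfacing that condition; `shutdown_with_warning` is a hypothetical helper, not part of the SDK:

```python
import asyncio
import logging

from temporalio.worker import Worker

logger = logging.getLogger(__name__)


async def shutdown_with_warning(worker: Worker, timeout: float = 30.0) -> None:
    # Hypothetical helper: log if graceful shutdown is still pending after
    # `timeout` seconds (for example because a workflow failed eviction),
    # then keep waiting, since the worker cannot be force-stopped from here.
    shutdown_task = asyncio.create_task(worker.shutdown())
    done, _pending = await asyncio.wait({shutdown_task}, timeout=timeout)
    if not done:
        logger.warning(
            "Worker shutdown still pending after %s second(s); a workflow "
            "may have failed eviction",
            timeout,
        )
    await shutdown_task
```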
@@ -182,42 +194,43 @@ async def _handle_activation(
cache_remove_job = job.remove_from_cache
elif job.HasField("start_workflow"):
start_job = job.start_workflow
cache_remove_only_activation = len(act.jobs) == 1 and cache_remove_job

# Build default success completion (e.g. remove-job-only activations)
completion = (
temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion()
)
completion.successful.SetInParent()
try:
# Decode the activation if there's a codec and it's not a
# cache-remove-only activation
if self._data_converter.payload_codec and not cache_remove_only_activation:
# Decode the activation if there's a codec and not cache remove job
if self._data_converter.payload_codec and not cache_remove_job:
await temporalio.bridge.worker.decode_activation(
act, self._data_converter.payload_codec
)

if LOG_PROTOS:
logger.debug("Received workflow activation:\n%s", act)

# We only have to run if there are any non-remove-from-cache jobs
if not cache_remove_only_activation:
# If the workflow is not running yet, create it
# If the workflow is not running yet and this isn't a cache remove
# job, create it. We do not even fetch a workflow if it's a cache
# remove job and safe evictions are enabled
workflow = None
if not cache_remove_job or not self._disable_safe_eviction:
workflow = self._running_workflows.get(act.run_id)
if not workflow:
# Must have a start job to create instance
if not start_job:
raise RuntimeError(
"Missing start workflow, workflow could have unexpectedly been removed from cache"
)
workflow = self._create_workflow_instance(act, start_job)
self._running_workflows[act.run_id] = workflow
elif start_job:
# This should never happen
logger.warn("Cache already exists for activation with start job")

# Run activation in separate thread so we can check if it's
# deadlocked
if not workflow and not cache_remove_job:
# Must have a start job to create instance
if not start_job:
raise RuntimeError(
"Missing start workflow, workflow could have unexpectedly been removed from cache"
)
workflow = self._create_workflow_instance(act, start_job)
self._running_workflows[act.run_id] = workflow
elif start_job:
# This should never happen
logger.warn("Cache already exists for activation with start job")

# Run activation in separate thread so we can check if it's
# deadlocked
if workflow:
activate_task = asyncio.get_running_loop().run_in_executor(
self._workflow_task_executor,
workflow.activate,
@@ -234,6 +247,17 @@ async def _handle_activation(
f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within {self._deadlock_timeout_seconds} second(s)"
)
except Exception as err:
# We cannot fail a cache eviction, we must just log and not complete
# the activation (failed or otherwise). This should only happen in
# cases of deadlock or tasks not properly completing, and yes this
# means that a slot is forever taken.
# TODO(cretz): Should we build a complex mechanism to continually
# try the eviction until it succeeds?
if cache_remove_job:
logger.exception("Failed running eviction job, not evicting")
self._could_not_evict_count += 1
return

logger.exception(
"Failed handling activation on workflow with run ID %s", act.run_id
)
@@ -257,7 +281,9 @@ async def _handle_activation(
# Always set the run ID on the completion
completion.run_id = act.run_id

# If there is a remove-from-cache job, do so
# If there is a remove-from-cache job, do so. We don't need to log a
# warning if there's not, because create workflow failing for
# unregistered workflow still triggers cache remove job
if cache_remove_job:
if act.run_id in self._running_workflows:
logger.debug(
Expand All @@ -266,16 +292,9 @@ async def _handle_activation(
cache_remove_job.message,
)
del self._running_workflows[act.run_id]
else:
logger.warn(
"Eviction request on unknown workflow with run ID %s, message: %s",
act.run_id,
cache_remove_job.message,
)

# Encode the completion if there's a codec and it's not a
# cache-remove-only activation
if self._data_converter.payload_codec and not cache_remove_only_activation:
# Encode the completion if there's a codec and not cache remove job
if self._data_converter.payload_codec and not cache_remove_job:
try:
await temporalio.bridge.worker.encode_completion(
completion, self._data_converter.payload_codec