Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safe Eviction #499

Merged
merged 6 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(
identity: Optional[str] = None,
debug_mode: bool = False,
runtime: Optional[temporalio.runtime.Runtime] = None,
disable_safe_workflow_eviction: bool = False,
) -> None:
"""Create a replayer to replay workflows from history.

Expand All @@ -67,6 +68,7 @@ def __init__(
identity=identity,
debug_mode=debug_mode,
runtime=runtime,
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
)

def config(self) -> ReplayerConfig:
Expand Down Expand Up @@ -228,6 +230,9 @@ def on_eviction_hook(
metric_meter=runtime.metric_meter,
on_eviction_hook=on_eviction_hook,
disable_eager_activity_execution=False,
disable_safe_eviction=self._config[
"disable_safe_workflow_eviction"
],
).run()
)

Expand Down Expand Up @@ -298,6 +303,7 @@ class ReplayerConfig(TypedDict, total=False):
identity: Optional[str]
debug_mode: bool
runtime: Optional[temporalio.runtime.Runtime]
disable_safe_workflow_eviction: bool


@dataclass(frozen=True)
Expand Down
26 changes: 20 additions & 6 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(
disable_eager_activity_execution: bool = False,
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
use_worker_versioning: bool = False,
disable_safe_workflow_eviction: bool = False,
) -> None:
"""Create a worker to process workflows and/or activities.

Expand Down Expand Up @@ -183,11 +184,19 @@ def __init__(
on_fatal_error: An async function that can handle a failure before
the worker shutdown commences. This cannot stop the shutdown and
any exception raised is logged and ignored.
use_worker_versioning: If true, the `build_id` argument must be specified, and this
worker opts into the worker versioning feature. This ensures it only receives
workflow tasks for workflows which it claims to be compatible with.

For more information, see https://docs.temporal.io/workers#worker-versioning
use_worker_versioning: If true, the `build_id` argument must be
specified, and this worker opts into the worker versioning
feature. This ensures it only receives workflow tasks for
workflows which it claims to be compatible with. For more
information, see
https://docs.temporal.io/workers#worker-versioning.
disable_safe_workflow_eviction: If true, instead of letting the
workflow collect its tasks properly, the worker will simply let
the Python garbage collector collect the tasks. WARNING: Users
should not set this value to true. The garbage collector will
throw ``GeneratorExit`` in coroutines causing them to to wake up
in different threads and run ``finally`` and other code in the
wrong workflow environment.
"""
if not activities and not workflows:
raise ValueError("At least one activity or workflow must be specified")
Expand Down Expand Up @@ -254,6 +263,7 @@ def __init__(
disable_eager_activity_execution=disable_eager_activity_execution,
on_fatal_error=on_fatal_error,
use_worker_versioning=use_worker_versioning,
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
)
self._started = False
self._shutdown_event = asyncio.Event()
Expand Down Expand Up @@ -303,6 +313,7 @@ def __init__(
disable_eager_activity_execution=disable_eager_activity_execution,
metric_meter=runtime.metric_meter,
on_eviction_hook=None,
disable_safe_eviction=disable_safe_workflow_eviction,
)

# We need an already connected client
Expand Down Expand Up @@ -473,9 +484,11 @@ async def raise_on_shutdown():
assert self._workflow_worker
tasks[2] = asyncio.create_task(self._workflow_worker.drain_poll_queue())

# Set worker-shutdown event
# Notify shutdown occurring
if self._activity_worker:
self._activity_worker.notify_shutdown()
if self._workflow_worker:
self._workflow_worker.notify_shutdown()

# Wait for all tasks to complete (i.e. for poller loops to stop)
await asyncio.wait(tasks)
Expand Down Expand Up @@ -597,6 +610,7 @@ class WorkerConfig(TypedDict, total=False):
disable_eager_activity_execution: bool
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]]
use_worker_versioning: bool
disable_safe_workflow_eviction: bool


_default_build_id: Optional[str] = None
Expand Down
81 changes: 50 additions & 31 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(
[str, temporalio.bridge.proto.workflow_activation.RemoveFromCache], None
]
],
disable_safe_eviction: bool,
) -> None:
self._bridge_worker = bridge_worker
self._namespace = namespace
Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__(
self._running_workflows: Dict[str, WorkflowInstance] = {}
self._disable_eager_activity_execution = disable_eager_activity_execution
self._on_eviction_hook = on_eviction_hook
self._disable_safe_eviction = disable_safe_eviction
self._throw_after_activation: Optional[Exception] = None

# If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable
Expand All @@ -99,6 +101,9 @@ def __init__(
None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2
)

# Keep track of workflows that could not be evicted
self._could_not_evict_count = 0

# Validate and build workflow dict
self._workflows: Dict[str, temporalio.workflow._Definition] = {}
self._dynamic_workflow: Optional[temporalio.workflow._Definition] = None
Expand Down Expand Up @@ -155,6 +160,13 @@ async def run(self) -> None:
if self._throw_after_activation:
raise self._throw_after_activation

def notify_shutdown(self) -> None:
if self._could_not_evict_count:
logger.warn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this warrants 'error' level. Sounds like something the user should most definitely know about.

I know in our case the task will just get SIGKILL'ED but this is just a red flag in general

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but this could also be while handling an exception which causes shutdown. But I think I agree this maybe should throw.

f"Shutting down workflow worker, but {self._could_not_evict_count} "
+ "workflow(s) could not be evicted previously, so the shutdown will hang"
)

# Only call this if run() raised an error
async def drain_poll_queue(self) -> None:
while True:
Expand Down Expand Up @@ -182,42 +194,43 @@ async def _handle_activation(
cache_remove_job = job.remove_from_cache
elif job.HasField("start_workflow"):
start_job = job.start_workflow
cache_remove_only_activation = len(act.jobs) == 1 and cache_remove_job

# Build default success completion (e.g. remove-job-only activations)
completion = (
temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion()
)
completion.successful.SetInParent()
try:
# Decode the activation if there's a codec and it's not a
# cache-remove-only activation
if self._data_converter.payload_codec and not cache_remove_only_activation:
# Decode the activation if there's a codec and not cache remove job
if self._data_converter.payload_codec and not cache_remove_job:
await temporalio.bridge.worker.decode_activation(
act, self._data_converter.payload_codec
)

if LOG_PROTOS:
logger.debug("Received workflow activation:\n%s", act)

# We only have to run if there are any non-remove-from-cache jobs
if not cache_remove_only_activation:
# If the workflow is not running yet, create it
# If the workflow is not running yet and this isn't a cache remove
# job, create it. We do not even fetch a workflow if it's a cache
# remove job and safe evictions are enabled
workflow = None
if not cache_remove_job or not self._disable_safe_eviction:
workflow = self._running_workflows.get(act.run_id)
if not workflow:
# Must have a start job to create instance
if not start_job:
raise RuntimeError(
"Missing start workflow, workflow could have unexpectedly been removed from cache"
)
workflow = self._create_workflow_instance(act, start_job)
self._running_workflows[act.run_id] = workflow
elif start_job:
# This should never happen
logger.warn("Cache already exists for activation with start job")

# Run activation in separate thread so we can check if it's
# deadlocked
if not workflow and not cache_remove_job:
# Must have a start job to create instance
if not start_job:
raise RuntimeError(
"Missing start workflow, workflow could have unexpectedly been removed from cache"
)
workflow = self._create_workflow_instance(act, start_job)
self._running_workflows[act.run_id] = workflow
elif start_job:
# This should never happen
logger.warn("Cache already exists for activation with start job")

# Run activation in separate thread so we can check if it's
# deadlocked
if workflow:
activate_task = asyncio.get_running_loop().run_in_executor(
self._workflow_task_executor,
workflow.activate,
Expand All @@ -234,6 +247,17 @@ async def _handle_activation(
f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within {self._deadlock_timeout_seconds} second(s)"
)
except Exception as err:
# We cannot fail a cache eviction, we must just log and not complete
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would something like an exception or syntax error in finally of the workflow code cause this?
I.e. this is just a workflow task activation failure that happens to happen during eviction?

Copy link
Member Author

@cretz cretz Apr 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would something like an exception or syntax error in finally of the workflow code cause this?

No, workflow code exceptions would not bubble out here, we swallow them in the workflow instance

I.e. this is just a workflow task activation failure that happens to happen during eviction?

Correct, and I actually have a hard time replicating besides deadlock

# the activation (failed or otherwise). This should only happen in
# cases of deadlock or tasks not properly completing, and yes this
# means that a slot is forever taken.
# TODO(cretz): Should we build a complex mechanism to continually
# try the eviction until it succeeds?
if cache_remove_job:
logger.exception("Failed running eviction job, not evicting")
self._could_not_evict_count += 1
return

logger.exception(
"Failed handling activation on workflow with run ID %s", act.run_id
)
Expand All @@ -257,7 +281,9 @@ async def _handle_activation(
# Always set the run ID on the completion
completion.run_id = act.run_id

# If there is a remove-from-cache job, do so
# If there is a remove-from-cache job, do so. We don't need to log a
# warning if there's not, because create workflow failing for
# unregistered workflow still triggers cache remove job
if cache_remove_job:
if act.run_id in self._running_workflows:
logger.debug(
Expand All @@ -266,16 +292,9 @@ async def _handle_activation(
cache_remove_job.message,
)
del self._running_workflows[act.run_id]
else:
logger.warn(
"Eviction request on unknown workflow with run ID %s, message: %s",
act.run_id,
cache_remove_job.message,
)

# Encode the completion if there's a codec and it's not a
# cache-remove-only activation
if self._data_converter.payload_codec and not cache_remove_only_activation:
# Encode the completion if there's a codec and not cache remove job
if self._data_converter.payload_codec and not cache_remove_job:
try:
await temporalio.bridge.worker.encode_completion(
completion, self._data_converter.payload_codec
Expand Down
Loading
Loading