Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability for certain task failure types to fail workflow #516

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,8 @@ While running in a workflow, in addition to features documented elsewhere, the f

#### Exceptions

* Workflows can raise exceptions to fail the workflow or the "workflow task" (i.e. suspend the workflow retrying).
* Workflows/updates can raise exceptions to fail the workflow or the "workflow task" (i.e. suspend the workflow
in a retrying state).
* Exceptions that are instances of `temporalio.exceptions.FailureError` will fail the workflow with that exception
* For failing the workflow explicitly with a user exception, use `temporalio.exceptions.ApplicationError`. This can
be marked non-retryable or include details as needed.
Expand All @@ -732,6 +733,13 @@ While running in a workflow, in addition to features documented elsewhere, the f
fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an
`ApplicationError` as mentioned above.

This default can be changed by providing a list of exception types to `workflow_failure_exception_types` when creating a
`Worker` or `failure_exception_types` on the `@workflow.defn` decorator. If a workflow-thrown exception is an instance
of any type in either list, it will fail the workflow instead of the task. This means a value of `[Exception]` will
cause every exception to fail the workflow instead of the task. Also, as a special case, if
`temporalio.workflow.NondeterminismError` (or any superclass of it) is set, non-deterministic exceptions will fail the
workflow. WARNING: These settings are experimental.

#### External Workflows

* `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow
Expand Down
21 changes: 21 additions & 0 deletions temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ use prost::Message;
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyTuple};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use temporal_sdk_core::api::errors::{PollActivityError, PollWfError};
use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput};
use temporal_sdk_core_api::errors::WorkflowErrorType;
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
Expand Down Expand Up @@ -45,6 +48,8 @@ pub struct WorkerConfig {
max_task_queue_activities_per_second: Option<f64>,
graceful_shutdown_period_millis: u64,
use_worker_versioning: bool,
nondeterminism_as_workflow_fail: bool,
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
}

macro_rules! enter_sync {
Expand Down Expand Up @@ -234,6 +239,22 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
// always set it even if 0.
.graceful_shutdown_period(Duration::from_millis(conf.graceful_shutdown_period_millis))
.use_worker_versioning(conf.use_worker_versioning)
.workflow_failure_errors(if conf.nondeterminism_as_workflow_fail {
HashSet::from([WorkflowErrorType::Nondeterminism])
} else {
HashSet::new()
})
.workflow_types_to_failure_errors(
conf.nondeterminism_as_workflow_fail_for_types
.iter()
.map(|s| {
(
s.to_owned(),
HashSet::from([WorkflowErrorType::Nondeterminism]),
)
})
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
)
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid worker config: {}", err)))
}
Expand Down
13 changes: 12 additions & 1 deletion temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Sequence, Tuple
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
List,
Optional,
Sequence,
Set,
Tuple,
)

import google.protobuf.internal.containers
from typing_extensions import TypeAlias
Expand Down Expand Up @@ -48,6 +57,8 @@ class WorkerConfig:
max_task_queue_activities_per_second: Optional[float]
graceful_shutdown_period_millis: int
use_worker_versioning: bool
nondeterminism_as_workflow_fail: bool
nondeterminism_as_workflow_fail_for_types: Set[str]


class Worker:
Expand Down
105 changes: 56 additions & 49 deletions temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
interceptors: Sequence[Interceptor] = [],
build_id: Optional[str] = None,
identity: Optional[str] = None,
workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
debug_mode: bool = False,
runtime: Optional[temporalio.runtime.Runtime] = None,
disable_safe_workflow_eviction: bool = False,
Expand All @@ -66,6 +67,7 @@ def __init__(
interceptors=interceptors,
build_id=build_id,
identity=identity,
workflow_failure_exception_types=workflow_failure_exception_types,
debug_mode=debug_mode,
runtime=runtime,
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
Expand Down Expand Up @@ -153,35 +155,6 @@ async def workflow_replay_iterator(
An async iterator that returns replayed workflow results as they are
replayed.
"""
# Create bridge worker
task_queue = f"replay-{self._config['build_id']}"
runtime = self._config["runtime"] or temporalio.runtime.Runtime.default()
bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay(
runtime._core_runtime,
temporalio.bridge.worker.WorkerConfig(
namespace=self._config["namespace"],
task_queue=task_queue,
build_id=self._config["build_id"] or load_default_build_id(),
identity_override=self._config["identity"],
# All values below are ignored but required by Core
max_cached_workflows=2,
max_outstanding_workflow_tasks=2,
max_outstanding_activities=1,
max_outstanding_local_activities=1,
max_concurrent_workflow_task_polls=1,
nonsticky_to_sticky_poll_ratio=1,
max_concurrent_activity_task_polls=1,
no_remote_activities=True,
sticky_queue_schedule_to_start_timeout_millis=1000,
max_heartbeat_throttle_interval_millis=1000,
default_heartbeat_throttle_interval_millis=1000,
max_activities_per_second=None,
max_task_queue_activities_per_second=None,
graceful_shutdown_period_millis=0,
use_worker_versioning=False,
),
)

try:
last_replay_failure: Optional[Exception]
last_replay_complete = asyncio.Event()
Expand Down Expand Up @@ -212,29 +185,62 @@ def on_eviction_hook(
last_replay_failure = None
last_replay_complete.set()

# Start the worker
workflow_worker_task = asyncio.create_task(
_WorkflowWorker(
bridge_worker=lambda: bridge_worker,
# Create worker referencing bridge worker
bridge_worker: temporalio.bridge.worker.Worker
task_queue = f"replay-{self._config['build_id']}"
runtime = self._config["runtime"] or temporalio.runtime.Runtime.default()
workflow_worker = _WorkflowWorker(
bridge_worker=lambda: bridge_worker,
namespace=self._config["namespace"],
task_queue=task_queue,
workflows=self._config["workflows"],
workflow_task_executor=self._config["workflow_task_executor"],
workflow_runner=self._config["workflow_runner"],
unsandboxed_workflow_runner=self._config["unsandboxed_workflow_runner"],
data_converter=self._config["data_converter"],
interceptors=self._config["interceptors"],
workflow_failure_exception_types=self._config[
"workflow_failure_exception_types"
],
debug_mode=self._config["debug_mode"],
metric_meter=runtime.metric_meter,
on_eviction_hook=on_eviction_hook,
disable_eager_activity_execution=False,
disable_safe_eviction=self._config["disable_safe_workflow_eviction"],
)
# Create bridge worker
bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay(
runtime._core_runtime,
temporalio.bridge.worker.WorkerConfig(
namespace=self._config["namespace"],
task_queue=task_queue,
workflows=self._config["workflows"],
workflow_task_executor=self._config["workflow_task_executor"],
workflow_runner=self._config["workflow_runner"],
unsandboxed_workflow_runner=self._config[
"unsandboxed_workflow_runner"
],
data_converter=self._config["data_converter"],
interceptors=self._config["interceptors"],
debug_mode=self._config["debug_mode"],
metric_meter=runtime.metric_meter,
on_eviction_hook=on_eviction_hook,
disable_eager_activity_execution=False,
disable_safe_eviction=self._config[
"disable_safe_workflow_eviction"
],
).run()
build_id=self._config["build_id"] or load_default_build_id(),
identity_override=self._config["identity"],
# Need to tell core whether we want to consider all
# non-determinism exceptions as workflow fail, and whether we do
# per workflow type
nondeterminism_as_workflow_fail=workflow_worker.nondeterminism_as_workflow_fail(),
nondeterminism_as_workflow_fail_for_types=workflow_worker.nondeterminism_as_workflow_fail_for_types(),
# All values below are ignored but required by Core
max_cached_workflows=2,
max_outstanding_workflow_tasks=2,
max_outstanding_activities=1,
max_outstanding_local_activities=1,
max_concurrent_workflow_task_polls=1,
nonsticky_to_sticky_poll_ratio=1,
max_concurrent_activity_task_polls=1,
no_remote_activities=True,
sticky_queue_schedule_to_start_timeout_millis=1000,
max_heartbeat_throttle_interval_millis=1000,
default_heartbeat_throttle_interval_millis=1000,
max_activities_per_second=None,
max_task_queue_activities_per_second=None,
graceful_shutdown_period_millis=0,
use_worker_versioning=False,
),
)
# Start worker
workflow_worker_task = asyncio.create_task(workflow_worker.run())

# Yield iterator
async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]:
Expand Down Expand Up @@ -301,6 +307,7 @@ class ReplayerConfig(TypedDict, total=False):
interceptors: Sequence[Interceptor]
build_id: Optional[str]
identity: Optional[str]
workflow_failure_exception_types: Sequence[Type[BaseException]]
debug_mode: bool
runtime: Optional[temporalio.runtime.Runtime]
disable_safe_workflow_eviction: bool
Expand Down
19 changes: 19 additions & 0 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(
max_activities_per_second: Optional[float] = None,
max_task_queue_activities_per_second: Optional[float] = None,
graceful_shutdown_timeout: timedelta = timedelta(),
workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
shared_state_manager: Optional[SharedStateManager] = None,
debug_mode: bool = False,
disable_eager_activity_execution: bool = False,
Expand Down Expand Up @@ -167,6 +168,13 @@ def __init__(
graceful_shutdown_timeout: Amount of time after shutdown is called
that activities are given to complete before their tasks are
cancelled.
workflow_failure_exception_types: The types of exceptions that, if a
workflow-thrown exception extends, will cause the
workflow/update to fail instead of suspending the workflow via
task failure. These are applied in addition to ones set on the
``workflow.defn`` decorator. If ``Exception`` is set, it
effectively will fail a workflow/update in all user exception
cases. WARNING: This setting is experimental.
shared_state_manager: Used for obtaining cross-process friendly
synchronization primitives. This is required for non-async
activities where the activity_executor is not a
Expand Down Expand Up @@ -258,6 +266,7 @@ def __init__(
max_activities_per_second=max_activities_per_second,
max_task_queue_activities_per_second=max_task_queue_activities_per_second,
graceful_shutdown_timeout=graceful_shutdown_timeout,
workflow_failure_exception_types=workflow_failure_exception_types,
shared_state_manager=shared_state_manager,
debug_mode=debug_mode,
disable_eager_activity_execution=disable_eager_activity_execution,
Expand Down Expand Up @@ -309,6 +318,7 @@ def __init__(
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
data_converter=client_config["data_converter"],
interceptors=interceptors,
workflow_failure_exception_types=workflow_failure_exception_types,
debug_mode=debug_mode,
disable_eager_activity_execution=disable_eager_activity_execution,
metric_meter=runtime.metric_meter,
Expand Down Expand Up @@ -366,6 +376,14 @@ def __init__(
1000 * graceful_shutdown_timeout.total_seconds()
),
use_worker_versioning=use_worker_versioning,
# Need to tell core whether we want to consider all
# non-determinism exceptions as workflow fail, and whether we do
# per workflow type
nondeterminism_as_workflow_fail=self._workflow_worker is not None
and self._workflow_worker.nondeterminism_as_workflow_fail(),
nondeterminism_as_workflow_fail_for_types=self._workflow_worker.nondeterminism_as_workflow_fail_for_types()
if self._workflow_worker
else set(),
),
)

Expand Down Expand Up @@ -605,6 +623,7 @@ class WorkerConfig(TypedDict, total=False):
max_activities_per_second: Optional[float]
max_task_queue_activities_per_second: Optional[float]
graceful_shutdown_timeout: timedelta
workflow_failure_exception_types: Sequence[Type[BaseException]]
shared_state_manager: Optional[SharedStateManager]
debug_mode: bool
disable_eager_activity_execution: bool
Expand Down
26 changes: 25 additions & 1 deletion temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
import os
from datetime import timezone
from typing import Callable, Dict, List, MutableMapping, Optional, Sequence, Type
from typing import Callable, Dict, List, MutableMapping, Optional, Sequence, Set, Type

import temporalio.activity
import temporalio.api.common.v1
Expand Down Expand Up @@ -52,6 +52,7 @@ def __init__(
unsandboxed_workflow_runner: WorkflowRunner,
data_converter: temporalio.converter.DataConverter,
interceptors: Sequence[Interceptor],
workflow_failure_exception_types: Sequence[Type[BaseException]],
debug_mode: bool,
disable_eager_activity_execution: bool,
metric_meter: temporalio.common.MetricMeter,
Expand Down Expand Up @@ -89,6 +90,7 @@ def __init__(
self._extern_functions.update(
**_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter)
)
self._workflow_failure_exception_types = workflow_failure_exception_types
self._running_workflows: Dict[str, WorkflowInstance] = {}
self._disable_eager_activity_execution = disable_eager_activity_execution
self._on_eviction_hook = on_eviction_hook
Expand All @@ -104,6 +106,11 @@ def __init__(
# Keep track of workflows that could not be evicted
self._could_not_evict_count = 0

# Set the worker-level failure exception types into the runner
workflow_runner.set_worker_level_failure_exception_types(
workflow_failure_exception_types
)

# Validate and build workflow dict
self._workflows: Dict[str, temporalio.workflow._Definition] = {}
self._dynamic_workflow: Optional[temporalio.workflow._Definition] = None
Expand Down Expand Up @@ -389,8 +396,25 @@ def _create_workflow_instance(
randomness_seed=start.randomness_seed,
extern_functions=self._extern_functions,
disable_eager_activity_execution=self._disable_eager_activity_execution,
worker_level_failure_exception_types=self._workflow_failure_exception_types,
)
if defn.sandboxed:
return self._workflow_runner.create_instance(det)
else:
return self._unsandboxed_workflow_runner.create_instance(det)

def nondeterminism_as_workflow_fail(self) -> bool:
    """Report whether non-determinism errors should fail the whole workflow.

    Returns:
        True when ``temporalio.workflow.NondeterminismError`` is a subclass of
        any worker-level failure exception type (i.e. the configured list
        covers non-determinism), False otherwise. Used to tell Core how to
        treat non-deterministic workflow tasks.
    """
    for failure_type in self._workflow_failure_exception_types:
        if issubclass(temporalio.workflow.NondeterminismError, failure_type):
            return True
    return False

def nondeterminism_as_workflow_fail_for_types(self) -> Set[str]:
    """Collect workflow type names whose non-determinism errors fail the workflow.

    Returns:
        The set of registered workflow names whose per-workflow
        ``failure_exception_types`` (from the ``@workflow.defn`` decorator)
        include a superclass of ``temporalio.workflow.NondeterminismError``.
        Passed to Core so it can fail those workflows on non-determinism.
    """
    matching_names: Set[str] = set()
    for workflow_name, defn in self._workflows.items():
        covers_nondeterminism = any(
            issubclass(temporalio.workflow.NondeterminismError, failure_type)
            for failure_type in defn.failure_exception_types
        )
        if covers_nondeterminism:
            matching_names.add(workflow_name)
    return matching_names
Loading
Loading