Add deterministic alternatives for asyncio.wait and asyncio.as_completed (#533)

Fixes #429
Fixes #518
cretz authored Jun 4, 2024
1 parent afadc15 commit 365cead
Showing 6 changed files with 423 additions and 26 deletions.
24 changes: 18 additions & 6 deletions README.md
@@ -61,7 +61,8 @@ informal introduction to the features and their implementation.
 - [Invoking Child Workflows](#invoking-child-workflows)
 - [Timers](#timers)
 - [Conditions](#conditions)
-- [Asyncio and Cancellation](#asyncio-and-cancellation)
+- [Asyncio and Determinism](#asyncio-and-determinism)
+- [Asyncio Cancellation](#asyncio-cancellation)
 - [Workflow Utilities](#workflow-utilities)
 - [Exceptions](#exceptions)
 - [External Workflows](#external-workflows)
@@ -550,8 +551,9 @@ Some things to note about the above workflow code:
 * This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
   a different signal
 * Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
-* Workflow code must be deterministic. This means no threading, no randomness, no external calls to processes, no
-  network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be deterministic.
+* Workflow code must be deterministic. This means no `set` iteration, threading, no randomness, no external calls to
+  processes, no network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be
+  deterministic. Also see the [Asyncio and Determinism](#asyncio-and-determinism) section later.
 * `@activity.defn` is explained in a later section. For normal simple string concatenation, this would just be done in
   the workflow. The activity is for demonstration purposes only.
 * `workflow.execute_activity(create_greeting_activity, ...` is actually a typed signature, and MyPy will fail if the
@@ -678,16 +680,26 @@ Some things to note about the above code:
 * A `timeout` can optionally be provided which will throw a `asyncio.TimeoutError` if reached (internally backed by
   `asyncio.wait_for` which uses a timer)

-#### Asyncio and Cancellation
+#### Asyncio and Determinism

-Workflows are backed by a custom [asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many
-of the common `asyncio` calls work as normal. Some asyncio features are disabled such as:
+Workflows must be deterministic. Workflows are backed by a custom
+[asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many of the common `asyncio` calls work
+as normal. Some asyncio features are disabled such as:

 * Thread related calls such as `to_thread()`, `run_coroutine_threadsafe()`, `loop.run_in_executor()`, etc
 * Calls that alter the event loop such as `loop.close()`, `loop.stop()`, `loop.run_forever()`,
   `loop.set_task_factory()`, etc
 * Calls that use anything external such as networking, subprocesses, disk IO, etc

+Also, there are some `asyncio` utilities that internally use `set()` which can make them non-deterministic from one
+worker to the next. Therefore the following `asyncio` functions have `workflow`-module alternatives that are
+deterministic:
+
+* `asyncio.as_completed()` - use `workflow.as_completed()`
+* `asyncio.wait()` - use `workflow.wait()`
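
Editor's illustration (not part of the commit): the set-ordering concern in the README text above can be seen with plain `asyncio`. The sketch below is a minimal, stdlib-only example; it does not use the Temporal SDK or `workflow.wait()`, and the helper `double` is a hypothetical stand-in for real work. It only shows that `asyncio.wait()` returns sets, and that walking the original task list gives a fixed result order.

```python
import asyncio


async def double(value: int) -> int:
    # Hypothetical stand-in for real work; yields once to the event loop.
    await asyncio.sleep(0)
    return value * 2


async def main() -> list:
    # Keep the tasks in a list so their order is fixed by the code itself.
    tasks = [asyncio.create_task(double(v)) for v in (1, 2, 3)]
    done, pending = await asyncio.wait(tasks)
    # asyncio.wait() returns two sets, so iterating `done` directly
    # has no guaranteed order.
    assert isinstance(done, set) and not pending
    # Walking the original list instead yields the same order on every run.
    return [t.result() for t in tasks if t in done]


results = asyncio.run(main())
print(results)
```

Inside a workflow, the README's recommendation is to call `workflow.wait()` or `workflow.as_completed()` rather than rely on a workaround like this one.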

+#### Asyncio Cancellation
+
 Cancellation is done the same way as `asyncio`. Specifically, a task can be requested to be cancelled but does not
 necessarily have to respect that cancellation immediately. This also means that `asyncio.shield()` can be used to
 protect against cancellation. The following tasks, when cancelled, perform a Temporal cancellation:
127 changes: 109 additions & 18 deletions temporalio/worker/workflow_sandbox/_restrictions.py
@@ -14,6 +14,7 @@
 import operator
 import random
 import types
+import warnings
 from copy import copy, deepcopy
 from dataclasses import dataclass
 from typing import (
@@ -49,15 +50,25 @@ class RestrictedWorkflowAccessError(temporalio.workflow.NondeterminismError):
         qualified_name: Fully qualified name of what was accessed.
     """

-    def __init__(self, qualified_name: str) -> None:
+    def __init__(
+        self, qualified_name: str, *, override_message: Optional[str] = None
+    ) -> None:
         """Create restricted workflow access error."""
         super().__init__(
+            override_message
+            or RestrictedWorkflowAccessError.default_message(qualified_name)
+        )
+        self.qualified_name = qualified_name
+
+    @staticmethod
+    def default_message(qualified_name: str) -> str:
+        """Get default message for restricted access."""
+        return (
             f"Cannot access {qualified_name} from inside a workflow. "
             "If this is code from a module not used in a workflow or known to "
             "only be used deterministically from a workflow, mark the import "
             "as pass through."
         )
-        self.qualified_name = qualified_name


 @dataclass(frozen=True)
@@ -182,6 +193,20 @@ def nested_child(path: Sequence[str], child: SandboxMatcher) -> SandboxMatcher:
     time.
     """

+    leaf_message: Optional[str] = None
+    """
+    Override message to use in error/warning. Defaults to a common message.
+    This is only applicable to leafs, so this must only be set when
+    ``match_self`` is ``True`` and this matcher is on ``children`` of a parent.
+    """
+
+    leaf_warning: Optional[Type[Warning]] = None
+    """
+    If set, issues a warning instead of raising an error. This is only
+    applicable to leafs, so this must only be set when ``match_self`` is
+    ``True`` and this matcher is on ``children`` of a parent.
+    """
+
     all: ClassVar[SandboxMatcher]
     """Shortcut for an always-matched matcher."""

@@ -197,40 +222,67 @@ def nested_child(path: Sequence[str], child: SandboxMatcher) -> SandboxMatcher:
     all_uses_runtime: ClassVar[SandboxMatcher]
     """Shortcut for a matcher that matches any :py:attr:`use` at runtime."""

-    def match_access(
+    def __post_init__(self):
+        """Post initialization validations."""
+        if self.leaf_message and not self.match_self:
+            raise ValueError("Cannot set leaf_message without match_self")
+        if self.leaf_warning and not self.match_self:
+            raise ValueError("Cannot set leaf_warning without match_self")
+
+    def access_matcher(
         self, context: RestrictionContext, *child_path: str, include_use: bool = False
-    ) -> bool:
-        """Perform a match check.
+    ) -> Optional[SandboxMatcher]:
+        """Perform a match check and return matcher.

         Args:
             context: Current restriction context.
             child_path: Full path to the child being accessed.
             include_use: Whether to include the :py:attr:`use` set in the check.

         Returns:
-            ``True`` if matched.
+            The matcher if matched.
         """
         # We prefer to avoid recursion
         matcher = self
         for v in child_path:
             # Does not match if this is runtime only and we're not runtime
             if not context.is_runtime and matcher.only_runtime:
-                return False
+                return None

             # Considered matched if self matches or access matches. Note, "use"
             # does not match by default because we allow it to be accessed but
             # not used.
             if matcher.match_self or v in matcher.access or "*" in matcher.access:
-                return True
+                return matcher
             if include_use and (v in matcher.use or "*" in matcher.use):
-                return True
+                return matcher
             child_matcher = matcher.children.get(v) or matcher.children.get("*")
             if not child_matcher:
-                return False
+                return None
             matcher = child_matcher
         if not context.is_runtime and matcher.only_runtime:
-            return False
-        return matcher.match_self
+            return None
+        if not matcher.match_self:
+            return None
+        return matcher
+
+    def match_access(
+        self, context: RestrictionContext, *child_path: str, include_use: bool = False
+    ) -> bool:
+        """Perform a match check.
+
+        Args:
+            context: Current restriction context.
+            child_path: Full path to the child being accessed.
+            include_use: Whether to include the :py:attr:`use` set in the check.
+
+        Returns:
+            ``True`` if matched.
+        """
+        return (
+            self.access_matcher(context, *child_path, include_use=include_use)
+            is not None
+        )

     def child_matcher(self, *child_path: str) -> Optional[SandboxMatcher]:
         """Return a child matcher for the given path.
@@ -273,6 +325,10 @@ def __or__(self, other: SandboxMatcher) -> SandboxMatcher:
         """Combine this matcher with another."""
         if self.only_runtime != other.only_runtime:
             raise ValueError("Cannot combine only-runtime and non-only-runtime")
+        if self.leaf_message != other.leaf_message:
+            raise ValueError("Cannot combine different messages")
+        if self.leaf_warning != other.leaf_warning:
+            raise ValueError("Cannot combine different warning values")
         if self.match_self or other.match_self:
             return SandboxMatcher.all
         new_children = dict(self.children) if self.children else {}
@@ -287,6 +343,8 @@ def __or__(self, other: SandboxMatcher) -> SandboxMatcher:
             use=self.use | other.use,
             children=new_children,
             only_runtime=self.only_runtime,
+            leaf_message=self.leaf_message,
+            leaf_warning=self.leaf_warning,
         )

     def with_child_unrestricted(self, *child_path: str) -> SandboxMatcher:
@@ -457,6 +515,28 @@ def _public_callables(parent: Any, *, exclude: Set[str] = set()) -> Set[str]:
             # rewriter
             only_runtime=True,
         ),
+        "asyncio": SandboxMatcher(
+            children={
+                "as_completed": SandboxMatcher(
+                    children={
+                        "__call__": SandboxMatcher(
+                            match_self=True,
+                            leaf_warning=UserWarning,
+                            leaf_message="asyncio.as_completed() is non-deterministic, use workflow.as_completed() instead",
+                        )
+                    },
+                ),
+                "wait": SandboxMatcher(
+                    children={
+                        "__call__": SandboxMatcher(
+                            match_self=True,
+                            leaf_warning=UserWarning,
+                            leaf_message="asyncio.wait() is non-deterministic, use workflow.wait() instead",
+                        )
+                    },
+                ),
+            }
+        ),
         # TODO(cretz): Fix issues with class extensions on restricted proxy
         # "argparse": SandboxMatcher.all_uses_runtime,
         "bz2": SandboxMatcher(use={"open"}),
@@ -689,12 +769,23 @@ def from_proxy(v: _RestrictedProxy) -> _RestrictionState:
     matcher: SandboxMatcher

     def assert_child_not_restricted(self, name: str) -> None:
-        if (
-            self.matcher.match_access(self.context, name)
-            and not temporalio.workflow.unsafe.is_sandbox_unrestricted()
-        ):
-            logger.warning("%s on %s restricted", name, self.name)
-            raise RestrictedWorkflowAccessError(f"{self.name}.{name}")
+        if temporalio.workflow.unsafe.is_sandbox_unrestricted():
+            return
+        matcher = self.matcher.access_matcher(self.context, name)
+        if not matcher:
+            return
+        logger.warning("%s on %s restricted", name, self.name)
+        # Issue warning instead of error if configured to do so
+        if matcher.leaf_warning:
+            warnings.warn(
+                matcher.leaf_message
+                or RestrictedWorkflowAccessError.default_message(f"{self.name}.{name}"),
+                matcher.leaf_warning,
+            )
+        else:
+            raise RestrictedWorkflowAccessError(
+                f"{self.name}.{name}", override_message=matcher.leaf_message
+            )

     def set_on_proxy(self, v: _RestrictedProxy) -> None:
         # To prevent recursion, must use __setattr__ on object to set the
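
Editor's illustration (not part of the commit): the warn-or-raise branch that `assert_child_not_restricted` gains in the hunk above can be sketched in isolation. This is a simplified, stdlib-only sketch under stated assumptions: `LeafRule`, `RestrictedAccessError`, `report_restricted`, and the name `some_module.some_call` are hypothetical stand-ins, not SDK names. `LeafRule` mirrors only the `leaf_message` and `leaf_warning` fields of `SandboxMatcher`.

```python
import warnings
from dataclasses import dataclass
from typing import Optional, Type


class RestrictedAccessError(Exception):
    """Hypothetical stand-in for RestrictedWorkflowAccessError."""


@dataclass(frozen=True)
class LeafRule:
    """Hypothetical, simplified stand-in for a matched leaf matcher."""

    message: Optional[str] = None
    warning: Optional[Type[Warning]] = None


def report_restricted(qualified_name: str, rule: LeafRule) -> None:
    # Fall back to a generic message when the rule carries no override.
    message = rule.message or f"Cannot access {qualified_name} from inside a workflow."
    if rule.warning:
        # Soft restriction: warn and let the caller continue.
        warnings.warn(message, rule.warning)
    else:
        # Hard restriction: stop the access with an error.
        raise RestrictedAccessError(message)


soft_rule = LeafRule(
    message="asyncio.wait() is non-deterministic, use workflow.wait() instead",
    warning=UserWarning,
)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    report_restricted("asyncio.wait.__call__", soft_rule)

try:
    report_restricted("some_module.some_call", LeafRule())
    raised = False
except RestrictedAccessError:
    raised = True
```

With a warning class set, the restricted call is only flagged, which matches how the commit treats `asyncio.wait()` and `asyncio.as_completed()`. Without one, the access is rejected.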