Skip to content

Commit

Permalink
Merge branch 'main' into update-updates
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored May 20, 2024
2 parents 1a2acd5 + a52f25d commit 1b2d411
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 4 deletions.
6 changes: 3 additions & 3 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ async def _apply_to_payloads(
if len(payloads) == 0:
return
new_payloads = await cb(payloads)
if new_payloads is payloads:
return
del payloads[:]
# TODO(cretz): Copy too expensive?
payloads.extend(new_payloads)
Expand All @@ -189,9 +191,7 @@ async def _apply_to_payload(
) -> None:
"""Apply API payload callback to payload."""
new_payload = (await cb([payload]))[0]
payload.metadata.clear()
payload.metadata.update(new_payload.metadata)
payload.data = new_payload.data
payload.CopyFrom(new_payload)


async def _decode_payloads(
Expand Down
11 changes: 10 additions & 1 deletion temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,9 @@ def _apply_remove_from_cache(
) -> None:
self._deleting = True
self._cancel_requested = True
# We consider eviction to be under replay so that certain code like
# logging that avoids replaying doesn't run during eviction either
self._is_replaying = True
# Cancel everything
for task in self._tasks:
task.cancel()
Expand Down Expand Up @@ -1514,7 +1517,9 @@ def _assert_not_read_only(
self, action_attempted: str, *, allow_during_delete: bool = False
) -> None:
if self._deleting and not allow_during_delete:
raise RuntimeError(f"Ignoring {action_attempted} while deleting")
raise _WorkflowBeingEvictedError(
f"Ignoring {action_attempted} while evicting workflow. This is not an error."
)
if self._read_only:
raise temporalio.workflow.ReadOnlyContextError(
f"While in read-only function, action attempted: {action_attempted}"
Expand Down Expand Up @@ -2614,3 +2619,7 @@ def set(
) -> None:
if not temporalio.workflow.unsafe.is_replaying():
self._underlying.set(value, additional_attributes)


class _WorkflowBeingEvictedError(BaseException):
pass
64 changes: 64 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,24 @@ async def test_workflow_with_codec(client: Client, env: WorkflowEnvironment):
await test_workflow_update_handlers_happy(client, env)


class PassThroughCodec(PayloadCodec):
async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
return list(payloads)

async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
return list(payloads)


async def test_workflow_with_passthrough_codec(client: Client):
# Make client with this codec and run the activity test. This used to fail
# because there was a bug where the codec couldn't reuse the passed-in
# payloads.
config = client.config()
config["data_converter"] = DataConverter(payload_codec=PassThroughCodec())
client = Client(**config)
await test_workflow_simple_activity(client)


class CustomWorkflowRunner(WorkflowRunner):
def __init__(self) -> None:
super().__init__()
Expand Down Expand Up @@ -3412,6 +3430,52 @@ async def signal_count() -> int:
assert not hook_calls


@dataclass
class CapturedEvictionException:
is_replaying: bool
exception: BaseException


captured_eviction_exceptions: List[CapturedEvictionException] = []


@workflow.defn(sandboxed=False)
class EvictionCaptureExceptionWorkflow:
@workflow.run
async def run(self) -> None:
# Going to sleep so we can force eviction
try:
await asyncio.sleep(0.01)
except BaseException as err:
captured_eviction_exceptions.append(
CapturedEvictionException(
is_replaying=workflow.unsafe.is_replaying(), exception=err
)
)


async def test_workflow_eviction_exception(client: Client):
assert not captured_eviction_exceptions

# Run workflow with no cache (forces eviction every step)
async with new_worker(
client, EvictionCaptureExceptionWorkflow, max_cached_workflows=0
) as worker:
await client.execute_workflow(
EvictionCaptureExceptionWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

# Confirm expected eviction replaying state and exception type
assert len(captured_eviction_exceptions) == 1
assert captured_eviction_exceptions[0].is_replaying
assert (
type(captured_eviction_exceptions[0].exception).__name__
== "_WorkflowBeingEvictedError"
)


@dataclass
class DynamicWorkflowValue:
some_string: str
Expand Down

0 comments on commit 1b2d411

Please sign in to comment.