diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py index fd44d1b1..2ea59579 100644 --- a/temporalio/bridge/worker.py +++ b/temporalio/bridge/worker.py @@ -175,6 +175,8 @@ async def _apply_to_payloads( if len(payloads) == 0: return new_payloads = await cb(payloads) + if new_payloads is payloads: + return del payloads[:] # TODO(cretz): Copy too expensive? payloads.extend(new_payloads) @@ -189,9 +191,7 @@ async def _apply_to_payload( ) -> None: """Apply API payload callback to payload.""" new_payload = (await cb([payload]))[0] - payload.metadata.clear() - payload.metadata.update(new_payload.metadata) - payload.data = new_payload.data + payload.CopyFrom(new_payload) async def _decode_payloads( diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 81d66b7f..34c997ad 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -637,6 +637,9 @@ def _apply_remove_from_cache( ) -> None: self._deleting = True self._cancel_requested = True + # We consider eviction to be under replay so that certain code like + # logging that avoids replaying doesn't run during eviction either + self._is_replaying = True # Cancel everything for task in self._tasks: task.cancel() @@ -1514,7 +1517,9 @@ def _assert_not_read_only( self, action_attempted: str, *, allow_during_delete: bool = False ) -> None: if self._deleting and not allow_during_delete: - raise RuntimeError(f"Ignoring {action_attempted} while deleting") + raise _WorkflowBeingEvictedError( + f"Ignoring {action_attempted} while evicting workflow. This is not an error." + ) if self._read_only: raise temporalio.workflow.ReadOnlyContextError( f"While in read-only function, action attempted: {action_attempted}" @@ -2614,3 +2619,7 @@ def set( ) -> None: if not temporalio.workflow.unsafe.is_replaying(): self._underlying.set(value, additional_attributes) + + +class _WorkflowBeingEvictedError(BaseException): + pass diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 5056ad5f..20e8685b 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1486,6 +1486,24 @@ async def test_workflow_with_codec(client: Client, env: WorkflowEnvironment): await test_workflow_update_handlers_happy(client, env) +class PassThroughCodec(PayloadCodec): + async def encode(self, payloads: Sequence[Payload]) -> List[Payload]: + return list(payloads) + + async def decode(self, payloads: Sequence[Payload]) -> List[Payload]: + return list(payloads) + + +async def test_workflow_with_passthrough_codec(client: Client): + # Make client with this codec and run the activity test. This used to fail + # because there was a bug where the codec couldn't reuse the passed-in + # payloads. + config = client.config() + config["data_converter"] = DataConverter(payload_codec=PassThroughCodec()) + client = Client(**config) + await test_workflow_simple_activity(client) + + class CustomWorkflowRunner(WorkflowRunner): def __init__(self) -> None: super().__init__() @@ -3412,6 +3430,52 @@ async def signal_count() -> int: assert not hook_calls +@dataclass +class CapturedEvictionException: + is_replaying: bool + exception: BaseException + + +captured_eviction_exceptions: List[CapturedEvictionException] = [] + + +@workflow.defn(sandboxed=False) +class EvictionCaptureExceptionWorkflow: + @workflow.run + async def run(self) -> None: + # Going to sleep so we can force eviction + try: + await asyncio.sleep(0.01) + except BaseException as err: + captured_eviction_exceptions.append( + CapturedEvictionException( + is_replaying=workflow.unsafe.is_replaying(), exception=err + ) + ) + + +async def test_workflow_eviction_exception(client: Client): + assert not captured_eviction_exceptions + + # Run workflow with no cache (forces eviction every step) + async with new_worker( + client, EvictionCaptureExceptionWorkflow, max_cached_workflows=0 + ) as worker: + await client.execute_workflow( + EvictionCaptureExceptionWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + + # Confirm expected eviction replaying state and exception type + assert len(captured_eviction_exceptions) == 1 + assert captured_eviction_exceptions[0].is_replaying + assert ( + type(captured_eviction_exceptions[0].exception).__name__ + == "_WorkflowBeingEvictedError" + ) + + @dataclass class DynamicWorkflowValue: some_string: str