diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 8b13e389..0766c20e 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -175,6 +175,8 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: self._time_ns = 0 self._cancel_requested = False self._current_history_length = 0 + self._current_history_size = 0 + self._continue_as_new_suggested = False # Lazily loaded self._memo: Optional[Mapping[str, Any]] = None # Handles which are ready to run on the next event loop iteration @@ -272,6 +274,8 @@ def activate( self._current_completion.successful.SetInParent() self._current_activation_error: Optional[Exception] = None self._current_history_length = act.history_length + self._current_history_size = act.history_size_bytes + self._continue_as_new_suggested = act.continue_as_new_suggested self._time_ns = act.timestamp.ToNanoseconds() self._is_replaying = act.is_replaying @@ -738,6 +742,9 @@ def workflow_extern_functions(self) -> Mapping[str, Callable]: def workflow_get_current_history_length(self) -> int: return self._current_history_length + def workflow_get_current_history_size(self) -> int: + return self._current_history_size + def workflow_get_external_workflow_handle( self, id: str, *, run_id: Optional[str] ) -> temporalio.workflow.ExternalWorkflowHandle[Any]: @@ -760,6 +767,9 @@ def workflow_get_signal_handler(self, name: Optional[str]) -> Optional[Callable] def workflow_info(self) -> temporalio.workflow.Info: return self._outbound.info() + def workflow_is_continue_as_new_suggested(self) -> bool: + return self._continue_as_new_suggested + def workflow_is_replaying(self) -> bool: return self._is_replaying diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 7f4b0d07..c8fd5d53 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -336,11 +336,34 @@ def _logger_details(self) -> Mapping[str, Any]: def get_current_history_length(self) -> int: """Get the current number of events in history. + Note, this value may not be up to date if accessed inside a query. + Returns: Current number of events in history (up until the current task). """ return _Runtime.current().workflow_get_current_history_length() + def get_current_history_size(self) -> int: + """Get the current byte size of history. + + Note, this value may not be up to date if accessed inside a query. + + Returns: + Current byte-size of history (up until the current task). + """ + return _Runtime.current().workflow_get_current_history_size() + + def is_continue_as_new_suggested(self) -> bool: + """Get whether or not continue as new is suggested. + + Note, this value may not be up to date if accessed inside a query. + + Returns: + True if the server is configured to suggest continue as new and it + is suggested. + """ + return _Runtime.current().workflow_is_continue_as_new_suggested() + @dataclass(frozen=True) class ParentInfo: @@ -405,6 +428,10 @@ def workflow_extern_functions(self) -> Mapping[str, Callable]: def workflow_get_current_history_length(self) -> int: ... + @abstractmethod + def workflow_get_current_history_size(self) -> int: + ... + @abstractmethod def workflow_get_external_workflow_handle( self, id: str, *, run_id: Optional[str] @@ -423,6 +450,10 @@ def workflow_get_signal_handler(self, name: Optional[str]) -> Optional[Callable] def workflow_info(self) -> Info: ... + @abstractmethod + def workflow_is_continue_as_new_suggested(self) -> bool: + ... + @abstractmethod def workflow_is_replaying(self) -> bool: ... diff --git a/tests/conftest.py b/tests/conftest.py index 72ff329b..69ca2157 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -79,6 +79,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]: dev_server_extra_args=[ "--dynamic-config-value", "system.forceSearchAttributesCacheRefreshOnRead=true", + "--dynamic-config-value", + f"limit.historyCount.suggestContinueAsNew={CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT}", ] ) elif env_type == "time-skipping": @@ -101,3 +103,11 @@ async def worker( worker = ExternalPythonWorker(env) yield worker await worker.close() + + +CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT = 50 + + +@pytest.fixture +def continue_as_new_suggest_history_count() -> int: + return CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 1fdd5396..9c08cea0 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -132,7 +132,6 @@ class InfoWorkflow: async def run(self) -> Dict: # Convert to JSON and back so it'll stringify un-JSON-able pieces ret = dataclasses.asdict(workflow.info()) - ret["current_history_length"] = workflow.info().get_current_history_length() return json.loads(json.dumps(ret, default=str)) @@ -158,7 +157,6 @@ async def test_workflow_info(client: Client, env: WorkflowEnvironment): ) assert info["attempt"] == 1 assert info["cron_schedule"] is None - assert info["current_history_length"] == 3 assert info["execution_timeout"] is None assert info["namespace"] == client.namespace assert info["retry_policy"] == json.loads( @@ -173,6 +171,68 @@ async def test_workflow_info(client: Client, env: WorkflowEnvironment): assert info["workflow_type"] == "InfoWorkflow" +@dataclass +class HistoryInfo: + history_length: int + history_size: int + continue_as_new_suggested: bool + + +@workflow.defn +class HistoryInfoWorkflow: + @workflow.run + async def run(self) -> None: + # Just wait forever + await workflow.wait_condition(lambda: False) + + @workflow.signal + async def bunch_of_events(self, count: int) -> None: + # Create a lot of one-day timers + for _ in range(count): + asyncio.create_task(asyncio.sleep(60 * 60 * 24)) + + @workflow.query + def get_history_info(self) -> HistoryInfo: + return HistoryInfo( + history_length=workflow.info().get_current_history_length(), + history_size=workflow.info().get_current_history_size(), + continue_as_new_suggested=workflow.info().is_continue_as_new_suggested(), + ) + + +async def test_workflow_history_info( + client: Client, env: WorkflowEnvironment, continue_as_new_suggest_history_count: int +): + if env.supports_time_skipping: + pytest.skip("Java test server does not support should continue as new") + async with new_worker(client, HistoryInfoWorkflow) as worker: + handle = await client.start_workflow( + HistoryInfoWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + # Issue query before anything else, which should mean only a history + # size of 3, at least 100 bytes of history, and no continue as new + # suggestion + orig_info = await handle.query(HistoryInfoWorkflow.get_history_info) + assert orig_info.history_length == 3 + assert orig_info.history_size > 100 + assert not orig_info.continue_as_new_suggested + + # Now send a lot of events + await handle.signal( + HistoryInfoWorkflow.bunch_of_events, continue_as_new_suggest_history_count + ) + # Send one more event to trigger the WFT update. We have to do this + # because just a query will have a stale representation of history + # counts, but signal forces a new WFT. + await handle.signal(HistoryInfoWorkflow.bunch_of_events, 1) + new_info = await handle.query(HistoryInfoWorkflow.get_history_info) + assert new_info.history_length > continue_as_new_suggest_history_count + assert new_info.history_size > orig_info.history_size + assert new_info.continue_as_new_suggested + + @workflow.defn class SignalAndQueryWorkflow: def __init__(self) -> None: