Skip to content

Commit

Permalink
Expose history size and continue-as-new-suggested (#361)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Aug 2, 2023
1 parent 63f63ae commit 24fea4c
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 2 deletions.
10 changes: 10 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
self._time_ns = 0
self._cancel_requested = False
self._current_history_length = 0
self._current_history_size = 0
self._continue_as_new_suggested = False
# Lazily loaded
self._memo: Optional[Mapping[str, Any]] = None
# Handles which are ready to run on the next event loop iteration
Expand Down Expand Up @@ -272,6 +274,8 @@ def activate(
self._current_completion.successful.SetInParent()
self._current_activation_error: Optional[Exception] = None
self._current_history_length = act.history_length
self._current_history_size = act.history_size_bytes
self._continue_as_new_suggested = act.continue_as_new_suggested
self._time_ns = act.timestamp.ToNanoseconds()
self._is_replaying = act.is_replaying

Expand Down Expand Up @@ -738,6 +742,9 @@ def workflow_extern_functions(self) -> Mapping[str, Callable]:
def workflow_get_current_history_length(self) -> int:
return self._current_history_length

def workflow_get_current_history_size(self) -> int:
return self._current_history_size

def workflow_get_external_workflow_handle(
self, id: str, *, run_id: Optional[str]
) -> temporalio.workflow.ExternalWorkflowHandle[Any]:
Expand All @@ -760,6 +767,9 @@ def workflow_get_signal_handler(self, name: Optional[str]) -> Optional[Callable]
def workflow_info(self) -> temporalio.workflow.Info:
return self._outbound.info()

def workflow_is_continue_as_new_suggested(self) -> bool:
return self._continue_as_new_suggested

def workflow_is_replaying(self) -> bool:
return self._is_replaying

Expand Down
31 changes: 31 additions & 0 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,34 @@ def _logger_details(self) -> Mapping[str, Any]:
def get_current_history_length(self) -> int:
"""Get the current number of events in history.
Note, this value may not be up to date if accessed inside a query.
Returns:
Current number of events in history (up until the current task).
"""
return _Runtime.current().workflow_get_current_history_length()

def get_current_history_size(self) -> int:
"""Get the current byte size of history.
Note, this value may not be up to date if accessed inside a query.
Returns:
Current byte-size of history (up until the current task).
"""
return _Runtime.current().workflow_get_current_history_size()

def is_continue_as_new_suggested(self) -> bool:
"""Get whether or not continue as new is suggested.
Note, this value may not be up to date if accessed inside a query.
Returns:
True if the server is configured to suggest continue as new and it
is suggested.
"""
return _Runtime.current().workflow_is_continue_as_new_suggested()


@dataclass(frozen=True)
class ParentInfo:
Expand Down Expand Up @@ -405,6 +428,10 @@ def workflow_extern_functions(self) -> Mapping[str, Callable]:
def workflow_get_current_history_length(self) -> int:
...

@abstractmethod
def workflow_get_current_history_size(self) -> int:
...

@abstractmethod
def workflow_get_external_workflow_handle(
self, id: str, *, run_id: Optional[str]
Expand All @@ -423,6 +450,10 @@ def workflow_get_signal_handler(self, name: Optional[str]) -> Optional[Callable]
def workflow_info(self) -> Info:
...

@abstractmethod
def workflow_is_continue_as_new_suggested(self) -> bool:
...

@abstractmethod
def workflow_is_replaying(self) -> bool:
...
Expand Down
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
dev_server_extra_args=[
"--dynamic-config-value",
"system.forceSearchAttributesCacheRefreshOnRead=true",
"--dynamic-config-value",
f"limit.historyCount.suggestContinueAsNew={CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT}",
]
)
elif env_type == "time-skipping":
Expand All @@ -101,3 +103,11 @@ async def worker(
worker = ExternalPythonWorker(env)
yield worker
await worker.close()


CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT = 50


@pytest.fixture
def continue_as_new_suggest_history_count() -> int:
return CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT
64 changes: 62 additions & 2 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class InfoWorkflow:
async def run(self) -> Dict:
# Convert to JSON and back so it'll stringify un-JSON-able pieces
ret = dataclasses.asdict(workflow.info())
ret["current_history_length"] = workflow.info().get_current_history_length()
return json.loads(json.dumps(ret, default=str))


Expand All @@ -158,7 +157,6 @@ async def test_workflow_info(client: Client, env: WorkflowEnvironment):
)
assert info["attempt"] == 1
assert info["cron_schedule"] is None
assert info["current_history_length"] == 3
assert info["execution_timeout"] is None
assert info["namespace"] == client.namespace
assert info["retry_policy"] == json.loads(
Expand All @@ -173,6 +171,68 @@ async def test_workflow_info(client: Client, env: WorkflowEnvironment):
assert info["workflow_type"] == "InfoWorkflow"


@dataclass
class HistoryInfo:
history_length: int
history_size: int
continue_as_new_suggested: bool


@workflow.defn
class HistoryInfoWorkflow:
@workflow.run
async def run(self) -> None:
# Just wait forever
await workflow.wait_condition(lambda: False)

@workflow.signal
async def bunch_of_events(self, count: int) -> None:
# Create a lot of one-day timers
for _ in range(count):
asyncio.create_task(asyncio.sleep(60 * 60 * 24))

@workflow.query
def get_history_info(self) -> HistoryInfo:
return HistoryInfo(
history_length=workflow.info().get_current_history_length(),
history_size=workflow.info().get_current_history_size(),
continue_as_new_suggested=workflow.info().is_continue_as_new_suggested(),
)


async def test_workflow_history_info(
client: Client, env: WorkflowEnvironment, continue_as_new_suggest_history_count: int
):
if env.supports_time_skipping:
pytest.skip("Java test server does not support should continue as new")
async with new_worker(client, HistoryInfoWorkflow) as worker:
handle = await client.start_workflow(
HistoryInfoWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
# Issue query before anything else, which should mean only a history
# size of 3, at least 100 bytes of history, and no continue as new
# suggestion
orig_info = await handle.query(HistoryInfoWorkflow.get_history_info)
assert orig_info.history_length == 3
assert orig_info.history_size > 100
assert not orig_info.continue_as_new_suggested

# Now send a lot of events
await handle.signal(
HistoryInfoWorkflow.bunch_of_events, continue_as_new_suggest_history_count
)
# Send one more event to trigger the WFT update. We have to do this
# because just a query will have a stale representation of history
# counts, but signal forces a new WFT.
await handle.signal(HistoryInfoWorkflow.bunch_of_events, 1)
new_info = await handle.query(HistoryInfoWorkflow.get_history_info)
assert new_info.history_length > continue_as_new_suggest_history_count
assert new_info.history_size > orig_info.history_size
assert new_info.continue_as_new_suggested


@workflow.defn
class SignalAndQueryWorkflow:
def __init__(self) -> None:
Expand Down

0 comments on commit 24fea4c

Please sign in to comment.