From 0a16034e79cbbc412243ff2976c2054c4e2ae1e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 19 Oct 2023 14:16:00 +0200 Subject: [PATCH] fix(batch-exports-backfill): Ensure at least one heartbeat is sent --- posthog/temporal/workflows/backfill_batch_export.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index b7234cfb5745a3..fc6cf193f4390a 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -34,7 +34,7 @@ class HeartbeatDetails(typing.NamedTuple): wait_start_at: str def make_activity_heartbeat_while_running( - self, function_to_run: collections.abc.Callable, heartbeat_timeout: dt.timedelta, factor: int = 2 + self, function_to_run: collections.abc.Callable, heartbeat_timeout: dt.timedelta, factor: int = 120 ) -> collections.abc.Callable[..., collections.abc.Coroutine]: """Return a callable that returns a coroutine that hearbeats with these HeartbeatDetails. @@ -53,7 +53,7 @@ async def heartbeat_while_running(*args, **kwargs): heartbeat_task = asyncio.create_task(heartbeat()) try: - await function_to_run(*args, **kwargs) + return await function_to_run(*args, **kwargs) finally: heartbeat_task.cancel() await asyncio.wait([heartbeat_task]) @@ -210,11 +210,6 @@ async def wait_for_schedule_backfill_in_range( f'AND StartTime >= "{now.isoformat()}"' ) - workflows = [workflow async for workflow in client.list_workflows(query=query)] - - if workflows and check_workflow_executions_not_running(workflows) is True: - return - done = False while not done: await asyncio.sleep(wait_delay)