diff --git a/posthog/management/commands/test/test_create_batch_export_from_app.py b/posthog/management/commands/test/test_create_batch_export_from_app.py index 54e6a43a0b3b0..b6832221d2d71 100644 --- a/posthog/management/commands/test/test_create_batch_export_from_app.py +++ b/posthog/management/commands/test/test_create_batch_export_from_app.py @@ -1,9 +1,11 @@ +import asyncio import collections import datetime as dt import json import typing import pytest +import temporalio.client from asgiref.sync import async_to_sync from django.conf import settings from django.core.management import call_command @@ -459,12 +461,21 @@ def test_create_batch_export_from_app_with_disabled_plugin( @async_to_sync -async def list_workflows(temporal, schedule_id: str): - """List Workflows scheduled by given Schedule.""" - workflows = [] +async def wait_for_workflow_executions( + temporal: temporalio.client.Client, query: str, timeout: int = 30, sleep: int = 1 +): + """Wait for Workflow Executions matching query.""" + workflows = [workflow async for workflow in temporal.list_workflows(query=query)] + + total = 0 + while not workflows: + total += sleep + + if total > timeout: + raise TimeoutError(f"No backfill Workflow Executions after {timeout} seconds") - while len(workflows) == 0: - workflows = [workflow async for workflow in temporal.list_workflows(f'TemporalScheduledById="{schedule_id}"')] + await asyncio.sleep(sleep) + workflows = [workflow async for workflow in temporal.list_workflows(query=query)] return workflows @@ -492,9 +503,9 @@ def test_create_batch_export_from_app_with_backfill(interval, plugin_config): output = call_command("create_batch_export_from_app", *args) batch_export_data = json.loads(output) - workflows = list_workflows(temporal, str(batch_export_data["id"])) + batch_export_id = str(batch_export_data["id"]) + workflows = wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{batch_export_id}"') - # Backfills are triggered by a Workflow now, so there should be two running. - assert len(workflows) == 2 + assert len(workflows) == 1 workflow_execution = workflows[0] assert workflow_execution.workflow_type == f"{export_type.lower()}-export"