Skip to content

Commit

Permalink
fix: Async wait for backfills in test
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Oct 27, 2023
1 parent c95678e commit 6eefd44
Showing 1 changed file with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import collections
import datetime as dt
import json
import typing

import pytest
import temporalio.client
from asgiref.sync import async_to_sync
from django.conf import settings
from django.core.management import call_command
Expand Down Expand Up @@ -459,12 +461,21 @@ def test_create_batch_export_from_app_with_disabled_plugin(


@async_to_sync
async def list_workflows(temporal, schedule_id: str):
"""List Workflows scheduled by given Schedule."""
workflows = []
async def wait_for_workflow_executions(
temporal: temporalio.client.Client, query: str, timeout: int = 30, sleep: int = 1
):
"""Wait for Workflow Executions matching query."""
workflows = [workflow async for workflow in temporal.list_workflows(query=query)]

total = 0
while not workflows:
total += sleep

if total > timeout:
raise TimeoutError(f"No backfill Workflow Executions after {timeout} seconds")

while len(workflows) == 0:
workflows = [workflow async for workflow in temporal.list_workflows(f'TemporalScheduledById="{schedule_id}"')]
await asyncio.sleep(sleep)
workflows = [workflow async for workflow in temporal.list_workflows(query=query)]

return workflows

Expand Down Expand Up @@ -492,9 +503,9 @@ def test_create_batch_export_from_app_with_backfill(interval, plugin_config):
output = call_command("create_batch_export_from_app", *args)

batch_export_data = json.loads(output)
workflows = list_workflows(temporal, str(batch_export_data["id"]))
batch_export_id = str(batch_export_data["id"])
workflows = wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{batch_export_id}"')

# Backfills are triggered by a Workflow now, so there should be two running.
assert len(workflows) == 2
assert len(workflows) == 1
workflow_execution = workflows[0]
assert workflow_execution.workflow_type == f"{export_type.lower()}-export"

0 comments on commit 6eefd44

Please sign in to comment.