
Commit cac546d
test(batch-exports-backfills): Add test on delete
tomasfarias committed Oct 23, 2023
1 parent ef4021b commit cac546d
Showing 4 changed files with 123 additions and 8 deletions.
104 changes: 104 additions & 0 deletions posthog/api/test/batch_exports/test_delete.py
@@ -1,10 +1,15 @@
import asyncio

import pytest
import temporalio.client
from asgiref.sync import async_to_sync
from django.test.client import Client as HttpClient
from rest_framework import status
from temporalio.service import RPCError

from posthog.api.test.batch_exports.conftest import start_test_worker
from posthog.api.test.batch_exports.operations import (
backfill_batch_export_ok,
create_batch_export_ok,
delete_batch_export,
delete_batch_export_ok,
@@ -59,6 +64,105 @@ def test_delete_batch_export(client: HttpClient):
describe_schedule(temporal, batch_export_id)


@async_to_sync
async def wait_for_workflow_executions(
temporal: temporalio.client.Client, query: str, timeout: int = 30, sleep: int = 1
):
"""Wait for Workflow Executions matching query."""
workflows = [workflow async for workflow in temporal.list_workflows(query=query)]

total = 0
while not workflows:
total += sleep

if total > timeout:
raise TimeoutError(f"No backfill Workflow Executions after {timeout} seconds")

await asyncio.sleep(sleep)
workflows = [workflow async for workflow in temporal.list_workflows(query=query)]

return workflows


@async_to_sync
async def wait_for_workflow_in_status(
temporal: temporalio.client.Client,
workflow_id: str,
status: temporalio.client.WorkflowExecutionStatus,
sleep: int = 1,
timeout: int = 30,
):
"""Wait for a Workflow to be in a given status."""
handle = temporal.get_workflow_handle(workflow_id)
workflow = await handle.describe()

total = 0
while workflow.status != status:
total += sleep

if total > timeout:
break

await asyncio.sleep(sleep)
workflow = await handle.describe()

return workflow


@pytest.mark.django_db(transaction=True)
def test_delete_batch_export_cancels_backfills(client: HttpClient):
"""Test deleting a BatchExport cancels ongoing BatchExportBackfill."""
temporal = sync_connect()

destination_data = {
"type": "S3",
"config": {
"bucket_name": "my-production-s3-bucket",
"region": "us-east-1",
"prefix": "posthog-events/",
"aws_access_key_id": "abc123",
"aws_secret_access_key": "secret",
},
}
batch_export_data = {
"name": "my-production-s3-bucket-destination",
"destination": destination_data,
"interval": "hour",
}

organization = create_organization("Test Org")
team = create_team(organization)
user = create_user("[email protected]", "Test User", organization)
client.force_login(user)

with start_test_worker(temporal):
batch_export = create_batch_export_ok(client, team.pk, batch_export_data)
batch_export_id = batch_export["id"]

start_at = "2023-10-23 00:00:00"
end_at = "2023-10-24 00:00:00"
batch_export_backfill = backfill_batch_export_ok(client, team.pk, batch_export_id, start_at, end_at)

# In order for the backfill to be cancelable, it needs to be running and requesting backfills.
# We check this by waiting for executions scheduled by our BatchExport id to pop up.
_ = wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{batch_export_id}"')

delete_batch_export_ok(client, team.pk, batch_export_id)

response = get_batch_export(client, team.pk, batch_export_id)
assert response.status_code == status.HTTP_404_NOT_FOUND

workflow = wait_for_workflow_in_status(
temporal,
workflow_id=batch_export_backfill["backfill_id"],
status=temporalio.client.WorkflowExecutionStatus.CANCELED,
)
assert workflow.status == temporalio.client.WorkflowExecutionStatus.CANCELED

with pytest.raises(RPCError):
describe_schedule(temporal, batch_export_id)


def test_cannot_delete_export_of_other_organizations(client: HttpClient):
temporal = sync_connect()

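The two polling helpers above exist because scheduled backfill Workflows start asynchronously, so the test has to wait for them to become visible. A minimal standalone sketch of the same visibility query, assuming a local Temporal server on the default localhost:7233 address and an illustrative BatchExport id:

import asyncio

import temporalio.client


async def main() -> None:
    client = await temporalio.client.Client.connect("localhost:7233")
    # Same visibility query the test uses; the id here is made up.
    query = 'TemporalScheduledById="my-batch-export-id"'
    async for workflow in client.list_workflows(query=query):
        print(workflow.id, workflow.status)


asyncio.run(main())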
6 changes: 3 additions & 3 deletions posthog/batch_exports/http.py
@@ -254,9 +254,9 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Response:

batch_export = self.get_object()
temporal = sync_connect()
-        backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)
+        backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)

-        return response.Response()
+        return response.Response({"backfill_id": backfill_id})

@action(methods=["POST"], detail=True)
def pause(self, request: request.Request, *args, **kwargs) -> response.Response:
@@ -316,7 +316,7 @@ def perform_destroy(self, instance: BatchExport):

for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
if backfill.status == BatchExportBackfill.Status.RUNNING:
-                cancel_running_batch_export_backfill(temporal, str(backfill.pk))
+                cancel_running_batch_export_backfill(temporal, backfill.workflow_id)


class BatchExportLogEntrySerializer(DataclassSerializer):
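With this change, a successful backfill request returns the id of the started Workflow instead of an empty body. A sketch of what a caller now sees, assuming the /api/projects/:team_id/batch_exports/:id/backfill route used by the test helpers (route and payload shape are assumptions, not confirmed by this diff):

import json

# Hypothetical Django test client usage.
resp = client.post(
    f"/api/projects/{team.pk}/batch_exports/{batch_export_id}/backfill",
    json.dumps({"start_at": "2023-10-23 00:00:00", "end_at": "2023-10-24 00:00:00"}),
    content_type="application/json",
)
assert resp.status_code == 200
backfill_id = resp.json()["backfill_id"]  # the backfill's Temporal Workflow id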
7 changes: 7 additions & 0 deletions posthog/batch_exports/models.py
@@ -260,3 +260,10 @@ class Status(models.TextChoices):
last_updated_at: models.DateTimeField = models.DateTimeField(
auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated."
)

@property
def workflow_id(self) -> str:
"""Return the Workflow id that corresponds to this BatchExportBackfill model."""
start_at = self.start_at.strftime("%Y-%m-%dT%H:%M:%S")
end_at = self.end_at.strftime("%Y-%m-%dT%H:%M:%S")
return f"{self.batch_export.id}-Backfill-{start_at}-{end_at}"
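The new property recomputes the deterministic Workflow id that service.py uses when starting the backfill, which is what lets perform_destroy in http.py cancel it later. A quick sketch of the value it produces; the UUID is illustrative:

import datetime as dt

batch_export_id = "e0062f0a-aaaa-bbbb-cccc-000000000000"  # made-up id
start_at = dt.datetime(2023, 10, 23)
end_at = dt.datetime(2023, 10, 24)
workflow_id = f"{batch_export_id}-Backfill-{start_at:%Y-%m-%dT%H:%M:%S}-{end_at:%Y-%m-%dT%H:%M:%S}"
# -> "e0062f0a-aaaa-bbbb-cccc-000000000000-Backfill-2023-10-23T00:00:00-2023-10-24T00:00:00"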
14 changes: 9 additions & 5 deletions posthog/batch_exports/service.py
@@ -257,7 +257,7 @@ async def cancel_running_batch_export_backfill(temporal: Client, workflow_id: str) -> None:
    Schedule that we are backfilling, we should also clean up any Workflows that are
still running.
"""
-    handle = temporal.get_workflow_handle(workflow_id)
+    handle = temporal.get_workflow_handle(workflow_id=workflow_id)
await handle.cancel()


@@ -279,7 +279,7 @@ def backfill_export(
team_id: int,
start_at: dt.datetime,
end_at: dt.datetime,
-) -> None:
+) -> str:
"""Starts a backfill for given team and batch export covering given date range.
Arguments:
@@ -300,11 +300,12 @@ def backfill_export(
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
)
-    start_backfill_batch_export_workflow(temporal, inputs=inputs)
+    workflow_id = start_backfill_batch_export_workflow(temporal, inputs=inputs)
+    return workflow_id


@async_to_sync
-async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> None:
+async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> str:
"""Async call to start a BackfillBatchExportWorkflow."""
handle = temporal.get_schedule_handle(inputs.batch_export_id)
description = await handle.describe()
@@ -313,13 +313,16 @@ async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> str:
# Adjust end_at to account for jitter if present.
inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat()

+    workflow_id = f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}"
await temporal.start_workflow(
"backfill-batch-export",
inputs,
-        id=f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}",
+        id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
)

+    return workflow_id


def create_batch_export_run(
batch_export_id: UUID,
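Taken together, deleting a BatchExport can now cancel a running backfill because both sides derive the same deterministic Workflow id. A minimal cancel-and-confirm sketch, assuming a connected temporalio Client and the workflow_id property added above; a real caller would bound the wait, as the test does:

import asyncio

import temporalio.client


async def cancel_and_confirm(temporal: temporalio.client.Client, workflow_id: str) -> None:
    handle = temporal.get_workflow_handle(workflow_id)
    await handle.cancel()  # requests cancellation; the Workflow must honor it
    workflow = await handle.describe()
    while workflow.status != temporalio.client.WorkflowExecutionStatus.CANCELED:
        await asyncio.sleep(1)
        workflow = await handle.describe()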
