Skip to content

Commit

Permalink
feat(data-warehouse): Add API for cancelling a batch export run (#25514)
Browse files Browse the repository at this point in the history
Co-authored-by: Tomás Farías Santana <[email protected]>
  • Loading branch information
rossgray and tomasfarias authored Oct 10, 2024
1 parent 7ee4028 commit 61d64b5
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 1 deletion.
11 changes: 11 additions & 0 deletions posthog/api/test/batch_exports/operations.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.test.client import Client as TestClient
from rest_framework import status

from posthog.models.utils import UUIDT


Expand Down Expand Up @@ -120,3 +121,13 @@ def get_batch_export_log_entries(client: TestClient, team_id: int, batch_export_

def get_batch_export_run_log_entries(client: TestClient, team_id: int, batch_export_id: str, run_id, **extra):
    """GET the log entries endpoint of a single batch export run.

    Extra keyword arguments are collected into a dict and handed to
    ``client.get`` as its second positional argument. The response is returned
    as-is; no status code check is performed here.
    """
    logs_url = f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/logs"
    return client.get(logs_url, extra)


def cancel_batch_export_run(client: TestClient, team_id: int, batch_export_id: str, run_id):
    """POST to the cancel endpoint of a batch export run and return the raw response.

    No status code check is performed here, so callers can assert on whichever
    outcome they expect.
    """
    cancel_url = f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/cancel"
    return client.post(cancel_url)


def cancel_batch_export_run_ok(client: TestClient, team_id: int, batch_export_id: str, run_id):
    """Cancel a batch export run, require a 200 response, and return its JSON body.

    The request itself is issued by ``cancel_batch_export_run`` so the endpoint
    path is written down in one place only. On an unexpected status code the
    assertion message carries the decoded response body.
    """
    response = cancel_batch_export_run(client, team_id, batch_export_id, run_id)
    payload = response.json()
    assert response.status_code == status.HTTP_200_OK, payload
    return payload
86 changes: 86 additions & 0 deletions posthog/api/test/batch_exports/test_runs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import asyncio

import pytest
import temporalio.client
from asgiref.sync import async_to_sync
from django.test.client import Client as HttpClient
from rest_framework import status

from posthog.api.test.batch_exports.conftest import start_test_worker
from posthog.api.test.batch_exports.operations import (
backfill_batch_export_ok,
cancel_batch_export_run_ok,
create_batch_export_ok,
get_batch_export,
get_batch_export_runs,
get_batch_export_runs_ok,
)
from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
Expand Down Expand Up @@ -143,3 +150,82 @@ def test_batch_exports_are_partitioned_by_team(client: HttpClient):

response = get_batch_export(client, team.pk, batch_export["id"])
assert response.status_code == status.HTTP_404_NOT_FOUND, response.json()


# TODO - this was in test_delete.py too so maybe extract it out into operations.py?
@async_to_sync
async def wait_for_workflow_executions(
    temporal: temporalio.client.Client, query: str, timeout: int = 30, sleep: int = 1
):
    """Poll Temporal until at least one Workflow Execution matches ``query``.

    Args:
        temporal: Client used to list workflows.
        query: Query string passed to ``temporal.list_workflows``.
        timeout: Budget in seconds, counted as the sum of the sleeps taken so far.
        sleep: Seconds to wait between two consecutive listings.

    Returns:
        The non-empty list of workflows yielded by the first successful listing.

    Raises:
        TimeoutError: If the accumulated sleep time would exceed ``timeout``
            before any matching workflow shows up.
    """
    waited = 0
    while True:
        matches = [execution async for execution in temporal.list_workflows(query=query)]
        if matches:
            return matches

        # Account for the upcoming sleep first so we never wait past the budget.
        waited += sleep
        if waited > timeout:
            raise TimeoutError(f"No backfill Workflow Executions after {timeout} seconds")

        await asyncio.sleep(sleep)


@pytest.mark.django_db(transaction=True)
def test_cancelling_a_batch_export_run(client: HttpClient):
    """Check that a running BatchExportRun can be cancelled through the API.

    A backfill is requested to get a Workflow Execution going. The single
    resulting run is expected to report "Running", is then cancelled, and is
    expected to report "Cancelled" afterwards.
    """
    temporal = sync_connect()

    export_payload = {
        "name": "my-production-s3-bucket-destination",
        "destination": {
            "type": "S3",
            "config": {
                "bucket_name": "my-production-s3-bucket",
                "region": "us-east-1",
                "prefix": "posthog-events/",
                "aws_access_key_id": "abc123",
                "aws_secret_access_key": "secret",
            },
        },
        "interval": "hour",
    }

    organization = create_organization("Test Org")
    team = create_team(organization)
    user = create_user("[email protected]", "Test User", organization)
    client.force_login(user)

    with start_test_worker(temporal):
        export_id = create_batch_export_ok(client, team.pk, export_payload)["id"]

        backfill_batch_export_ok(
            client,
            team.pk,
            export_id,
            "2023-10-23T00:00:00+00:00",
            "2023-10-24T00:00:00+00:00",
        )

        # A run can only be cancelled while a workflow execution exists for it,
        # so block until Temporal reports one.
        wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{export_id}"')

        runs = get_batch_export_runs_ok(client, team.pk, export_id)["results"]
        assert len(runs) == 1
        running_run = runs[0]
        assert running_run["status"] == "Running"

        cancel_body = cancel_batch_export_run_ok(client, team.pk, export_id, running_run["id"])
        assert cancel_body["cancelled"] is True

        runs = get_batch_export_runs_ok(client, team.pk, export_id)["results"]
        assert len(runs) == 1
        assert runs[0]["status"] == "Cancelled"


# TODO - add a test to ensure we can't cancel a completed run?
15 changes: 14 additions & 1 deletion posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from django.db import transaction
from django.utils.timezone import now
from rest_framework import filters, request, response, serializers, viewsets
from posthog.api.utils import action
from rest_framework.exceptions import (
NotAuthenticated,
NotFound,
Expand All @@ -17,6 +16,7 @@

from posthog.api.log_entries import LogEntryMixin
from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.api.utils import action
from posthog.batch_exports.models import BATCH_EXPORT_INTERVALS
from posthog.batch_exports.service import (
BatchExportIdError,
Expand All @@ -25,6 +25,7 @@
BatchExportServiceRPCError,
BatchExportWithNoEndNotAllowedError,
backfill_export,
cancel_running_batch_export_run,
disable_and_delete_export,
pause_batch_export,
sync_batch_export,
Expand Down Expand Up @@ -148,6 +149,18 @@ def retry(self, *args, **kwargs) -> response.Response:

return response.Response({"backfill_id": backfill_id})

@action(methods=["POST"], detail=True, required_scopes=["batch_export:write"])
def cancel(self, *args, **kwargs) -> response.Response:
    """Cancel a batch export run.

    The run is looked up with ``self.get_object()``. Only a run that is still
    in progress is forwarded to Temporal for cancellation; for any other run
    the request is rejected before Temporal is contacted and before the
    stored status is touched.

    Returns:
        A response with body ``{"cancelled": True}`` once the cancellation has
        been requested and the run has been updated.

    Raises:
        serializers.ValidationError: If the run is not in a cancellable status.
    """
    batch_export_run: BatchExportRun = self.get_object()

    # NOTE(review): assumes BatchExportRun.Status defines STARTING and RUNNING
    # and that these are the only in-progress statuses — confirm against the model.
    cancellable_statuses = (BatchExportRun.Status.STARTING, BatchExportRun.Status.RUNNING)
    if batch_export_run.status not in cancellable_statuses:
        # Without this guard a finished run would still be sent to Temporal and
        # then have its recorded status overwritten with CANCELLED.
        raise serializers.ValidationError(
            f"Cannot cancel a run that is in '{batch_export_run.status}' status"
        )

    temporal = sync_connect()
    cancel_running_batch_export_run(temporal, batch_export_run)

    return response.Response({"cancelled": True})


class BatchExportDestinationSerializer(serializers.ModelSerializer):
"""Serializer for an BatchExportDestination model."""
Expand Down
5 changes: 5 additions & 0 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ class Status(models.TextChoices):
null=True, help_text="The total count of records that should be exported in this BatchExportRun."
)

@property
def workflow_id(self) -> str:
    """Build the Workflow id that corresponds to this BatchExportRun model.

    The id is the related batch export's id, a dash, and ``data_interval_end``
    rendered as ``YYYY-MM-DDTHH:MM:SS`` followed by a literal ``Z``. No
    timezone conversion is applied before formatting.
    """
    export_id = self.batch_export.id
    interval_end = format(self.data_interval_end, "%Y-%m-%dT%H:%M:%S")
    return f"{export_id}-{interval_end}Z"


def fetch_batch_export_run_count(
*,
Expand Down
10 changes: 10 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,16 @@ async def cancel_running_batch_export_backfill(temporal: Client, batch_export_ba
await batch_export_backfill.asave()


def cancel_running_batch_export_run(temporal: Client, batch_export_run: BatchExportRun) -> None:
    """Request cancellation of a run's Workflow and record the run as cancelled.

    The Workflow handle is obtained from ``batch_export_run.workflow_id``. The
    cancel call is made first; the model's status is only set to ``CANCELLED``
    and saved after that call has returned.
    """
    workflow_handle = temporal.get_workflow_handle(workflow_id=batch_export_run.workflow_id)

    # This function is synchronous, so the handle's cancel call is bridged
    # through async_to_sync.
    request_cancellation = async_to_sync(workflow_handle.cancel)
    request_cancellation()

    batch_export_run.status = BatchExportRun.Status.CANCELLED
    batch_export_run.save()


@dataclass
class BackfillBatchExportInputs:
"""Inputs for the BackfillBatchExport Workflow."""
Expand Down

0 comments on commit 61d64b5

Please sign in to comment.