From 61d64b54ca40dfbdf3c0994dea8cc6bda8cebcdf Mon Sep 17 00:00:00 2001
From: Ross
Date: Thu, 10 Oct 2024 17:13:08 +0100
Subject: [PATCH] feat(data-warehouse): Add API for cancelling a batch export run (#25514)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Tomás Farías Santana
---
 posthog/api/test/batch_exports/operations.py | 11 +++
 posthog/api/test/batch_exports/test_runs.py  | 86 ++++++++++++++++++++
 posthog/batch_exports/http.py                | 15 +++-
 posthog/batch_exports/models.py              |  5 ++
 posthog/batch_exports/service.py             | 10 +++
 5 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/posthog/api/test/batch_exports/operations.py b/posthog/api/test/batch_exports/operations.py
index 20f7d2761e2bf..625a22f5c4304 100644
--- a/posthog/api/test/batch_exports/operations.py
+++ b/posthog/api/test/batch_exports/operations.py
@@ -1,5 +1,6 @@
 from django.test.client import Client as TestClient
 from rest_framework import status
+
 from posthog.models.utils import UUIDT
 
 
@@ -120,3 +121,13 @@ def get_batch_export_log_entries(client: TestClient, team_id: int, batch_export_
 
 def get_batch_export_run_log_entries(client: TestClient, team_id: int, batch_export_id: str, run_id, **extra):
     return client.get(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/logs", extra)
+
+
+def cancel_batch_export_run(client: TestClient, team_id: int, batch_export_id: str, run_id):
+    return client.post(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/cancel")
+
+
+def cancel_batch_export_run_ok(client: TestClient, team_id: int, batch_export_id: str, run_id):
+    response = client.post(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/cancel")
+    assert response.status_code == status.HTTP_200_OK, response.json()
+    return response.json()
diff --git a/posthog/api/test/batch_exports/test_runs.py b/posthog/api/test/batch_exports/test_runs.py
index feac9b5c63e18..f8ec68a623641 100644
--- a/posthog/api/test/batch_exports/test_runs.py
+++ b/posthog/api/test/batch_exports/test_runs.py
@@ -1,12 +1,19 @@
+import asyncio
+
 import pytest
+import temporalio.client
+from asgiref.sync import async_to_sync
 from django.test.client import Client as HttpClient
 from rest_framework import status
 
 from posthog.api.test.batch_exports.conftest import start_test_worker
 from posthog.api.test.batch_exports.operations import (
+    backfill_batch_export_ok,
+    cancel_batch_export_run_ok,
     create_batch_export_ok,
     get_batch_export,
     get_batch_export_runs,
+    get_batch_export_runs_ok,
 )
 from posthog.api.test.test_organization import create_organization
 from posthog.api.test.test_team import create_team
@@ -143,3 +150,82 @@ def test_batch_exports_are_partitioned_by_team(client: HttpClient):
 
     response = get_batch_export(client, team.pk, batch_export["id"])
     assert response.status_code == status.HTTP_404_NOT_FOUND, response.json()
+
+
+# TODO - this was in test_delete.py too so maybe extract it out into operations.py?
+@async_to_sync
+async def wait_for_workflow_executions(
+    temporal: temporalio.client.Client, query: str, timeout: int = 30, sleep: int = 1
+):
+    """Wait for Workflow Executions matching query."""
+    workflows = [workflow async for workflow in temporal.list_workflows(query=query)]
+
+    total = 0
+    while not workflows:
+        total += sleep
+
+        if total > timeout:
+            raise TimeoutError(f"No backfill Workflow Executions after {timeout} seconds")
+
+        await asyncio.sleep(sleep)
+        workflows = [workflow async for workflow in temporal.list_workflows(query=query)]
+
+    return workflows
+
+
+@pytest.mark.django_db(transaction=True)
+def test_cancelling_a_batch_export_run(client: HttpClient):
+    """Test cancelling a BatchExportRun."""
+    temporal = sync_connect()
+
+    destination_data = {
+        "type": "S3",
+        "config": {
+            "bucket_name": "my-production-s3-bucket",
+            "region": "us-east-1",
+            "prefix": "posthog-events/",
+            "aws_access_key_id": "abc123",
+            "aws_secret_access_key": "secret",
+        },
+    }
+    batch_export_data = {
+        "name": "my-production-s3-bucket-destination",
+        "destination": destination_data,
+        "interval": "hour",
+    }
+
+    organization = create_organization("Test Org")
+    team = create_team(organization)
+    user = create_user("test@user.com", "Test User", organization)
+    client.force_login(user)
+
+    with start_test_worker(temporal):
+        batch_export = create_batch_export_ok(
+            client,
+            team.pk,
+            batch_export_data,
+        )
+        batch_export_id = batch_export["id"]
+
+        start_at = "2023-10-23T00:00:00+00:00"
+        end_at = "2023-10-24T00:00:00+00:00"
+        backfill_batch_export_ok(client, team.pk, batch_export_id, start_at, end_at)
+
+        # In order for a run to be cancelable we need a running workflow execution
+        _ = wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{batch_export_id}"')
+
+        data = get_batch_export_runs_ok(client, team.pk, batch_export_id)
+        assert len(data["results"]) == 1
+        run = data["results"][0]
+        assert run["status"] == "Running"
+
+        data = cancel_batch_export_run_ok(client, team.pk, batch_export_id, run["id"])
+        assert data["cancelled"] is True
+
+        data = get_batch_export_runs_ok(client, team.pk, batch_export_id)
+        assert len(data["results"]) == 1
+        run = data["results"][0]
+        assert run["status"] == "Cancelled"
+
+
+# TODO - add a test to ensure we can't cancel a completed run?
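A possible shape for the test suggested by the TODO above, included here only as a sketch, not as part of the patch. It assumes the cancel action is first extended to reject runs that are no longer running (the "check status of run beforehand" TODO in http.py below) and that such a request is answered with HTTP 400; the run is created directly in the database so no Temporal workflow execution is needed. All other names (create_organization, create_team, create_user, create_batch_export_ok, start_test_worker, sync_connect, status, HttpClient, pytest) are already imported by test_runs.py.

# Sketch only: assumes the cancel endpoint rejects non-running runs with HTTP 400.
from posthog.api.test.batch_exports.operations import cancel_batch_export_run  # raw helper added in this patch
from posthog.batch_exports.models import BatchExportRun


@pytest.mark.django_db(transaction=True)
def test_cancelling_a_completed_batch_export_run_fails(client: HttpClient):
    """Sketch: a BatchExportRun that already finished should not be cancellable."""
    temporal = sync_connect()

    organization = create_organization("Test Org")
    team = create_team(organization)
    user = create_user("test@user.com", "Test User", organization)
    client.force_login(user)

    with start_test_worker(temporal):
        batch_export = create_batch_export_ok(
            client,
            team.pk,
            {
                "name": "my-production-s3-bucket-destination",
                "destination": {
                    "type": "S3",
                    "config": {
                        "bucket_name": "my-production-s3-bucket",
                        "region": "us-east-1",
                        "prefix": "posthog-events/",
                        "aws_access_key_id": "abc123",
                        "aws_secret_access_key": "secret",
                    },
                },
                "interval": "hour",
            },
        )

        # Create an already-finished run directly, so there is no live workflow execution to cancel.
        run = BatchExportRun.objects.create(
            batch_export_id=batch_export["id"],
            status=BatchExportRun.Status.COMPLETED,
            data_interval_start="2023-10-23T00:00:00+00:00",
            data_interval_end="2023-10-23T01:00:00+00:00",
        )

        response = cancel_batch_export_run(client, team.pk, batch_export["id"], run.id)
        # Assumed behaviour once the status check lands; today the endpoint does not guard this.
        assert response.status_code == status.HTTP_400_BAD_REQUEST, response.json()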
diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py
index bd56a092771e3..8698e4c450a29 100644
--- a/posthog/batch_exports/http.py
+++ b/posthog/batch_exports/http.py
@@ -6,7 +6,6 @@
 from django.db import transaction
 from django.utils.timezone import now
 from rest_framework import filters, request, response, serializers, viewsets
-from posthog.api.utils import action
 from rest_framework.exceptions import (
     NotAuthenticated,
     NotFound,
@@ -17,6 +16,7 @@
 from posthog.api.log_entries import LogEntryMixin
 from posthog.api.routing import TeamAndOrgViewSetMixin
+from posthog.api.utils import action
 from posthog.batch_exports.models import BATCH_EXPORT_INTERVALS
 from posthog.batch_exports.service import (
     BatchExportIdError,
@@ -25,6 +25,7 @@
     BatchExportServiceRPCError,
     BatchExportWithNoEndNotAllowedError,
     backfill_export,
+    cancel_running_batch_export_run,
     disable_and_delete_export,
     pause_batch_export,
     sync_batch_export,
@@ -148,6 +149,18 @@ def retry(self, *args, **kwargs) -> response.Response:
 
         return response.Response({"backfill_id": backfill_id})
 
+    @action(methods=["POST"], detail=True, required_scopes=["batch_export:write"])
+    def cancel(self, *args, **kwargs) -> response.Response:
+        """Cancel a batch export run."""
+
+        batch_export_run: BatchExportRun = self.get_object()
+
+        temporal = sync_connect()
+        # TODO: check status of run beforehand
+        cancel_running_batch_export_run(temporal, batch_export_run)
+
+        return response.Response({"cancelled": True})
+
 
 class BatchExportDestinationSerializer(serializers.ModelSerializer):
     """Serializer for an BatchExportDestination model."""
diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py
index e319d73db30ab..da3a4576b6682 100644
--- a/posthog/batch_exports/models.py
+++ b/posthog/batch_exports/models.py
@@ -107,6 +107,11 @@ class Status(models.TextChoices):
         null=True, help_text="The total count of records that should be exported in this BatchExportRun."
     )
 
+    @property
+    def workflow_id(self) -> str:
+        """Return the Workflow id that corresponds to this BatchExportRun model."""
+        return f"{self.batch_export.id}-{self.data_interval_end:%Y-%m-%dT%H:%M:%S}Z"
+
 
 def fetch_batch_export_run_count(
     *,
diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py
index a38886769362b..015dd01c6b4fa 100644
--- a/posthog/batch_exports/service.py
+++ b/posthog/batch_exports/service.py
@@ -409,6 +409,16 @@ async def cancel_running_batch_export_backfill(temporal: Client, batch_export_ba
     await batch_export_backfill.asave()
 
 
+def cancel_running_batch_export_run(temporal: Client, batch_export_run: BatchExportRun) -> None:
+    """Cancel a running BatchExportRun."""
+
+    handle = temporal.get_workflow_handle(workflow_id=batch_export_run.workflow_id)
+    async_to_sync(handle.cancel)()
+
+    batch_export_run.status = BatchExportRun.Status.CANCELLED
+    batch_export_run.save()
+
+
 @dataclass
 class BackfillBatchExportInputs:
     """Inputs for the BackfillBatchExport Workflow."""
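One possible way to resolve the "check status of run beforehand" TODO in the new cancel action, sketched here as a reviewer note rather than as part of the patch. It assumes only runs still reported as Running should reach Temporal, and it raises ValidationError from rest_framework.exceptions (an assumption if http.py does not already import it); every other name is exactly as used in the action above.

from rest_framework.exceptions import ValidationError

    @action(methods=["POST"], detail=True, required_scopes=["batch_export:write"])
    def cancel(self, *args, **kwargs) -> response.Response:
        """Cancel a batch export run."""
        batch_export_run: BatchExportRun = self.get_object()

        # Hypothetical status guard: only a run that is still running has a live
        # Temporal workflow execution that can be cancelled.
        if batch_export_run.status != BatchExportRun.Status.RUNNING:
            raise ValidationError(f"Cannot cancel a run with status '{batch_export_run.status}'")

        temporal = sync_connect()
        cancel_running_batch_export_run(temporal, batch_export_run)

        return response.Response({"cancelled": True})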
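For completeness, a minimal sketch of driving the new service function outside the HTTP layer, for example from a Django shell. The sync_connect import path is an assumption; reuse whichever module http.py imports it from.

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import cancel_running_batch_export_run
from posthog.temporal.common.client import sync_connect  # assumed path; match http.py's import


def cancel_run_by_id(run_id: str) -> None:
    """Cancel the Temporal workflow behind a run and mark the row as Cancelled."""
    run = BatchExportRun.objects.get(id=run_id)
    cancel_running_batch_export_run(sync_connect(), run)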