From f309a7d245f11d8f1e43082fa4ad6676f0b3b6c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Mon, 23 Oct 2023 15:30:16 +0200 Subject: [PATCH 1/2] fix(batch-export-backfills): Cancel ongoing backfills on schedule delete --- posthog/batch_exports/http.py | 6 ++++++ posthog/batch_exports/service.py | 12 ++++++++++++ posthog/models/__init__.py | 32 +++++++++++++++++--------------- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py index aa71cc9060a13..c8aaf0d2bed5e 100644 --- a/posthog/batch_exports/http.py +++ b/posthog/batch_exports/http.py @@ -28,6 +28,7 @@ BatchExportServiceError, BatchExportServiceRPCError, backfill_export, + cancel_running_batch_export_backfill, delete_schedule, pause_batch_export, sync_batch_export, @@ -35,6 +36,7 @@ ) from posthog.models import ( BatchExport, + BatchExportBackfill, BatchExportDestination, BatchExportRun, Team, @@ -324,6 +326,10 @@ def perform_destroy(self, instance: BatchExport): delete_schedule(temporal, str(instance.pk)) instance.save() + for backfill in BatchExportBackfill.objects.filter(batch_export=instance): + if backfill.status == BatchExportBackfill.Status.RUNNING: + cancel_running_batch_export_backfill(temporal, str(backfill.pk)) + class BatchExportLogEntrySerializer(DataclassSerializer): class Meta: diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 008096d5f50bc..858b48bfe25a0 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -252,6 +252,18 @@ async def describe_schedule(temporal: Client, schedule_id: str): return await handle.describe() +@async_to_sync +async def cancel_running_batch_export_backfill(temporal: Client, workflow_id: str) -> None: + """Delete a running BatchExportBackfill. + + A BatchExportBackfill represents a Temporal Workflow. When deleting the Temporal + Schedule that we are backfilling, we should also clean-up any Workflows that are + still running. + """ + handle = temporal.get_workflow_handle(workflow_id) + await handle.cancel() + + @dataclass class BackfillBatchExportInputs: """Inputs for the BackfillBatchExport Workflow.""" diff --git a/posthog/models/__init__.py b/posthog/models/__init__.py index b5f0586a349bb..053eaf4c5afc5 100644 --- a/posthog/models/__init__.py +++ b/posthog/models/__init__.py @@ -1,3 +1,17 @@ +from ..batch_exports.models import ( + BatchExport, + BatchExportBackfill, + BatchExportDestination, + BatchExportRun, +) +from ..session_recordings.models.session_recording import SessionRecording +from ..session_recordings.models.session_recording_playlist import ( + SessionRecordingPlaylist, +) +from ..session_recordings.models.session_recording_playlist_item import ( + SessionRecordingPlaylistItem, +) +from ..warehouse.models import DataWarehouseTable from .action import Action from .action_step import ActionStep from .activity_logging.activity_log import ActivityLog @@ -5,12 +19,6 @@ from .annotation import Annotation from .async_deletion import AsyncDeletion, DeletionType from .async_migration import AsyncMigration, AsyncMigrationError, MigrationStatus -from ..batch_exports.models import ( - BatchExport, - BatchExportDestination, - BatchExportRun, -) -from ..warehouse.models import DataWarehouseTable from .cohort import Cohort, CohortPeople from .dashboard import Dashboard from .dashboard_tile import DashboardTile, Text @@ -25,6 +33,7 @@ from .experiment import Experiment from .exported_asset import ExportedAsset from .feature_flag import FeatureFlag +from .feedback.survey import Survey from .filters import Filter, RetentionFilter from .group import Group from .group_type_mapping import GroupTypeMapping @@ -41,28 +50,20 @@ Plugin, PluginAttachment, PluginConfig, - PluginSourceFile, PluginLogEntry, + PluginSourceFile, ) from .prompt.prompt import Prompt, PromptSequence, UserPromptState from .property import Property from .property_definition import PropertyDefinition from .sharing_configuration import SharingConfiguration from .subscription import Subscription -from .feedback.survey import Survey from .tag import Tag from .tagged_item import TaggedItem from .team import Team from .uploaded_media import UploadedMedia from .user import User, UserManager from .user_scene_personalisation import UserScenePersonalisation -from ..session_recordings.models.session_recording import SessionRecording -from ..session_recordings.models.session_recording_playlist import ( - SessionRecordingPlaylist, -) -from ..session_recordings.models.session_recording_playlist_item import ( - SessionRecordingPlaylistItem, -) __all__ = [ "Action", @@ -73,6 +74,7 @@ "AsyncMigration", "AsyncMigrationError", "BatchExport", + "BatchExportBackfill", "BatchExportDestination", "BatchExportRun", "Cohort", From 1a9adb40505c86608f9341029a9fb5e0f22fe0d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Mon, 23 Oct 2023 20:48:32 +0200 Subject: [PATCH 2/2] test(batch-exports-backfills): Add test on delete --- posthog/api/test/batch_exports/test_delete.py | 104 ++++++++++++++++++ posthog/batch_exports/http.py | 6 +- posthog/batch_exports/models.py | 7 ++ posthog/batch_exports/service.py | 14 ++- 4 files changed, 123 insertions(+), 8 deletions(-) diff --git a/posthog/api/test/batch_exports/test_delete.py b/posthog/api/test/batch_exports/test_delete.py index 69a1e586f019e..20375cecbb768 100644 --- a/posthog/api/test/batch_exports/test_delete.py +++ b/posthog/api/test/batch_exports/test_delete.py @@ -1,10 +1,15 @@ +import asyncio + import pytest +import temporalio.client +from asgiref.sync import async_to_sync from django.test.client import Client as HttpClient from rest_framework import status from temporalio.service import RPCError from posthog.api.test.batch_exports.conftest import start_test_worker from posthog.api.test.batch_exports.operations import ( + backfill_batch_export_ok, create_batch_export_ok, delete_batch_export, delete_batch_export_ok, @@ -59,6 +64,105 @@ def test_delete_batch_export(client: HttpClient): describe_schedule(temporal, batch_export_id) +@async_to_sync +async def wait_for_workflow_executions( + temporal: temporalio.client.Client, query: str, timeout: int = 30, sleep: int = 1 +): + """Wait for Workflow Executions matching query.""" + workflows = [workflow async for workflow in temporal.list_workflows(query=query)] + + total = 0 + while not workflows: + total += sleep + + if total > timeout: + raise TimeoutError(f"No backfill Workflow Executions after {timeout} seconds") + + await asyncio.sleep(sleep) + workflows = [workflow async for workflow in temporal.list_workflows(query=query)] + + return workflows + + +@async_to_sync +async def wait_for_workflow_in_status( + temporal: temporalio.client.Client, + workflow_id: str, + status: temporalio.client.WorkflowExecutionStatus, + sleep: int = 1, + timeout: int = 30, +): + """Wait for a Workflow to be in a given status.""" + handle = temporal.get_workflow_handle(workflow_id) + workflow = await handle.describe() + + total = 0 + while workflow.status != status: + total += sleep + + if total > timeout: + break + + await asyncio.sleep(sleep) + workflow = await handle.describe() + + return workflow + + +@pytest.mark.django_db(transaction=True) +def test_delete_batch_export_cancels_backfills(client: HttpClient): + """Test deleting a BatchExport cancels ongoing BatchExportBackfill.""" + temporal = sync_connect() + + destination_data = { + "type": "S3", + "config": { + "bucket_name": "my-production-s3-bucket", + "region": "us-east-1", + "prefix": "posthog-events/", + "aws_access_key_id": "abc123", + "aws_secret_access_key": "secret", + }, + } + batch_export_data = { + "name": "my-production-s3-bucket-destination", + "destination": destination_data, + "interval": "hour", + } + + organization = create_organization("Test Org") + team = create_team(organization) + user = create_user("test@user.com", "Test User", organization) + client.force_login(user) + + with start_test_worker(temporal): + batch_export = create_batch_export_ok(client, team.pk, batch_export_data) + batch_export_id = batch_export["id"] + + start_at = "2023-10-23 00:00:00" + end_at = "2023-10-24 00:00:00" + batch_export_backfill = backfill_batch_export_ok(client, team.pk, batch_export_id, start_at, end_at) + + # In order for the backfill to be cancelable, it needs to be running and requesting backfills. + # We check this by waiting for executions scheduled by our BatchExport id to pop up. + _ = wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{batch_export_id}"') + + delete_batch_export_ok(client, team.pk, batch_export_id) + + response = get_batch_export(client, team.pk, batch_export_id) + assert response.status_code == status.HTTP_404_NOT_FOUND + + workflow = wait_for_workflow_in_status( + temporal, + workflow_id=batch_export_backfill["backfill_id"], + status=temporalio.client.WorkflowExecutionStatus.CANCELED, + ) + assert workflow.status == temporalio.client.WorkflowExecutionStatus.CANCELED + + with pytest.raises(RPCError): + describe_schedule(temporal, batch_export_id) + + def test_cannot_delete_export_of_other_organizations(client: HttpClient): temporal = sync_connect() diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py index c8aaf0d2bed5e..0c906c50b08b6 100644 --- a/posthog/batch_exports/http.py +++ b/posthog/batch_exports/http.py @@ -266,9 +266,9 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Respon batch_export = self.get_object() temporal = sync_connect() - backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at) + backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at) - return response.Response() + return response.Response({"backfill_id": backfill_id}) @action(methods=["POST"], detail=True) def pause(self, request: request.Request, *args, **kwargs) -> response.Response: @@ -328,7 +328,7 @@ def perform_destroy(self, instance: BatchExport): for backfill in BatchExportBackfill.objects.filter(batch_export=instance): if backfill.status == BatchExportBackfill.Status.RUNNING: - cancel_running_batch_export_backfill(temporal, str(backfill.pk)) + cancel_running_batch_export_backfill(temporal, backfill.workflow_id) class BatchExportLogEntrySerializer(DataclassSerializer): diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py index dc86c2ce7286a..79a7928fd6b3c 100644 --- a/posthog/batch_exports/models.py +++ b/posthog/batch_exports/models.py @@ -289,3 +289,10 @@ class Status(models.TextChoices): auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated.", ) + + @property + def workflow_id(self) -> str: + """Return the Workflow id that corresponds to this BatchExportBackfill model.""" + start_at = self.start_at.strftime("%Y-%m-%dT%H:%M:%S") + end_at = self.end_at.strftime("%Y-%m-%dT%H:%M:%S") + return f"{self.batch_export.id}-Backfill-{start_at}-{end_at}" diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 858b48bfe25a0..114f9693adec7 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -260,7 +260,7 @@ async def cancel_running_batch_export_backfill(temporal: Client, workflow_id: st Schedule that we are backfilling, we should also clean-up any Workflows that are still running. """ - handle = temporal.get_workflow_handle(workflow_id) + handle = temporal.get_workflow_handle(workflow_id=workflow_id) await handle.cancel() @@ -282,7 +282,7 @@ def backfill_export( team_id: int, start_at: dt.datetime, end_at: dt.datetime, -) -> None: +) -> str: """Starts a backfill for given team and batch export covering given date range. Arguments: @@ -303,11 +303,12 @@ def backfill_export( start_at=start_at.isoformat(), end_at=end_at.isoformat(), ) - start_backfill_batch_export_workflow(temporal, inputs=inputs) + workflow_id = start_backfill_batch_export_workflow(temporal, inputs=inputs) + return workflow_id @async_to_sync -async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> None: +async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> str: """Async call to start a BackfillBatchExportWorkflow.""" handle = temporal.get_schedule_handle(inputs.batch_export_id) description = await handle.describe() @@ -316,13 +317,16 @@ async def start_backfill_batch_export_workflow(temporal: Client, inputs: Backfil # Adjust end_at to account for jitter if present. inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat() + workflow_id = f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}" await temporal.start_workflow( "backfill-batch-export", inputs, - id=f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}", + id=workflow_id, task_queue=settings.TEMPORAL_TASK_QUEUE, ) + return workflow_id + def create_batch_export_run( batch_export_id: UUID,