Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(batch-export-backfills): Cancel ongoing backfills on schedule delete #18136

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions posthog/api/test/batch_exports/test_delete.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import asyncio

import pytest
import temporalio.client
from asgiref.sync import async_to_sync
from django.test.client import Client as HttpClient
from rest_framework import status
from temporalio.service import RPCError

from posthog.api.test.batch_exports.conftest import start_test_worker
from posthog.api.test.batch_exports.operations import (
backfill_batch_export_ok,
create_batch_export_ok,
delete_batch_export,
delete_batch_export_ok,
Expand Down Expand Up @@ -59,6 +64,105 @@ def test_delete_batch_export(client: HttpClient):
describe_schedule(temporal, batch_export_id)


@async_to_sync
async def wait_for_workflow_executions(
    temporal: temporalio.client.Client, query: str, timeout: int = 30, sleep: int = 1
):
    """Wait for Workflow Executions matching query.

    Polls the Temporal server every `sleep` seconds until at least one
    Workflow Execution matches `query`, or raises TimeoutError after
    `timeout` seconds of waiting.
    """

    async def list_matching():
        return [workflow async for workflow in temporal.list_workflows(query=query)]

    elapsed = 0
    matches = await list_matching()
    while not matches:
        elapsed += sleep
        if elapsed > timeout:
            raise TimeoutError(f"No backfill Workflow Executions after {timeout} seconds")

        await asyncio.sleep(sleep)
        matches = await list_matching()

    return matches


@async_to_sync
async def wait_for_workflow_in_status(
    temporal: temporalio.client.Client,
    workflow_id: str,
    status: temporalio.client.WorkflowExecutionStatus,
    sleep: int = 1,
    timeout: int = 30,
):
    """Wait for a Workflow to be in a given status.

    Re-describes the Workflow every `sleep` seconds. After `timeout` seconds
    we stop waiting and return the last description seen — callers are
    expected to assert on its status themselves.
    """
    handle = temporal.get_workflow_handle(workflow_id)
    description = await handle.describe()

    waited = 0
    while description.status != status:
        waited += sleep
        if waited > timeout:
            # Give up quietly: return the stale description for the caller to inspect.
            break

        await asyncio.sleep(sleep)
        description = await handle.describe()

    return description


@pytest.mark.django_db(transaction=True)
def test_delete_batch_export_cancels_backfills(client: HttpClient):
    """Test deleting a BatchExport cancels ongoing BatchExportBackfill."""
    temporal = sync_connect()

    # Destination credentials are placeholders: the backfill is cancelled
    # before any data would actually be exported to S3.
    destination_data = {
        "type": "S3",
        "config": {
            "bucket_name": "my-production-s3-bucket",
            "region": "us-east-1",
            "prefix": "posthog-events/",
            "aws_access_key_id": "abc123",
            "aws_secret_access_key": "secret",
        },
    }
    batch_export_data = {
        "name": "my-production-s3-bucket-destination",
        "destination": destination_data,
        "interval": "hour",
    }

    organization = create_organization("Test Org")
    team = create_team(organization)
    user = create_user("[email protected]", "Test User", organization)
    client.force_login(user)

    with start_test_worker(temporal):
        batch_export = create_batch_export_ok(client, team.pk, batch_export_data)
        batch_export_id = batch_export["id"]

        # Kick off a 24-hour backfill so there is a running backfill Workflow
        # to cancel when the BatchExport is deleted.
        start_at = "2023-10-23 00:00:00"
        end_at = "2023-10-24 00:00:00"
        batch_export_backfill = backfill_batch_export_ok(client, team.pk, batch_export_id, start_at, end_at)

        # In order for the backfill to be cancelable, it needs to be running and requesting backfills.
        # We check this by waiting for executions scheduled by our BatchExport id to pop up.
        _ = wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{batch_export_id}"')

        delete_batch_export_ok(client, team.pk, batch_export_id)

        # The BatchExport itself must be gone from the API after deletion.
        response = get_batch_export(client, team.pk, batch_export_id)
        assert response.status_code == status.HTTP_404_NOT_FOUND

        # Deleting the BatchExport should have requested cancellation of the
        # still-running backfill Workflow; wait until Temporal reports it CANCELED.
        workflow = wait_for_workflow_in_status(
            temporal,
            workflow_id=batch_export_backfill["backfill_id"],
            status=temporalio.client.WorkflowExecutionStatus.CANCELED,
        )
        assert workflow.status == temporalio.client.WorkflowExecutionStatus.CANCELED

        # The underlying Temporal Schedule should have been deleted as well,
        # so describing it must fail.
        with pytest.raises(RPCError):
            describe_schedule(temporal, batch_export_id)


def test_cannot_delete_export_of_other_organizations(client: HttpClient):
temporal = sync_connect()

Expand Down
10 changes: 8 additions & 2 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
BatchExportServiceError,
BatchExportServiceRPCError,
backfill_export,
cancel_running_batch_export_backfill,
delete_schedule,
pause_batch_export,
sync_batch_export,
unpause_batch_export,
)
from posthog.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
Team,
Expand Down Expand Up @@ -264,9 +266,9 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Respon

batch_export = self.get_object()
temporal = sync_connect()
backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)
backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)

return response.Response()
return response.Response({"backfill_id": backfill_id})

@action(methods=["POST"], detail=True)
def pause(self, request: request.Request, *args, **kwargs) -> response.Response:
Expand Down Expand Up @@ -324,6 +326,10 @@ def perform_destroy(self, instance: BatchExport):
delete_schedule(temporal, str(instance.pk))
instance.save()

for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
if backfill.status == BatchExportBackfill.Status.RUNNING:
cancel_running_batch_export_backfill(temporal, backfill.workflow_id)


class BatchExportLogEntrySerializer(DataclassSerializer):
class Meta:
Expand Down
7 changes: 7 additions & 0 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,10 @@ class Status(models.TextChoices):
auto_now=True,
help_text="The timestamp at which this BatchExportBackfill was last updated.",
)

@property
def workflow_id(self) -> str:
    """Return the Workflow id that corresponds to this BatchExportBackfill model.

    The id is assembled as "{batch_export_id}-Backfill-{start_at}-{end_at}",
    with both bounds rendered as second-resolution ISO-like timestamps.
    """
    # NOTE(review): verify this formatting stays in sync with the id built when
    # the backfill Workflow is started — aware datetimes rendered via isoformat()
    # would carry a UTC offset that strftime("%Y-%m-%dT%H:%M:%S") omits.
    timestamp_format = "%Y-%m-%dT%H:%M:%S"
    bounds = "-".join(moment.strftime(timestamp_format) for moment in (self.start_at, self.end_at))
    return f"{self.batch_export.id}-Backfill-{bounds}"
24 changes: 20 additions & 4 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,18 @@ async def describe_schedule(temporal: Client, schedule_id: str):
return await handle.describe()


@async_to_sync
async def cancel_running_batch_export_backfill(temporal: Client, workflow_id: str) -> None:
    """Cancel a running BatchExportBackfill.

    A BatchExportBackfill represents a Temporal Workflow. When deleting the Temporal
    Schedule that we are backfilling, we should also clean-up any Workflows that are
    still running.

    NOTE(review): `handle.cancel()` appears to only request cancellation from the
    server rather than wait for the Workflow to finish cancelling — confirm callers
    don't rely on the Workflow already being CANCELED when this returns.
    """
    handle = temporal.get_workflow_handle(workflow_id=workflow_id)
    await handle.cancel()


@dataclass
class BackfillBatchExportInputs:
"""Inputs for the BackfillBatchExport Workflow."""
Expand All @@ -270,7 +282,7 @@ def backfill_export(
team_id: int,
start_at: dt.datetime,
end_at: dt.datetime,
) -> None:
) -> str:
"""Starts a backfill for given team and batch export covering given date range.

Arguments:
Expand All @@ -291,11 +303,12 @@ def backfill_export(
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
)
start_backfill_batch_export_workflow(temporal, inputs=inputs)
workflow_id = start_backfill_batch_export_workflow(temporal, inputs=inputs)
return workflow_id


@async_to_sync
async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> None:
async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> str:
"""Async call to start a BackfillBatchExportWorkflow."""
handle = temporal.get_schedule_handle(inputs.batch_export_id)
description = await handle.describe()
Expand All @@ -304,13 +317,16 @@ async def start_backfill_batch_export_workflow(temporal: Client, inputs: Backfil
# Adjust end_at to account for jitter if present.
inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat()

workflow_id = f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}"
await temporal.start_workflow(
"backfill-batch-export",
inputs,
id=f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}",
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
)

return workflow_id


def create_batch_export_run(
batch_export_id: UUID,
Expand Down
32 changes: 17 additions & 15 deletions posthog/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from ..batch_exports.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
)
from ..session_recordings.models.session_recording import SessionRecording
from ..session_recordings.models.session_recording_playlist import (
SessionRecordingPlaylist,
)
from ..session_recordings.models.session_recording_playlist_item import (
SessionRecordingPlaylistItem,
)
from ..warehouse.models import DataWarehouseTable
from .action import Action
from .action_step import ActionStep
from .activity_logging.activity_log import ActivityLog
from .activity_logging.notification_viewed import NotificationViewed
from .annotation import Annotation
from .async_deletion import AsyncDeletion, DeletionType
from .async_migration import AsyncMigration, AsyncMigrationError, MigrationStatus
from ..batch_exports.models import (
BatchExport,
BatchExportDestination,
BatchExportRun,
)
from ..warehouse.models import DataWarehouseTable
from .cohort import Cohort, CohortPeople
from .dashboard import Dashboard
from .dashboard_tile import DashboardTile, Text
Expand All @@ -25,6 +33,7 @@
from .experiment import Experiment
from .exported_asset import ExportedAsset
from .feature_flag import FeatureFlag
from .feedback.survey import Survey
from .filters import Filter, RetentionFilter
from .group import Group
from .group_type_mapping import GroupTypeMapping
Expand All @@ -41,28 +50,20 @@
Plugin,
PluginAttachment,
PluginConfig,
PluginSourceFile,
PluginLogEntry,
PluginSourceFile,
)
from .prompt.prompt import Prompt, PromptSequence, UserPromptState
from .property import Property
from .property_definition import PropertyDefinition
from .sharing_configuration import SharingConfiguration
from .subscription import Subscription
from .feedback.survey import Survey
from .tag import Tag
from .tagged_item import TaggedItem
from .team import Team
from .uploaded_media import UploadedMedia
from .user import User, UserManager
from .user_scene_personalisation import UserScenePersonalisation
from ..session_recordings.models.session_recording import SessionRecording
from ..session_recordings.models.session_recording_playlist import (
SessionRecordingPlaylist,
)
from ..session_recordings.models.session_recording_playlist_item import (
SessionRecordingPlaylistItem,
)

__all__ = [
"Action",
Expand All @@ -73,6 +74,7 @@
"AsyncMigration",
"AsyncMigrationError",
"BatchExport",
"BatchExportBackfill",
"BatchExportDestination",
"BatchExportRun",
"Cohort",
Expand Down
Loading