Skip to content

Commit

Permalink
fix(batch-export-backfills): Cancel ongoing backfills on schedule delete
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Oct 27, 2023
1 parent 4b06e35 commit f309a7d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 15 deletions.
6 changes: 6 additions & 0 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
BatchExportServiceError,
BatchExportServiceRPCError,
backfill_export,
cancel_running_batch_export_backfill,
delete_schedule,
pause_batch_export,
sync_batch_export,
unpause_batch_export,
)
from posthog.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
Team,
Expand Down Expand Up @@ -324,6 +326,10 @@ def perform_destroy(self, instance: BatchExport):
delete_schedule(temporal, str(instance.pk))
instance.save()

for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
if backfill.status == BatchExportBackfill.Status.RUNNING:
cancel_running_batch_export_backfill(temporal, str(backfill.pk))


class BatchExportLogEntrySerializer(DataclassSerializer):
class Meta:
Expand Down
12 changes: 12 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,18 @@ async def describe_schedule(temporal: Client, schedule_id: str):
return await handle.describe()


@async_to_sync
async def cancel_running_batch_export_backfill(temporal: Client, workflow_id: str) -> None:
"""Delete a running BatchExportBackfill.
A BatchExportBackfill represents a Temporal Workflow. When deleting the Temporal
Schedule that we are backfilling, we should also clean-up any Workflows that are
still running.
"""
handle = temporal.get_workflow_handle(workflow_id)
await handle.cancel()


@dataclass
class BackfillBatchExportInputs:
"""Inputs for the BackfillBatchExport Workflow."""
Expand Down
32 changes: 17 additions & 15 deletions posthog/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from ..batch_exports.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
)
from ..session_recordings.models.session_recording import SessionRecording
from ..session_recordings.models.session_recording_playlist import (
SessionRecordingPlaylist,
)
from ..session_recordings.models.session_recording_playlist_item import (
SessionRecordingPlaylistItem,
)
from ..warehouse.models import DataWarehouseTable
from .action import Action
from .action_step import ActionStep
from .activity_logging.activity_log import ActivityLog
from .activity_logging.notification_viewed import NotificationViewed
from .annotation import Annotation
from .async_deletion import AsyncDeletion, DeletionType
from .async_migration import AsyncMigration, AsyncMigrationError, MigrationStatus
from ..batch_exports.models import (
BatchExport,
BatchExportDestination,
BatchExportRun,
)
from ..warehouse.models import DataWarehouseTable
from .cohort import Cohort, CohortPeople
from .dashboard import Dashboard
from .dashboard_tile import DashboardTile, Text
Expand All @@ -25,6 +33,7 @@
from .experiment import Experiment
from .exported_asset import ExportedAsset
from .feature_flag import FeatureFlag
from .feedback.survey import Survey
from .filters import Filter, RetentionFilter
from .group import Group
from .group_type_mapping import GroupTypeMapping
Expand All @@ -41,28 +50,20 @@
Plugin,
PluginAttachment,
PluginConfig,
PluginSourceFile,
PluginLogEntry,
PluginSourceFile,
)
from .prompt.prompt import Prompt, PromptSequence, UserPromptState
from .property import Property
from .property_definition import PropertyDefinition
from .sharing_configuration import SharingConfiguration
from .subscription import Subscription
from .feedback.survey import Survey
from .tag import Tag
from .tagged_item import TaggedItem
from .team import Team
from .uploaded_media import UploadedMedia
from .user import User, UserManager
from .user_scene_personalisation import UserScenePersonalisation
from ..session_recordings.models.session_recording import SessionRecording
from ..session_recordings.models.session_recording_playlist import (
SessionRecordingPlaylist,
)
from ..session_recordings.models.session_recording_playlist_item import (
SessionRecordingPlaylistItem,
)

__all__ = [
"Action",
Expand All @@ -73,6 +74,7 @@
"AsyncMigration",
"AsyncMigrationError",
"BatchExport",
"BatchExportBackfill",
"BatchExportDestination",
"BatchExportRun",
"Cohort",
Expand Down

0 comments on commit f309a7d

Please sign in to comment.