Skip to content

Commit

Permalink
fix(batch-export-backfills): Cancel ongoing backfills on schedule delete
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Oct 23, 2023
1 parent 437445e commit ef4021b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
6 changes: 6 additions & 0 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
BatchExportServiceError,
BatchExportServiceRPCError,
backfill_export,
cancel_running_batch_export_backfill,
delete_schedule,
pause_batch_export,
sync_batch_export,
unpause_batch_export,
)
from posthog.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
Team,
Expand Down Expand Up @@ -312,6 +314,10 @@ def perform_destroy(self, instance: BatchExport):
delete_schedule(temporal, str(instance.pk))
instance.save()

for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
if backfill.status == BatchExportBackfill.Status.RUNNING:
cancel_running_batch_export_backfill(temporal, str(backfill.pk))


class BatchExportLogEntrySerializer(DataclassSerializer):
class Meta:
Expand Down
12 changes: 12 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,18 @@ async def describe_schedule(temporal: Client, schedule_id: str):
return await handle.describe()


@async_to_sync
async def cancel_running_batch_export_backfill(temporal: Client, workflow_id: str) -> None:
"""Delete a running BatchExportBackfill.
A BatchExportBackfill represents a Temporal Workflow. When deleting the Temporal
Schedule that we are backfilling, we should also clean-up any Workflows that are
still running.
"""
handle = temporal.get_workflow_handle(workflow_id)
await handle.cancel()


@dataclass
class BackfillBatchExportInputs:
"""Inputs for the BackfillBatchExport Workflow."""
Expand Down
34 changes: 23 additions & 11 deletions posthog/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from ..batch_exports.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
)
from ..session_recordings.models.session_recording import SessionRecording
from ..session_recordings.models.session_recording_playlist import (
SessionRecordingPlaylist,
)
from ..session_recordings.models.session_recording_playlist_item import (
SessionRecordingPlaylistItem,
)
from ..warehouse.models import DataWarehouseTable
from .action import Action
from .action_step import ActionStep
from .activity_logging.activity_log import ActivityLog
from .activity_logging.notification_viewed import NotificationViewed
from .annotation import Annotation
from .async_deletion import AsyncDeletion, DeletionType
from .async_migration import AsyncMigration, AsyncMigrationError, MigrationStatus
from ..batch_exports.models import (
BatchExport,
BatchExportDestination,
BatchExportRun,
)
from ..warehouse.models import DataWarehouseTable
from .cohort import Cohort, CohortPeople
from .dashboard import Dashboard
from .dashboard_tile import DashboardTile, Text
Expand All @@ -25,6 +33,7 @@
from .experiment import Experiment
from .exported_asset import ExportedAsset
from .feature_flag import FeatureFlag
from .feedback.survey import Survey
from .filters import Filter, RetentionFilter
from .group import Group
from .group_type_mapping import GroupTypeMapping
Expand All @@ -37,22 +46,24 @@
from .organization_domain import OrganizationDomain
from .person import Person, PersonDistinctId, PersonOverride, PersonOverrideMapping
from .personal_api_key import PersonalAPIKey
from .plugin import Plugin, PluginAttachment, PluginConfig, PluginSourceFile, PluginLogEntry
from .plugin import (
Plugin,
PluginAttachment,
PluginConfig,
PluginLogEntry,
PluginSourceFile,
)
from .prompt.prompt import Prompt, PromptSequence, UserPromptState
from .property import Property
from .property_definition import PropertyDefinition
from .sharing_configuration import SharingConfiguration
from .subscription import Subscription
from .feedback.survey import Survey
from .tag import Tag
from .tagged_item import TaggedItem
from .team import Team
from .uploaded_media import UploadedMedia
from .user import User, UserManager
from .user_scene_personalisation import UserScenePersonalisation
from ..session_recordings.models.session_recording import SessionRecording
from ..session_recordings.models.session_recording_playlist import SessionRecordingPlaylist
from ..session_recordings.models.session_recording_playlist_item import SessionRecordingPlaylistItem

__all__ = [
"Action",
Expand All @@ -63,6 +74,7 @@
"AsyncMigration",
"AsyncMigrationError",
"BatchExport",
"BatchExportBackfill",
"BatchExportDestination",
"BatchExportRun",
"Cohort",
Expand Down

0 comments on commit ef4021b

Please sign in to comment.