diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py
index 5e84d7f446b3d0..179684ea37678e 100644
--- a/posthog/batch_exports/http.py
+++ b/posthog/batch_exports/http.py
@@ -31,8 +31,7 @@
     BatchExportServiceScheduleNotFound,
     BatchExportWithNoEndNotAllowedError,
     backfill_export,
-    batch_export_delete_schedule,
-    cancel_running_batch_export_backfill,
+    disable_and_delete_export,
     pause_batch_export,
     sync_batch_export,
     unpause_batch_export,
@@ -43,7 +42,6 @@
 from posthog.hogql.printer import prepare_ast_for_printing, print_prepared_ast
 from posthog.models import (
     BatchExport,
-    BatchExportBackfill,
     BatchExportDestination,
     BatchExportRun,
     Team,
@@ -436,24 +434,14 @@ def perform_destroy(self, instance: BatchExport):
         since we are deleting, we assume that we can recover from this state by
         finishing the delete operation by calling instance.save().
         """
-        temporal = sync_connect()
-
-        instance.deleted = True
-
         try:
-            batch_export_delete_schedule(temporal, str(instance.pk))
+            disable_and_delete_export(instance)
         except BatchExportServiceScheduleNotFound as e:
             logger.warning(
                 "The Schedule %s could not be deleted as it was not found",
                 e.schedule_id,
             )
 
-        instance.save()
-
-        for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
-            if backfill.status == BatchExportBackfill.Status.RUNNING:
-                cancel_running_batch_export_backfill(temporal, backfill.workflow_id)
-
 
 class BatchExportOrganizationViewSet(BatchExportViewSet):
     filter_rewrite_rules = {"organization_id": "team__organization_id"}
diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py
index 4930665d13f6d3..4d3d44cbdf6060 100644
--- a/posthog/batch_exports/service.py
+++ b/posthog/batch_exports/service.py
@@ -291,6 +291,21 @@ def unpause_batch_export(
     backfill_export(temporal, batch_export_id, batch_export.team_id, start_at, end_at)
 
 
+def disable_and_delete_export(instance: BatchExport):
+    """Mark a BatchExport as deleted and delete its Temporal Schedule (including backfills)."""
+    temporal = sync_connect()
+
+    instance.deleted = True
+
+    batch_export_delete_schedule(temporal, str(instance.pk))
+
+    instance.save()
+
+    for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
+        if backfill.status == BatchExportBackfill.Status.RUNNING:
+            cancel_running_batch_export_backfill(temporal, backfill.workflow_id)
+
+
 def batch_export_delete_schedule(temporal: Client, schedule_id: str) -> None:
     """Delete a Temporal Schedule."""
     try:
diff --git a/posthog/management/commands/migrate_team.py b/posthog/management/commands/migrate_team.py
new file mode 100644
index 00000000000000..736937f2a19d20
--- /dev/null
+++ b/posthog/management/commands/migrate_team.py
@@ -0,0 +1,223 @@
+import datetime as dt
+import logging
+
+from django.db import transaction
+from django.core.management.base import BaseCommand, CommandError
+
+from posthog.batch_exports.models import BATCH_EXPORT_INTERVALS
+from posthog.batch_exports.service import (
+    backfill_export,
+    sync_batch_export,
+    disable_and_delete_export,
+)
+from posthog.models import (
+    BatchExport,
+    BatchExportBackfill,
+    BatchExportDestination,
+    BatchExportRun,
+    Team,
+)
+from posthog.temporal.common.client import sync_connect
+
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+EXPORT_NAME = "PostHog HTTP Migration"
+VALID_INTERVALS = {i[0] for i in BATCH_EXPORT_INTERVALS}
+REGION_URLS = {
+    "us": "https://app.posthog.com/batch",
+    "eu": "https://eu.posthog.com/batch",
+}
+
+
+def display(message, **kwargs):
+    print(message)  # noqa: T201
+    for key, value in kwargs.items():
+        print(f"  {key} = {value}")  # noqa: T201
+    print()  # noqa: T201
+
+
+class Command(BaseCommand):
+    help = "Creates an HTTP batch export for a team to migrate data to another PostHog instance, \
+        or another team within the same instance."
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--source-team-id", default=None, type=int, help="Team ID to migrate from (on this instance)"
+        )
+        parser.add_argument("--interval", default=None, type=str, help="Interval to use for the batch export")
+        parser.add_argument(
+            "--start-at",
+            default=None,
+            type=str,
+            help="Timestamp to start the backfill from in UTC, 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'",
+        )
+        parser.add_argument(
+            "--end-days-from-now",
+            default=30,
+            type=int,
+            help="Number of days from now to automatically end the ongoing export at, the default is usually fine",
+        )
+        parser.add_argument("--dest-token", default=None, type=str, help="Destination Project API Key (token)")
+        parser.add_argument("--dest-region", default=None, type=str, help="Destination region")
+        parser.add_argument(
+            "--delete-existing", default=False, type=bool, help="Delete existing batch export if it exists"
+        )
+
+    def handle(self, *args, **options):
+        team_id = options["source_team_id"]
+        if not team_id:
+            raise CommandError("source Team ID is required")
+
+        team = Team.objects.select_related("organization").get(id=team_id)
+
+        display(
+            "Found source team",
+            team_id=team_id,
+            team_name=team.name,
+            organization_name=team.organization.name,
+        )
+
+        existing_export = None
+        existing_exports = BatchExport.objects.filter(
+            team=team, destination__type="HTTP", name=EXPORT_NAME, deleted=False
+        )
+        if len(existing_exports) > 1:
+            raise CommandError("More than one existing migration found -- this should never happen")
+        if existing_exports:
+            existing_export = existing_exports[0]
+            existing_backfill = BatchExportBackfill.objects.get(batch_export=existing_export)
+
+            display(
+                "Existing migration",
+                batch_export_id=existing_export.id,
+                paused=existing_export.paused,
+                interval=existing_export.interval,
+                created_at=existing_export.created_at,
+                last_updated_at=existing_export.last_updated_at,
+            )
+            display(
+                "Existing backfill",
+                backfill_id=existing_backfill.id,
+                status=existing_backfill.status,
+                start_at=existing_backfill.start_at,
+                created_at=existing_backfill.created_at,
+                last_updated_at=existing_backfill.last_updated_at,
+            )
+
+            recent_runs = BatchExportRun.objects.filter(batch_export=existing_export).order_by("-created_at")[:2]
+
+            if not recent_runs:
+                display("No export runs found")
+
+            for run in recent_runs:
+                display(
+                    "Recent run",
+                    run_id=run.id,
+                    status=run.status,
+                    latest_error=run.latest_error,
+                    data_interval_start=run.data_interval_start,
+                    data_interval_end=run.data_interval_end,
+                    created_at=run.created_at,
+                    last_updated_at=run.last_updated_at,
+                )
+
+                if run.status == BatchExportRun.Status.COMPLETED:
+                    # We only want to print the 2nd most recent run if the most recent run isn't
+                    # completed.
+                    break
+
+            if options["delete_existing"]:
+                display("Deleting existing batch export and backfill")
+                disable_and_delete_export(existing_export)
+                display("Deleted existing batch export and backfill successfully")
+
+        interval = options["interval"]
+        start_at = options["start_at"]
+        dest_token = options["dest_token"]
+        dest_region = options["dest_region"]
+
+        if not any(
+            [
+                interval,
+                start_at,
+                dest_token,
+                dest_region,
+            ]
+        ):
+            # User didn't provide any arguments to create a migration, so they must have just wanted
+            # to check the status and/or delete the existing migration.
+            display("No other arguments provided, exiting")
+            return
+
+        if interval not in VALID_INTERVALS:
+            raise CommandError("invalid interval, choices are: %s" % VALID_INTERVALS)
+
+        if not dest_token.startswith("phc_"):
+            raise CommandError("invalid destination token, must start with 'phc_'")
+
+        dest_region = dest_region.lower()
+        if dest_region not in REGION_URLS:
+            raise CommandError("invalid destination region, choices are: 'us', 'eu'")
+        url = REGION_URLS[dest_region]
+
+        try:
+            start_at = parse_to_utc(start_at)
+        except ValueError as e:
+            raise CommandError(str(e))
+
+        display(
+            "Creating migration",
+            team_id=team_id,
+            team_name=team.name,
+            organization_name=team.organization.name,
+            interval=interval,
+            start_at=start_at,
+            dest_token=dest_token,
+            dest_region=dest_region,
+            url=url,
+        )
+        result = input("Enter [y] to continue (Ctrl+C to cancel) ")
+        if result.lower() != "y":
+            raise CommandError("Didn't receive 'y', exiting")
+        print()  # noqa: T201
+
+        now = dt.datetime.now(dt.timezone.utc)
+        # This is a precaution so we don't accidentally leave the export running indefinitely.
+        end_at = now + dt.timedelta(days=options["end_days_from_now"])
+
+        destination = BatchExportDestination(
+            type=BatchExportDestination.Destination.HTTP,
+            config={"url": url, "token": dest_token},
+        )
+        batch_export = BatchExport(
+            team_id=team_id,
+            destination=destination,
+            name=EXPORT_NAME,
+            interval=interval,
+            paused=True,
+            end_at=end_at,
+        )
+        sync_batch_export(batch_export, created=True)
+
+        with transaction.atomic():
+            destination.save()
+            batch_export.save()
+
+        temporal = sync_connect()
+        backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at=None)
+        display("Backfill started", batch_export_id=batch_export.id, backfill_id=backfill_id)
+
+
+def parse_to_utc(date_str: str) -> dt.datetime:
+    try:
+        parsed_datetime = dt.datetime.strptime(date_str, "%Y-%m-%d")
+    except ValueError:
+        try:
+            parsed_datetime = dt.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
+        except ValueError:
+            raise ValueError("Invalid date format. Expected 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.")
+
+    utc_datetime = parsed_datetime.replace(tzinfo=dt.timezone.utc)
+    return utc_datetime
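
Example invocation of the new command (a sketch with placeholder values: the team ID, token, and start timestamp below are hypothetical, and this assumes "hour" is one of the intervals defined in BATCH_EXPORT_INTERVALS):

    # Inspect an existing migration for a team; with no creation arguments the command only reports status.
    python manage.py migrate_team --source-team-id 1234

    # Create the paused HTTP batch export and start the backfill to the EU region.
    python manage.py migrate_team --source-team-id 1234 --interval hour \
        --start-at "2024-01-01" --dest-token phc_... --dest-region eu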