Skip to content

Commit

Permalink
chore(batch-exports): add team migration management command
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Mar 7, 2024
1 parent 42074f1 commit 218b770
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 14 deletions.
16 changes: 2 additions & 14 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
BatchExportServiceScheduleNotFound,
BatchExportWithNoEndNotAllowedError,
backfill_export,
batch_export_delete_schedule,
cancel_running_batch_export_backfill,
disable_and_delete_export,
pause_batch_export,
sync_batch_export,
unpause_batch_export,
Expand All @@ -43,7 +42,6 @@
from posthog.hogql.printer import prepare_ast_for_printing, print_prepared_ast
from posthog.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
Team,
Expand Down Expand Up @@ -436,24 +434,14 @@ def perform_destroy(self, instance: BatchExport):
since we are deleting, we assume that we can recover from this state by finishing the delete operation by calling
instance.save().
"""
temporal = sync_connect()

instance.deleted = True

try:
batch_export_delete_schedule(temporal, str(instance.pk))
disable_and_delete_export(instance)
except BatchExportServiceScheduleNotFound as e:
logger.warning(
"The Schedule %s could not be deleted as it was not found",
e.schedule_id,
)

instance.save()

for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
if backfill.status == BatchExportBackfill.Status.RUNNING:
cancel_running_batch_export_backfill(temporal, backfill.workflow_id)


class BatchExportOrganizationViewSet(BatchExportViewSet):
filter_rewrite_rules = {"organization_id": "team__organization_id"}
Expand Down
15 changes: 15 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,21 @@ def unpause_batch_export(
backfill_export(temporal, batch_export_id, batch_export.team_id, start_at, end_at)


def disable_and_delete_export(instance: BatchExport):
    """Mark a BatchExport as deleted, delete its Temporal Schedule and cancel running backfills.

    A missing Temporal Schedule is tolerated (it may have been cleaned up already); in every
    case the instance is still saved with ``deleted=True`` so it stops appearing in the API.
    """
    import logging

    temporal = sync_connect()

    instance.deleted = True

    try:
        batch_export_delete_schedule(temporal, str(instance.pk))
    except BatchExportServiceScheduleNotFound as e:
        # The previous HTTP-layer implementation only logged this case; if we let it
        # propagate here, instance.save() below never runs and the export is never
        # actually marked as deleted.
        logging.getLogger(__name__).warning(
            "The Schedule %s could not be deleted as it was not found",
            e.schedule_id,
        )

    instance.save()

    # Cancel any backfill workflows that are still running for this export.
    for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
        if backfill.status == BatchExportBackfill.Status.RUNNING:
            cancel_running_batch_export_backfill(temporal, backfill.workflow_id)


def batch_export_delete_schedule(temporal: Client, schedule_id: str) -> None:
"""Delete a Temporal Schedule."""
try:
Expand Down
226 changes: 226 additions & 0 deletions posthog/management/commands/migrate_team.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import datetime as dt
import logging

from django.db import transaction
from django.core.management.base import BaseCommand, CommandError

from posthog.batch_exports.models import BATCH_EXPORT_INTERVALS
from posthog.batch_exports.service import (
backfill_export,
sync_batch_export,
disable_and_delete_export,
)
from posthog.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
Team,
)
from posthog.temporal.common.client import sync_connect


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

EXPORT_NAME = "PostHog HTTP Migration"
VALID_INTERVALS = {i[0] for i in BATCH_EXPORT_INTERVALS}
REGION_URLS = {
"us": "https://app.posthog.com/batch",
"eu": "https://eu.posthog.com/batch",
}


def display(message, **kwargs):
    """Print *message*, then one indented ``name = value`` line per keyword argument,
    followed by a blank line."""
    lines = [message]
    lines.extend(f"  {name} = {value}" for name, value in kwargs.items())
    lines.append("")
    print("\n".join(lines))  # noqa: T201


class Command(BaseCommand):
    help = "Creates an HTTP batch export for a team to migrate data to another PostHog instance, \
        or another team within the same instance."

    def add_arguments(self, parser):
        """Register CLI options; all optional so the command can be used read-only (status check)."""
        parser.add_argument(
            "--source-team-id", default=None, type=int, help="Team ID to migrate from (on this instance)"
        )
        parser.add_argument("--interval", default=None, type=str, help="Interval to use for the batch export")
        parser.add_argument(
            "--start-at",
            default=None,
            type=str,
            help="Timestamp to start the backfill from in UTC, 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'",
        )
        parser.add_argument(
            "--end-days-from-now",
            default=30,
            type=int,
            help="Number of days from now to automatically end the ongoing export at, the default is usually fine",
        )
        parser.add_argument("--dest-token", default=None, type=str, help="Destination Project API Key (token)")
        parser.add_argument("--dest-region", default=None, type=str, help="Destination region")
        # `type=bool` was a bug: argparse calls bool() on the raw string, and bool("False")
        # is True, so any provided value enabled deletion. `store_true` makes this a real flag.
        parser.add_argument(
            "--delete-existing", default=False, action="store_true", help="Delete existing batch export if it exists"
        )

    def handle(self, *args, **options):
        """Inspect (and optionally delete) an existing migration export, or create a new one.

        Raises CommandError for missing/invalid arguments or an inconsistent existing state.
        """
        team_id = options["source_team_id"]
        if not team_id:
            raise CommandError("source Team ID is required")

        team = Team.objects.select_related("organization").get(id=team_id)

        display(
            "Found source team",
            team_id=team_id,
            team_name=team.name,
            organization_name=team.organization.name,
        )

        existing_export = None
        existing_exports = BatchExport.objects.filter(
            team=team, destination__type="HTTP", name=EXPORT_NAME, deleted=False
        )
        if len(existing_exports) > 1:
            raise CommandError("More than one existing migration found -- this should never happen")
        if existing_exports:
            existing_export = existing_exports[0]
            # `.get()` here would raise DoesNotExist if the backfill was never created (or
            # MultipleObjectsReturned if there is more than one); fetch the latest defensively.
            existing_backfill = (
                BatchExportBackfill.objects.filter(batch_export=existing_export).order_by("-created_at").first()
            )

            display(
                "Existing migration",
                batch_export_id=existing_export.id,
                paused=existing_export.paused,
                interval=existing_export.interval,
                created_at=existing_export.created_at,
                last_updated_at=existing_export.last_updated_at,
            )
            if existing_backfill is not None:
                display(
                    "Existing backfill",
                    backfill_id=existing_backfill.id,
                    status=existing_backfill.status,
                    start_at=existing_backfill.start_at,
                    created_at=existing_backfill.created_at,
                    last_updated_at=existing_backfill.last_updated_at,
                )
            else:
                display("No backfill found for existing migration")

            recent_runs = BatchExportRun.objects.filter(batch_export=existing_export).order_by("-created_at")[:2]

            if not recent_runs:
                display("No export runs found")

            for run in recent_runs:
                display(
                    "Recent run",
                    run_id=run.id,
                    status=run.status,
                    latest_error=run.latest_error,
                    data_interval_start=run.data_interval_start,
                    data_interval_end=run.data_interval_end,
                    created_at=run.created_at,
                    last_updated_at=run.last_updated_at,
                )

                if run.status == BatchExportRun.Status.COMPLETED:
                    # We only want to print the 2nd most recent run if the most recent run isn't
                    # completed.
                    break

            if options["delete_existing"]:
                display("Deleting existing batch export and backfill")
                disable_and_delete_export(existing_export)
                display("Deleted existing batch export and backfill successfully")
            else:
                display("Existing migration job exists and won't be deleted, exiting")
                return

        interval = options["interval"]
        start_at = options["start_at"]
        dest_token = options["dest_token"]
        dest_region = options["dest_region"]

        if not any(
            [
                interval,
                start_at,
                dest_token,
                dest_region,
            ]
        ):
            # User didn't provide any arguments to create a migration, so they must have just wanted
            # to check the status and/or delete the existing migration.
            display("No other arguments provided, exiting")
            return

        # All four are required to create a migration. Previously a partial set passed the
        # any() gate above and then crashed with AttributeError (e.g. None.startswith).
        missing = [
            flag
            for flag, value in (
                ("--interval", interval),
                ("--start-at", start_at),
                ("--dest-token", dest_token),
                ("--dest-region", dest_region),
            )
            if not value
        ]
        if missing:
            raise CommandError("missing required arguments: %s" % ", ".join(missing))

        if interval not in VALID_INTERVALS:
            raise CommandError("invalid interval, choices are: %s" % VALID_INTERVALS)

        if not dest_token.startswith("phc_"):
            raise CommandError("invalid destination token, must start with 'phc_'")

        dest_region = dest_region.lower()
        if dest_region not in REGION_URLS:
            raise CommandError("invalid destination region, choices are: 'us', 'eu'")
        url = REGION_URLS[dest_region]

        try:
            start_at = parse_to_utc(start_at)
        except ValueError as e:
            raise CommandError("couldn't parse start_at: %s" % e) from e

        display(
            "Creating migration",
            team_id=team_id,
            team_name=team.name,
            organization_name=team.organization.name,
            interval=interval,
            start_at=start_at,
            # Don't log the full token in clear text (flagged by static analysis); the
            # prefix is enough for the operator to confirm which key is in use.
            dest_token=dest_token[:8] + "...",
            dest_region=dest_region,
            url=url,
        )
        result = input("Enter [y] to continue (Ctrl+C to cancel) ")
        if result.lower() != "y":
            raise CommandError("Didn't receive 'y', exiting")
        print()  # noqa: T201

        now = dt.datetime.now(dt.timezone.utc)
        # This is a precaution so we don't accidentally leave the export running indefinitely.
        end_at = now + dt.timedelta(days=options["end_days_from_now"])

        destination = BatchExportDestination(
            type=BatchExportDestination.Destination.HTTP,
            config={"url": url, "token": dest_token},
        )
        batch_export = BatchExport(
            team_id=team_id,
            destination=destination,
            name=EXPORT_NAME,
            interval=interval,
            paused=True,
            end_at=end_at,
        )
        sync_batch_export(batch_export, created=True)

        with transaction.atomic():
            destination.save()
            batch_export.save()

        temporal = sync_connect()
        # The backfill itself is unbounded (end_at=None); the export's own `end_at` above caps it.
        backfill_id = backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at=None)
        display("Backfill started", batch_export_id=batch_export.id, backfill_id=backfill_id)


def parse_to_utc(date_str: str) -> dt.datetime:
    """Parse 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS' and return a timezone-aware UTC datetime.

    Raises ValueError if the string matches neither format.
    """
    for fmt in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"):
        try:
            parsed = dt.datetime.strptime(date_str, fmt)
        except ValueError:
            continue
        return parsed.replace(tzinfo=dt.timezone.utc)
    raise ValueError("Invalid date format. Expected 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.")

0 comments on commit 218b770

Please sign in to comment.