From 516493faa43a0a53f50bd90da4e50de7260c35b2 Mon Sep 17 00:00:00 2001 From: ted kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 27 Feb 2024 11:24:22 -0700 Subject: [PATCH] feat(persons): Add simple backfill command for distinct ID overrides. (#20562) --- .../backfill_distinct_id_overrides.py | 70 +++++++++++++++++++ .../test_backfill_distinct_id_overrides.py | 56 +++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 posthog/management/commands/backfill_distinct_id_overrides.py create mode 100644 posthog/management/commands/test/test_backfill_distinct_id_overrides.py diff --git a/posthog/management/commands/backfill_distinct_id_overrides.py b/posthog/management/commands/backfill_distinct_id_overrides.py new file mode 100644 index 0000000000000..35133ef4addbf --- /dev/null +++ b/posthog/management/commands/backfill_distinct_id_overrides.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass + +import structlog +from django.core.management.base import BaseCommand, CommandError + +from posthog.clickhouse.client.execute import sync_execute +from posthog.models.team.team import Team + + +logger = structlog.get_logger(__name__) + + +@dataclass +class BackfillQuery: + team_id: int + + def execute(self, dry_run: bool = False) -> None: + query = """ + SELECT + team_id, + distinct_id, + argMax(person_id, version), + argMax(is_deleted, version), + max(version) + FROM person_distinct_id2 + WHERE + team_id = %(team_id)s + AND version > 0 + GROUP BY team_id, distinct_id + """ + + parameters = { + "team_id": self.team_id, + } + + if dry_run: + [(count,)] = sync_execute(f"SELECT count() FROM ({query})", parameters) + logger.info("%r would have inserted %r records.", self, count) + else: + # XXX: Nothing useful to report here, unfortunately... all that is + # returned is an empty result set. + sync_execute( + f""" + INSERT INTO person_distinct_id_overrides + (team_id, distinct_id, person_id, is_deleted, version) + {query} + """, + parameters, + ) + + +class Command(BaseCommand): + help = "Backfill person_distinct_id_overrides records." + + def add_arguments(self, parser): + parser.add_argument("--team-id", required=True, type=int, help="team to backfill for") + parser.add_argument( + "--live-run", action="store_true", help="actually execute INSERT queries (default is dry-run)" + ) + + def handle(self, *, live_run: bool, team_id: int, **options): + logger.setLevel(logging.INFO) + + if not Team.objects.filter(id=team_id).exists(): + raise CommandError(f"Team with id={team_id!r} does not exist") + + BackfillQuery(team_id).execute(dry_run=not live_run) diff --git a/posthog/management/commands/test/test_backfill_distinct_id_overrides.py b/posthog/management/commands/test/test_backfill_distinct_id_overrides.py new file mode 100644 index 0000000000000..6a161d6205d18 --- /dev/null +++ b/posthog/management/commands/test/test_backfill_distinct_id_overrides.py @@ -0,0 +1,56 @@ +import operator +import uuid + +from posthog.clickhouse.client.execute import sync_execute +from posthog.management.commands.backfill_distinct_id_overrides import BackfillQuery +from posthog.test.base import BaseTest, ClickhouseTestMixin + + +class ExecuteBackfillTestCase(ClickhouseTestMixin, BaseTest): + def __run_test_backfill(self, dry_run: bool) -> None: + distinct_id = "override-me" + + rows_for_distinct_id = [ + {"team_id": self.team.id, "distinct_id": distinct_id, "person_id": uuid.uuid4(), "version": version} + for version in range(3) + ] + + # never were overridden (version = 0), so no overrides should be created + rows_to_ignore = [ + {"team_id": self.team.id, "distinct_id": f"ignore-me/{i}", "person_id": uuid.uuid4(), "version": 0} + for i in range(5) + ] + + sync_execute( + "INSERT INTO person_distinct_id2 (team_id, distinct_id, person_id, version) VALUES", + [*rows_for_distinct_id, *rows_to_ignore], + ) + + # nothing should be in the override table yet + assert sync_execute( + "SELECT count() FROM person_distinct_id_overrides WHERE team_id = %(team_id)s", + {"team_id": self.team.id}, + ) == [(0,)] + + BackfillQuery(self.team.id).execute(dry_run=dry_run) + + read_columns = ["team_id", "distinct_id", "person_id", "version"] + distinct_id_override_rows = sync_execute( + f""" + SELECT {', '.join(read_columns)} + FROM person_distinct_id_overrides + WHERE team_id = %(team_id)s + """, + {"team_id": self.team.id}, + ) + + if not dry_run: + assert distinct_id_override_rows == [operator.itemgetter(*read_columns)(rows_for_distinct_id[-1])] + else: + assert distinct_id_override_rows == [] + + def test_execute_backfill(self) -> None: + self.__run_test_backfill(dry_run=False) + + def test_execute_backfill_dry_run(self) -> None: + self.__run_test_backfill(dry_run=True)