-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(persons): Add simple backfill command for distinct ID overrides. (…
- Loading branch information
Showing
2 changed files
with
126 additions
and
0 deletions.
There are no files selected for viewing
70 changes: 70 additions & 0 deletions
70
posthog/management/commands/backfill_distinct_id_overrides.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from dataclasses import dataclass | ||
|
||
import structlog | ||
from django.core.management.base import BaseCommand, CommandError | ||
|
||
from posthog.clickhouse.client.execute import sync_execute | ||
from posthog.models.team.team import Team | ||
|
||
|
||
logger = structlog.get_logger(__name__) | ||
|
||
|
||
@dataclass | ||
class BackfillQuery: | ||
team_id: int | ||
|
||
def execute(self, dry_run: bool = False) -> None: | ||
query = """ | ||
SELECT | ||
team_id, | ||
distinct_id, | ||
argMax(person_id, version), | ||
argMax(is_deleted, version), | ||
max(version) | ||
FROM person_distinct_id2 | ||
WHERE | ||
team_id = %(team_id)s | ||
AND version > 0 | ||
GROUP BY team_id, distinct_id | ||
""" | ||
|
||
parameters = { | ||
"team_id": self.team_id, | ||
} | ||
|
||
if dry_run: | ||
[(count,)] = sync_execute(f"SELECT count() FROM ({query})", parameters) | ||
logger.info("%r would have inserted %r records.", self, count) | ||
else: | ||
# XXX: Nothing useful to report here, unfortunately... all that is | ||
# returned is an empty result set. | ||
sync_execute( | ||
f""" | ||
INSERT INTO person_distinct_id_overrides | ||
(team_id, distinct_id, person_id, is_deleted, version) | ||
{query} | ||
""", | ||
parameters, | ||
) | ||
|
||
|
||
class Command(BaseCommand): | ||
help = "Backfill person_distinct_id_overrides records." | ||
|
||
def add_arguments(self, parser): | ||
parser.add_argument("--team-id", required=True, type=int, help="team to backfill for") | ||
parser.add_argument( | ||
"--live-run", action="store_true", help="actually execute INSERT queries (default is dry-run)" | ||
) | ||
|
||
def handle(self, *, live_run: bool, team_id: int, **options): | ||
logger.setLevel(logging.INFO) | ||
|
||
if not Team.objects.filter(id=team_id).exists(): | ||
raise CommandError(f"Team with id={team_id!r} does not exist") | ||
|
||
BackfillQuery(team_id).execute(dry_run=not live_run) |
56 changes: 56 additions & 0 deletions
56
posthog/management/commands/test/test_backfill_distinct_id_overrides.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import operator | ||
import uuid | ||
|
||
from posthog.clickhouse.client.execute import sync_execute | ||
from posthog.management.commands.backfill_distinct_id_overrides import BackfillQuery | ||
from posthog.test.base import BaseTest, ClickhouseTestMixin | ||
|
||
|
||
class ExecuteBackfillTestCase(ClickhouseTestMixin, BaseTest): | ||
def __run_test_backfill(self, dry_run: bool) -> None: | ||
distinct_id = "override-me" | ||
|
||
rows_for_distinct_id = [ | ||
{"team_id": self.team.id, "distinct_id": distinct_id, "person_id": uuid.uuid4(), "version": version} | ||
for version in range(3) | ||
] | ||
|
||
# never were overridden (version = 0), so no overrides should be created | ||
rows_to_ignore = [ | ||
{"team_id": self.team.id, "distinct_id": f"ignore-me/{i}", "person_id": uuid.uuid4(), "version": 0} | ||
for i in range(5) | ||
] | ||
|
||
sync_execute( | ||
"INSERT INTO person_distinct_id2 (team_id, distinct_id, person_id, version) VALUES", | ||
[*rows_for_distinct_id, *rows_to_ignore], | ||
) | ||
|
||
# nothing should be in the override table yet | ||
assert sync_execute( | ||
"SELECT count() FROM person_distinct_id_overrides WHERE team_id = %(team_id)s", | ||
{"team_id": self.team.id}, | ||
) == [(0,)] | ||
|
||
BackfillQuery(self.team.id).execute(dry_run=dry_run) | ||
|
||
read_columns = ["team_id", "distinct_id", "person_id", "version"] | ||
distinct_id_override_rows = sync_execute( | ||
f""" | ||
SELECT {', '.join(read_columns)} | ||
FROM person_distinct_id_overrides | ||
WHERE team_id = %(team_id)s | ||
""", | ||
{"team_id": self.team.id}, | ||
) | ||
|
||
if not dry_run: | ||
assert distinct_id_override_rows == [operator.itemgetter(*read_columns)(rows_for_distinct_id[-1])] | ||
else: | ||
assert distinct_id_override_rows == [] | ||
|
||
def test_execute_backfill(self) -> None: | ||
self.__run_test_backfill(dry_run=False) | ||
|
||
def test_execute_backfill_dry_run(self) -> None: | ||
self.__run_test_backfill(dry_run=True) |