diff --git a/posthog/management/commands/fix_person_distinct_ids_after_delete.py b/posthog/management/commands/fix_person_distinct_ids_after_delete.py index 4f0853dd001ba..ea88aef46cda0 100644 --- a/posthog/management/commands/fix_person_distinct_ids_after_delete.py +++ b/posthog/management/commands/fix_person_distinct_ids_after_delete.py @@ -19,7 +19,10 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument("--team-id", default=None, type=int, help="Specify a team to fix data for.") - parser.add_argument("--new-version", default=2500, type=int, help="New version value to use") + parser.add_argument("--distinct-id", default=None, type=str, help="Specify a distinct ID to fix.") + parser.add_argument( + "--all-distinct-ids", action="store_true", help="Whether to fix *all* distinct IDs for the team." + ) parser.add_argument("--live-run", action="store_true", help="Run changes, default is dry-run") def handle(self, *args, **options): @@ -34,28 +37,39 @@ def run(options, sync: bool = False): exit(1) team_id = options["team_id"] - version = options["new_version"] - distinct_ids = get_distinct_ids_tied_to_deleted_persons(team_id) + distinct_id = options.get("distinct_id") + all_distinct_ids = options.get("all_distinct_ids", False) + if (not distinct_id and not all_distinct_ids) or (distinct_id and all_distinct_ids): + logger.error("You must specify one of --distinct-id or --all-distinct-ids to run this script") + exit(1) + + if all_distinct_ids: + distinct_ids_and_versions = get_distinct_ids_and_versions_tied_to_deleted_persons(team_id) + else: + existing_version = get_version_for_distinct_id(team_id, distinct_id) + distinct_ids_and_versions = [(distinct_id, existing_version)] - for distinct_id in distinct_ids: + for distinct_id, existing_version in distinct_ids_and_versions: + new_version = existing_version + 100 # this can throw but this script can safely be re-run as # updated distinct_ids won't show up in the search anymore # since they no longer belong to deleted persons # it's safer to throw and exit if anything went wrong - update_distinct_id(distinct_id, version, team_id, live_run, sync) + update_distinct_id(distinct_id, new_version, team_id, live_run, sync) - logger.info("Waiting on Kafka producer flush, for up to 5 minutes") - KafkaProducer().flush(5 * 60) - logger.info("Kafka producer queue flushed.") + if live_run: + logger.info("Waiting on Kafka producer flush, for up to 5 minutes") + KafkaProducer().flush(5 * 60) + logger.info("Kafka producer queue flushed.") -def get_distinct_ids_tied_to_deleted_persons(team_id: int) -> list[str]: +def get_distinct_ids_and_versions_tied_to_deleted_persons(team_id: int) -> list[tuple[str, int]]: # find distinct_ids where the person is set to be deleted rows = sync_execute( """ - SELECT distinct_id FROM ( - SELECT distinct_id, argMax(person_id, version) AS person_id FROM person_distinct_id2 WHERE team_id = %(team)s GROUP BY distinct_id + SELECT distinct_id, version FROM ( + SELECT distinct_id, argMax(person_id, version) AS person_id, max(version) as version FROM person_distinct_id2 WHERE team_id = %(team)s GROUP BY distinct_id ) AS pdi2 WHERE pdi2.person_id NOT IN (SELECT id FROM person WHERE team_id = %(team)s) OR @@ -65,7 +79,23 @@ def get_distinct_ids_tied_to_deleted_persons(team_id: int) -> list[str]: "team": team_id, }, ) - return [row[0] for row in rows] + return [(row[0], row[1]) for row in rows] + + +def get_version_for_distinct_id(team_id: int, distinct_id: str) -> int: + rows = sync_execute( + """ + SELECT max(version) as version FROM person_distinct_id2 WHERE team_id = %(team)s AND distinct_id = %(distinct_id)s + """, + { + "team": team_id, + "distinct_id": distinct_id, + }, + ) + assert ( + len(rows) == 1 + ), f"Expected to find exactly one row in person_distinct_id2 for team_id:{team_id}, distinct_id:{distinct_id}, got {len(rows)}" + return rows[0][0] def update_distinct_id(distinct_id: str, version: int, team_id: int, live_run: bool, sync: bool):