From 5a61365b9feaebd71e78c8cba433338eeb5026fc Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 11:11:28 -0800 Subject: [PATCH 01/16] Start to split out Postgres override handling into its own abstraction. --- .../batch_exports/squash_person_overrides.py | 190 +++++++++--------- 1 file changed, 99 insertions(+), 91 deletions(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 943dd91192e21..7bffeef6514ac 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -87,35 +87,6 @@ AND created_at >= 0; """ -SELECT_ID_FROM_OVERRIDE_UUID = """ -SELECT - id -FROM - posthog_personoverridemapping -WHERE - team_id = %(team_id)s - AND uuid = %(uuid)s; -""" - -DELETE_FROM_PERSON_OVERRIDES = """ -DELETE FROM - posthog_personoverride -WHERE - team_id = %(team_id)s - AND old_person_id = %(old_person_id)s - AND override_person_id = %(override_person_id)s - AND version = %(latest_version)s -RETURNING - old_person_id; -""" - -DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ -DELETE FROM - posthog_personoverridemapping -WHERE - id = %(id)s; -""" - class PersonOverrideToDelete(NamedTuple): """A person override that should be deleted after squashing. @@ -436,6 +407,101 @@ async def delete_squashed_person_overrides_from_clickhouse(inputs: QueryInputs) sync_execute(query.format(database=settings.CLICKHOUSE_DATABASE), parameters) +class PostgresPersonOverridesManager: + def __init__(self, connection, dry_run: bool): + self.connection = connection + self.dry_run = dry_run + + SELECT_ID_FROM_OVERRIDE_UUID = """ + SELECT + id + FROM + posthog_personoverridemapping + WHERE + team_id = %(team_id)s + AND uuid = %(uuid)s; + """ + + DELETE_FROM_PERSON_OVERRIDES = """ + DELETE FROM + posthog_personoverride + WHERE + team_id = %(team_id)s + AND old_person_id = %(old_person_id)s + AND override_person_id = %(override_person_id)s + AND version = %(latest_version)s + RETURNING + old_person_id; + """ + + DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ + DELETE FROM + posthog_personoverridemapping + WHERE + id = %(id)s; + """ + + def delete(self, person_override: SerializablePersonOverrideToDelete) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + self.SELECT_ID_FROM_OVERRIDE_UUID, + { + "team_id": person_override.team_id, + "uuid": person_override.old_person_id, + }, + ) + row = cursor.fetchone() + if not row: + return + + old_person_id = row[0] + + cursor.execute( + self.SELECT_ID_FROM_OVERRIDE_UUID, + { + "team_id": person_override.team_id, + "uuid": person_override.override_person_id, + }, + ) + row = cursor.fetchone() + if not row: + return + + override_person_id = row[0] + + parameters = { + "team_id": person_override.team_id, + "old_person_id": old_person_id, + "override_person_id": override_person_id, + "latest_version": person_override.latest_version, + } + + if self.dry_run is True: + activity.logger.info("This is a DRY RUN so nothing will be deleted.") + activity.logger.info( + "Would have run query: %s with parameters %s", + self.DELETE_FROM_PERSON_OVERRIDES, + parameters, + ) + return + + cursor.execute(self.DELETE_FROM_PERSON_OVERRIDES, parameters) + row = cursor.fetchone() + if not row: + # There is no existing mapping for this (old_person_id, override_person_id) pair. + # It could be that a newer one was added (with a later version). + return + + deleted_id = row[0] + + cursor.execute( + self.DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, + { + "id": deleted_id, + }, + ) + + @activity.defn async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> None: """Execute the query to delete from Postgres persons that have been squashed. @@ -454,68 +520,10 @@ async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> port=settings.DATABASES["default"]["PORT"], **settings.DATABASES["default"].get("SSL_OPTIONS", {}), ) as connection: - with connection.cursor() as cursor: - for person_override_to_delete in inputs.iter_person_overides_to_delete(): - activity.logger.debug("%s", person_override_to_delete) - - cursor.execute( - SELECT_ID_FROM_OVERRIDE_UUID, - { - "team_id": person_override_to_delete.team_id, - "uuid": person_override_to_delete.old_person_id, - }, - ) - - row = cursor.fetchone() - if not row: - continue - old_person_id = row[0] - - cursor.execute( - SELECT_ID_FROM_OVERRIDE_UUID, - { - "team_id": person_override_to_delete.team_id, - "uuid": person_override_to_delete.override_person_id, - }, - ) - - row = cursor.fetchone() - if not row: - continue - - override_person_id = row[0] - - parameters = { - "team_id": person_override_to_delete.team_id, - "old_person_id": old_person_id, - "override_person_id": override_person_id, - "latest_version": person_override_to_delete.latest_version, - } - - if inputs.dry_run is True: - activity.logger.info("This is a DRY RUN so nothing will be deleted.") - activity.logger.info( - "Would have run query: %s with parameters %s", - DELETE_FROM_PERSON_OVERRIDES, - parameters, - ) - continue - - cursor.execute(DELETE_FROM_PERSON_OVERRIDES, parameters) - - row = cursor.fetchone() - if not row: - # There is no existing mapping for this (old_person_id, override_person_id) pair. - # It could be that a newer one was added (with a later version). - continue - deleted_id = row[0] - - cursor.execute( - DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, - { - "id": deleted_id, - }, - ) + person_overrides_manager = PostgresPersonOverridesManager(connection, inputs.dry_run) + for person_override_to_delete in inputs.iter_person_overides_to_delete(): + activity.logger.debug("%s", person_override_to_delete) + person_overrides_manager.delete(person_override_to_delete) @contextlib.asynccontextmanager From 77e347361b88cad4115506330a193b29581beb56 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 11:36:42 -0800 Subject: [PATCH 02/16] Start to modify test fixtures. --- .../batch_exports/squash_person_overrides.py | 67 +++++++++++++++++-- .../test_squash_person_overrides_workflow.py | 59 ++-------------- 2 files changed, 68 insertions(+), 58 deletions(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 7bffeef6514ac..b5b2a52bf0e3b 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -408,9 +408,8 @@ async def delete_squashed_person_overrides_from_clickhouse(inputs: QueryInputs) class PostgresPersonOverridesManager: - def __init__(self, connection, dry_run: bool): + def __init__(self, connection): self.connection = connection - self.dry_run = dry_run SELECT_ID_FROM_OVERRIDE_UUID = """ SELECT @@ -441,7 +440,52 @@ def __init__(self, connection, dry_run: bool): id = %(id)s; """ - def delete(self, person_override: SerializablePersonOverrideToDelete) -> None: + def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: + with self.connection.cursor() as cursor: + person_ids = [] + for person_uuid in (override_person_id, old_person_id): + cursor.execute( + """ + INSERT INTO posthog_personoverridemapping( + team_id, + uuid + ) + VALUES ( + %(team_id)s, + %(uuid)s + ) + ON CONFLICT("team_id", "uuid") DO NOTHING + RETURNING id + """, + {"team_id": team_id, "uuid": person_uuid}, + ) + person_ids.append(cursor.fetchone()) + + cursor.execute( + """ + INSERT INTO posthog_personoverride( + team_id, + old_person_id, + override_person_id, + oldest_event, + version + ) + VALUES ( + %(team_id)s, + %(old_person_id)s, + %(override_person_id)s, + NOW(), + 1 + ); + """, + { + "team_id": team_id, + "old_person_id": person_ids[1], + "override_person_id": person_ids[0], + }, + ) + + def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: with self.connection.cursor() as cursor: cursor.execute( self.SELECT_ID_FROM_OVERRIDE_UUID, @@ -476,7 +520,7 @@ def delete(self, person_override: SerializablePersonOverrideToDelete) -> None: "latest_version": person_override.latest_version, } - if self.dry_run is True: + if dry_run is True: activity.logger.info("This is a DRY RUN so nothing will be deleted.") activity.logger.info( "Would have run query: %s with parameters %s", @@ -501,6 +545,17 @@ def delete(self, person_override: SerializablePersonOverrideToDelete) -> None: }, ) + def clear(self, team_id: int) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + "DELETE FROM posthog_personoverride WHERE team_id = %s", + [team_id], + ) + cursor.execute( + "DELETE FROM posthog_personoverridemapping WHERE team_id = %s", + [team_id], + ) + @activity.defn async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> None: @@ -520,10 +575,10 @@ async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> port=settings.DATABASES["default"]["PORT"], **settings.DATABASES["default"].get("SSL_OPTIONS", {}), ) as connection: - person_overrides_manager = PostgresPersonOverridesManager(connection, inputs.dry_run) + person_overrides_manager = PostgresPersonOverridesManager(connection) for person_override_to_delete in inputs.iter_person_overides_to_delete(): activity.logger.debug("%s", person_override_to_delete) - person_overrides_manager.delete(person_override_to_delete) + person_overrides_manager.delete(person_override_to_delete, inputs.dry_run) @contextlib.asynccontextmanager diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index fa72cb585b6b5..e518cb48b8fcb 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -23,6 +23,7 @@ PERSON_OVERRIDES_CREATE_TABLE_SQL, ) from posthog.temporal.batch_exports.squash_person_overrides import ( + PostgresPersonOverridesManager, QueryInputs, SerializablePersonOverrideToDelete, SquashPersonOverridesInputs, @@ -950,62 +951,16 @@ def person_overrides(query_inputs, team_id, pg_connection): person_override = PersonOverrideTuple(old_person_id, override_person_id) with pg_connection: - with pg_connection.cursor() as cursor: - person_ids = [] - for person_uuid in (override_person_id, old_person_id): - cursor.execute( - """ - INSERT INTO posthog_personoverridemapping( - team_id, - uuid - ) - VALUES ( - %(team_id)s, - %(uuid)s - ) - ON CONFLICT("team_id", "uuid") DO NOTHING - RETURNING id - """, - {"team_id": team_id, "uuid": person_uuid}, - ) - person_ids.append(cursor.fetchone()) - - cursor.execute( - """ - INSERT INTO posthog_personoverride( - team_id, - old_person_id, - override_person_id, - oldest_event, - version - ) - VALUES ( - %(team_id)s, - %(old_person_id)s, - %(override_person_id)s, - NOW(), - 1 - ); - """, - { - "team_id": team_id, - "old_person_id": person_ids[1], - "override_person_id": person_ids[0], - }, - ) + PostgresPersonOverridesManager(pg_connection).insert( + team_id, + old_person_id=person_override.old_person_id, + override_person_id=person_override.override_person_id, + ) yield person_override with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute( - "DELETE FROM posthog_personoverride WHERE team_id = %s AND old_person_id = %s", - [team_id, person_ids[1]], - ) - cursor.execute( - "DELETE FROM posthog_personoverridemapping WHERE team_id = %s AND (uuid = %s OR uuid = %s)", - [team_id, old_person_id, override_person_id], - ) + PostgresPersonOverridesManager(pg_connection).clear(team_id) @pytest.mark.django_db From 5273918b64ad0f12a52e9ee95e017c030040ed51 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 12:15:02 -0800 Subject: [PATCH 03/16] Unpack team ID fixture row into scalar value. --- posthog/temporal/tests/test_squash_person_overrides_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index e518cb48b8fcb..147401da729b1 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -929,7 +929,7 @@ def team_id(query_inputs, organization_uuid, pg_connection): """, {"organization_uuid": organization_uuid}, ) - team_id = cursor.fetchone() + [team_id] = cursor.fetchone() yield team_id From 3d30396f6c5f20562fa33dc01c42e429139d369d Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 12:18:22 -0800 Subject: [PATCH 04/16] Generalize tests where applicable to handle both modes. --- .../batch_exports/squash_person_overrides.py | 21 +++++++++ .../test_squash_person_overrides_workflow.py | 44 +++++-------------- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index b5b2a52bf0e3b..ab4900f0587a3 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -440,6 +440,27 @@ def __init__(self, connection): id = %(id)s; """ + def fetchall(self, team_id: int): + with self.connection.cursor() as cursor: + cursor.execute( + """ + SELECT + override.team_id, + old_person.uuid, + override_person.uuid + FROM posthog_personoverride override + LEFT OUTER JOIN + posthog_personoverridemapping old_person + ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id + LEFT OUTER JOIN + posthog_personoverridemapping override_person + ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id + WHERE override.team_id = %(team_id)s + """, + {"team_id": team_id}, + ) + return cursor.fetchall() + def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: with self.connection.cursor() as cursor: person_ids = [] diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index 147401da729b1..703761b631a2b 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -976,14 +976,9 @@ async def test_delete_squashed_person_overrides_from_postgres( # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 2 - - cursor.execute("SELECT * FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 1 + assert PostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [ + (team_id, person_overrides.old_person_id, person_overrides.override_person_id) + ] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( @@ -1001,15 +996,7 @@ async def test_delete_squashed_person_overrides_from_postgres( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 1 - assert mappings[0][1] == person_overrides.override_person_id - - cursor.execute("SELECT * FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 0 + assert PostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [] @pytest.mark.django_db @@ -1021,14 +1008,9 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 2 - - cursor.execute("SELECT * FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 1 + assert PostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [ + (team_id, person_overrides.old_person_id, person_overrides.override_person_id) + ] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( @@ -1046,15 +1028,9 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 2 - assert mappings[0][1] == person_overrides.override_person_id - - cursor.execute("SELECT * FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 1 + assert PostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [ + (team_id, person_overrides.old_person_id, person_overrides.override_person_id) + ] @pytest.mark.django_db From 842d53af30c4322117cbaeba72c043243aa1cdbf Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 12:28:36 -0800 Subject: [PATCH 05/16] Inline queries in applicable functions --- .../batch_exports/squash_person_overrides.py | 70 +++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index ab4900f0587a3..58a1829f9ee3c 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -411,35 +411,6 @@ class PostgresPersonOverridesManager: def __init__(self, connection): self.connection = connection - SELECT_ID_FROM_OVERRIDE_UUID = """ - SELECT - id - FROM - posthog_personoverridemapping - WHERE - team_id = %(team_id)s - AND uuid = %(uuid)s; - """ - - DELETE_FROM_PERSON_OVERRIDES = """ - DELETE FROM - posthog_personoverride - WHERE - team_id = %(team_id)s - AND old_person_id = %(old_person_id)s - AND override_person_id = %(override_person_id)s - AND version = %(latest_version)s - RETURNING - old_person_id; - """ - - DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ - DELETE FROM - posthog_personoverridemapping - WHERE - id = %(id)s; - """ - def fetchall(self, team_id: int): with self.connection.cursor() as cursor: cursor.execute( @@ -508,8 +479,18 @@ def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: with self.connection.cursor() as cursor: + SELECT_ID_FROM_OVERRIDE_UUID = """ + SELECT + id + FROM + posthog_personoverridemapping + WHERE + team_id = %(team_id)s + AND uuid = %(uuid)s; + """ + cursor.execute( - self.SELECT_ID_FROM_OVERRIDE_UUID, + SELECT_ID_FROM_OVERRIDE_UUID, { "team_id": person_override.team_id, "uuid": person_override.old_person_id, @@ -522,7 +503,7 @@ def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: b old_person_id = row[0] cursor.execute( - self.SELECT_ID_FROM_OVERRIDE_UUID, + SELECT_ID_FROM_OVERRIDE_UUID, { "team_id": person_override.team_id, "uuid": person_override.override_person_id, @@ -534,6 +515,18 @@ def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: b override_person_id = row[0] + DELETE_FROM_PERSON_OVERRIDES = """ + DELETE FROM + posthog_personoverride + WHERE + team_id = %(team_id)s + AND old_person_id = %(old_person_id)s + AND override_person_id = %(override_person_id)s + AND version = %(latest_version)s + RETURNING + old_person_id; + """ + parameters = { "team_id": person_override.team_id, "old_person_id": old_person_id, @@ -545,12 +538,12 @@ def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: b activity.logger.info("This is a DRY RUN so nothing will be deleted.") activity.logger.info( "Would have run query: %s with parameters %s", - self.DELETE_FROM_PERSON_OVERRIDES, + DELETE_FROM_PERSON_OVERRIDES, parameters, ) return - cursor.execute(self.DELETE_FROM_PERSON_OVERRIDES, parameters) + cursor.execute(DELETE_FROM_PERSON_OVERRIDES, parameters) row = cursor.fetchone() if not row: # There is no existing mapping for this (old_person_id, override_person_id) pair. @@ -559,10 +552,17 @@ def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: b deleted_id = row[0] + DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ + DELETE FROM + posthog_personoverridemapping + WHERE + id = %(deleted_id)s; + """ + cursor.execute( - self.DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, + DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, { - "id": deleted_id, + "deleted_id": deleted_id, }, ) From 8af86032387d8ac4b46a80e676cb00aa0c18537e Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 12:34:02 -0800 Subject: [PATCH 06/16] Add `FlatPostgresPersonOverridesManager` --- .../batch_exports/squash_person_overrides.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 58a1829f9ee3c..18c72ef0fd738 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -578,6 +578,89 @@ def clear(self, team_id: int) -> None: ) +class FlatPostgresPersonOverridesManager: + def __init__(self, connection): + self.connection = connection + + def fetchall(self, team_id: int): + with self.connection.cursor() as cursor: + cursor.execute( + """ + SELECT + team_id, + old_person_id, + override_person_id + FROM posthog_flatpersonoverride + WHERE team_id = %(team_id)s + """, + {"team_id": team_id}, + ) + return cursor.fetchall() + + def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + """ + INSERT INTO posthog_flatpersonoverride( + team_id, + old_person_id, + override_person_id, + oldest_event, + version + ) + VALUES ( + %(team_id)s, + %(old_person_id)s, + %(override_person_id)s, + NOW(), + 1 + ); + """, + { + "team_id": team_id, + "old_person_id": old_person_id, + "override_person_id": override_person_id, + }, + ) + + def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: + query = """ + DELETE FROM + posthog_flatpersonoverride + WHERE + team_id = %(team_id)s + AND old_person_id = %(old_person_id)s + AND override_person_id = %(override_person_id)s + AND version = %(latest_version)s + """ + + parameters = { + "team_id": person_override.team_id, + "old_person_id": person_override.old_person_id, + "override_person_id": person_override.override_person_id, + "latest_version": person_override.latest_version, + } + + if dry_run is True: + activity.logger.info("This is a DRY RUN so nothing will be deleted.") + activity.logger.info( + "Would have run query: %s with parameters %s", + query, + parameters, + ) + return + + with self.connection.cursor() as cursor: + cursor.execute(query, parameters) + + def clear(self, team_id: int) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + "DELETE FROM posthog_flatpersonoverride WHERE team_id = %s", + [team_id], + ) + + @activity.defn async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> None: """Execute the query to delete from Postgres persons that have been squashed. From 78f842677960d962cc7eae07a415b35e93ace154 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 12:42:33 -0800 Subject: [PATCH 07/16] A working yet very questionable integration into the workflow and tests --- .../batch_exports/squash_person_overrides.py | 14 +++++++- .../test_squash_person_overrides_workflow.py | 33 ++++++++++++++----- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 18c72ef0fd738..179011d31e9d3 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -154,6 +154,7 @@ class QueryInputs: person_overrides_to_delete: list[SerializablePersonOverrideToDelete] = field(default_factory=list) dictionary_name: str = "person_overrides_join_dict" dry_run: bool = True + overrides_manager: str = "mappings" # XXX _latest_created_at: str | datetime | None = None def __post_init__(self) -> None: @@ -183,6 +184,9 @@ def iter_person_overides_to_delete(self) -> Iterable[SerializablePersonOverrideT for person_override_to_delete in self.person_overrides_to_delete: yield SerializablePersonOverrideToDelete(*person_override_to_delete) + def get_overrides_manager(self, connection): + return PERSON_OVERRIDES_MANAGERS[self.overrides_manager](connection) + @activity.defn async def prepare_person_overrides(inputs: QueryInputs) -> None: @@ -661,6 +665,12 @@ def clear(self, team_id: int) -> None: ) +PERSON_OVERRIDES_MANAGERS = { + "mappings": PostgresPersonOverridesManager, + "flat": FlatPostgresPersonOverridesManager, +} + + @activity.defn async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> None: """Execute the query to delete from Postgres persons that have been squashed. @@ -679,7 +689,7 @@ async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> port=settings.DATABASES["default"]["PORT"], **settings.DATABASES["default"].get("SSL_OPTIONS", {}), ) as connection: - person_overrides_manager = PostgresPersonOverridesManager(connection) + person_overrides_manager = inputs.get_overrides_manager(connection) for person_override_to_delete in inputs.iter_person_overides_to_delete(): activity.logger.debug("%s", person_override_to_delete) person_overrides_manager.delete(person_override_to_delete, inputs.dry_run) @@ -746,6 +756,7 @@ class SquashPersonOverridesInputs: dictionary_name: str = "person_overrides_join_dict" last_n_months: int = 1 dry_run: bool = True + overrides_manager: str = "mappings" def iter_partition_ids(self) -> Iterator[str]: """Iterate over configured partition ids. @@ -865,6 +876,7 @@ async def run(self, inputs: SquashPersonOverridesInputs): dictionary_name=inputs.dictionary_name, team_ids=inputs.team_ids, dry_run=inputs.dry_run, + overrides_manager=inputs.overrides_manager, ) async with person_overrides_dictionary( diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index 703761b631a2b..f557c618d0a2f 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -23,6 +23,7 @@ PERSON_OVERRIDES_CREATE_TABLE_SQL, ) from posthog.temporal.batch_exports.squash_person_overrides import ( + PERSON_OVERRIDES_MANAGERS, PostgresPersonOverridesManager, QueryInputs, SerializablePersonOverrideToDelete, @@ -938,20 +939,27 @@ def team_id(query_inputs, organization_uuid, pg_connection): cursor.execute("DELETE FROM posthog_team WHERE id = %s", [team_id]) +@pytest.fixture(params=PERSON_OVERRIDES_MANAGERS.keys()) +def overrides_manager(request): + yield request.param + + @pytest.fixture -def person_overrides(query_inputs, team_id, pg_connection): +def person_overrides(query_inputs, team_id, pg_connection, overrides_manager): """Create a PersonOverrideMapping and a PersonOverride. We cannot use the Django ORM safely in an async context, so we INSERT INTO directly on the database. This means we need to clean up after ourselves, which we do after yielding. """ + query_inputs.overrides_manager = overrides_manager # XXX this is truly awful + old_person_id = uuid4() override_person_id = uuid4() person_override = PersonOverrideTuple(old_person_id, override_person_id) with pg_connection: - PostgresPersonOverridesManager(pg_connection).insert( + query_inputs.get_overrides_manager(pg_connection).insert( team_id, old_person_id=person_override.old_person_id, override_person_id=person_override.override_person_id, @@ -960,7 +968,7 @@ def person_overrides(query_inputs, team_id, pg_connection): yield person_override with pg_connection: - PostgresPersonOverridesManager(pg_connection).clear(team_id) + query_inputs.get_overrides_manager(pg_connection).clear(team_id) @pytest.mark.django_db @@ -976,7 +984,7 @@ async def test_delete_squashed_person_overrides_from_postgres( # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - assert PostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [ + assert query_inputs.get_overrides_manager(pg_connection).fetchall(team_id) == [ (team_id, person_overrides.old_person_id, person_overrides.override_person_id) ] @@ -996,7 +1004,7 @@ async def test_delete_squashed_person_overrides_from_postgres( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - assert PostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [] + assert query_inputs.get_overrides_manager(pg_connection).fetchall(team_id) == [] @pytest.mark.django_db @@ -1008,7 +1016,7 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - assert PostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [ + assert query_inputs.get_overrides_manager(pg_connection).fetchall(team_id) == [ (team_id, person_overrides.old_person_id, person_overrides.override_person_id) ] @@ -1028,7 +1036,7 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - assert PostgresPersonOverridesManager(pg_connection).fetchall(team_id) == [ + assert query_inputs.get_overrides_manager(pg_connection).fetchall(team_id) == [ (team_id, person_overrides.old_person_id, person_overrides.override_person_id) ] @@ -1043,9 +1051,13 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ + # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: + if not isinstance(query_inputs.get_overrides_manager(pg_connection), PostgresPersonOverridesManager): + pytest.xfail("overrides manager not supported") + with pg_connection.cursor() as cursor: cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") mappings = cursor.fetchall() @@ -1127,6 +1139,7 @@ async def test_squash_person_overrides_workflow( person_overrides_data, person_overrides, person_overrides_table, + overrides_manager, ): """Test the squash_person_overrides workflow end-to-end.""" client = await Client.connect( @@ -1138,6 +1151,7 @@ async def test_squash_person_overrides_workflow( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, + overrides_manager=overrides_manager, ) async with Worker( @@ -1176,6 +1190,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( person_overrides_data, person_overrides, newer_overrides, + overrides_manager, ): """Test the squash_person_overrides workflow end-to-end with newer overrides.""" client = await Client.connect( @@ -1187,6 +1202,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, + overrides_manager=overrides_manager, ) async with Worker( @@ -1217,7 +1233,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( @pytest.mark.django_db @pytest.mark.asyncio async def test_squash_person_overrides_workflow_with_limited_team_ids( - query_inputs, events_to_override, person_overrides_data, person_overrides + query_inputs, events_to_override, person_overrides_data, person_overrides, overrides_manager ): """Test the squash_person_overrides workflow end-to-end.""" client = await Client.connect( @@ -1230,6 +1246,7 @@ async def test_squash_person_overrides_workflow_with_limited_team_ids( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], team_ids=[random_team], + overrides_manager=overrides_manager, dry_run=False, ) From 0d5634a9540f9d2ff45366d0fa28117369608e87 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 17:14:23 -0800 Subject: [PATCH 08/16] Clean up some names and organization. --- .../batch_exports/squash_person_overrides.py | 749 +++++++++--------- .../test_squash_person_overrides_workflow.py | 27 +- 2 files changed, 390 insertions(+), 386 deletions(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 179011d31e9d3..3c5fe2313ab67 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -132,6 +132,269 @@ class SerializablePersonOverrideToDelete(NamedTuple): oldest_event_at: str +class PostgresPersonOverridesManager: + def __init__(self, connection): + self.connection = connection + + def fetchall(self, team_id: int): + with self.connection.cursor() as cursor: + cursor.execute( + """ + SELECT + override.team_id, + old_person.uuid, + override_person.uuid + FROM posthog_personoverride override + LEFT OUTER JOIN + posthog_personoverridemapping old_person + ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id + LEFT OUTER JOIN + posthog_personoverridemapping override_person + ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id + WHERE override.team_id = %(team_id)s + """, + {"team_id": team_id}, + ) + return cursor.fetchall() + + def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: + with self.connection.cursor() as cursor: + person_ids = [] + for person_uuid in (override_person_id, old_person_id): + cursor.execute( + """ + INSERT INTO posthog_personoverridemapping( + team_id, + uuid + ) + VALUES ( + %(team_id)s, + %(uuid)s + ) + ON CONFLICT("team_id", "uuid") DO NOTHING + RETURNING id + """, + {"team_id": team_id, "uuid": person_uuid}, + ) + person_ids.append(cursor.fetchone()) + + cursor.execute( + """ + INSERT INTO posthog_personoverride( + team_id, + old_person_id, + override_person_id, + oldest_event, + version + ) + VALUES ( + %(team_id)s, + %(old_person_id)s, + %(override_person_id)s, + NOW(), + 1 + ); + """, + { + "team_id": team_id, + "old_person_id": person_ids[1], + "override_person_id": person_ids[0], + }, + ) + + def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: + with self.connection.cursor() as cursor: + SELECT_ID_FROM_OVERRIDE_UUID = """ + SELECT + id + FROM + posthog_personoverridemapping + WHERE + team_id = %(team_id)s + AND uuid = %(uuid)s; + """ + + cursor.execute( + SELECT_ID_FROM_OVERRIDE_UUID, + { + "team_id": person_override.team_id, + "uuid": person_override.old_person_id, + }, + ) + row = cursor.fetchone() + if not row: + return + + old_person_id = row[0] + + cursor.execute( + SELECT_ID_FROM_OVERRIDE_UUID, + { + "team_id": person_override.team_id, + "uuid": person_override.override_person_id, + }, + ) + row = cursor.fetchone() + if not row: + return + + override_person_id = row[0] + + DELETE_FROM_PERSON_OVERRIDES = """ + DELETE FROM + posthog_personoverride + WHERE + team_id = %(team_id)s + AND old_person_id = %(old_person_id)s + AND override_person_id = %(override_person_id)s + AND version = %(latest_version)s + RETURNING + old_person_id; + """ + + parameters = { + "team_id": person_override.team_id, + "old_person_id": old_person_id, + "override_person_id": override_person_id, + "latest_version": person_override.latest_version, + } + + if dry_run is True: + activity.logger.info("This is a DRY RUN so nothing will be deleted.") + activity.logger.info( + "Would have run query: %s with parameters %s", + DELETE_FROM_PERSON_OVERRIDES, + parameters, + ) + return + + cursor.execute(DELETE_FROM_PERSON_OVERRIDES, parameters) + row = cursor.fetchone() + if not row: + # There is no existing mapping for this (old_person_id, override_person_id) pair. + # It could be that a newer one was added (with a later version). + return + + deleted_id = row[0] + + DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ + DELETE FROM + posthog_personoverridemapping + WHERE + id = %(deleted_id)s; + """ + + cursor.execute( + DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, + { + "deleted_id": deleted_id, + }, + ) + + def clear(self, team_id: int) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + "DELETE FROM posthog_personoverride WHERE team_id = %s", + [team_id], + ) + cursor.execute( + "DELETE FROM posthog_personoverridemapping WHERE team_id = %s", + [team_id], + ) + + +class FlatPostgresPersonOverridesManager: + def __init__(self, connection): + self.connection = connection + + def fetchall(self, team_id: int): + with self.connection.cursor() as cursor: + cursor.execute( + """ + SELECT + team_id, + old_person_id, + override_person_id + FROM posthog_flatpersonoverride + WHERE team_id = %(team_id)s + """, + {"team_id": team_id}, + ) + return cursor.fetchall() + + def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + """ + INSERT INTO posthog_flatpersonoverride( + team_id, + old_person_id, + override_person_id, + oldest_event, + version + ) + VALUES ( + %(team_id)s, + %(old_person_id)s, + %(override_person_id)s, + NOW(), + 1 + ); + """, + { + "team_id": team_id, + "old_person_id": old_person_id, + "override_person_id": override_person_id, + }, + ) + + def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: + query = """ + DELETE FROM + posthog_flatpersonoverride + WHERE + team_id = %(team_id)s + AND old_person_id = %(old_person_id)s + AND override_person_id = %(override_person_id)s + AND version = %(latest_version)s + """ + + parameters = { + "team_id": person_override.team_id, + "old_person_id": person_override.old_person_id, + "override_person_id": person_override.override_person_id, + "latest_version": person_override.latest_version, + } + + if dry_run is True: + activity.logger.info("This is a DRY RUN so nothing will be deleted.") + activity.logger.info( + "Would have run query: %s with parameters %s", + query, + parameters, + ) + return + + with self.connection.cursor() as cursor: + cursor.execute(query, parameters) + + def clear(self, team_id: int) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + "DELETE FROM posthog_flatpersonoverride WHERE team_id = %s", + [team_id], + ) + + +POSTGRES_PERSON_OVERRIDES_MANAGERS = { + "mappings": PostgresPersonOverridesManager, + "flat": FlatPostgresPersonOverridesManager, +} + +DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER = "mappings" +assert DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER in POSTGRES_PERSON_OVERRIDES_MANAGERS + + @dataclass class QueryInputs: """Inputs for activities that run queries in the SquashPersonOverrides workflow. @@ -154,8 +417,8 @@ class QueryInputs: person_overrides_to_delete: list[SerializablePersonOverrideToDelete] = field(default_factory=list) dictionary_name: str = "person_overrides_join_dict" dry_run: bool = True - overrides_manager: str = "mappings" # XXX _latest_created_at: str | datetime | None = None + postgres_person_overrides_manager: str = DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER def __post_init__(self) -> None: if isinstance(self._latest_created_at, datetime): @@ -184,8 +447,8 @@ def iter_person_overides_to_delete(self) -> Iterable[SerializablePersonOverrideT for person_override_to_delete in self.person_overrides_to_delete: yield SerializablePersonOverrideToDelete(*person_override_to_delete) - def get_overrides_manager(self, connection): - return PERSON_OVERRIDES_MANAGERS[self.overrides_manager](connection) + def get_postgres_person_overrides_manager(self, connection): + return POSTGRES_PERSON_OVERRIDES_MANAGERS[self.postgres_person_overrides_manager](connection) @activity.defn @@ -277,398 +540,138 @@ async def select_persons_to_delete(inputs: QueryInputs) -> list[SerializablePers {"latest_created_at": latest_created_at}, ) - if not isinstance(to_delete_rows, list): - # Could return None if no results or int if this were an insert. - # Mostly to appease type checker - return [] - - # We need to be absolutely sure which is the oldest event for a given person - # as we cannot delete persons that have events in the past that aren't being - # squashed by this workflow. - persons_to_delete = [] - older_persons_to_delete = [] - for row in to_delete_rows: - person_to_delete = PersonOverrideToDelete._make(row) - person_oldest_event_at = person_to_delete.oldest_event_at - - try: - absolute_oldest_event_at = sync_execute( - SELECT_CREATED_AT_FOR_PERSON_EVENT_QUERY.format(database=settings.CLICKHOUSE_DATABASE), - { - "team_id": person_to_delete.team_id, - "old_person_id": person_to_delete.old_person_id, - "oldest_event_at": person_oldest_event_at, - }, - )[0][0] - - except IndexError: - # Let's be safe and treat this as no rows found. - absolute_oldest_event_at = EPOCH - - # ClickHouse min() likes to return the epoch when no rows found. - # Granted, I'm assuming that we were not ingesting events in 1970... - if absolute_oldest_event_at != EPOCH: - min_oldest_event_at = min( - person_oldest_event_at, - absolute_oldest_event_at, - ) - else: - min_oldest_event_at = person_oldest_event_at - - person_to_delete = person_to_delete._replace(oldest_event_at=min_oldest_event_at) - - if person_to_delete.is_in_partitions(inputs.partition_ids): - persons_to_delete.append(person_to_delete._make_serializable()) - else: - older_persons_to_delete.append(person_to_delete) - - # There could be older overrides that we haven't cleaned up yet. - # As the squash logic will always prefer new ones, there is no reason to keep the - # older ones around if we schedule the old ones to be deleted. - # So, let's delete those too. - persons_to_delete_ids = set(person.old_person_id for person in persons_to_delete) - persons_to_delete.extend( - ( - older_person._make_serializable() - for older_person in older_persons_to_delete - if older_person.old_person_id in persons_to_delete_ids - ) - ) - - return persons_to_delete - - -@activity.defn -async def squash_events_partition(inputs: QueryInputs) -> None: - """Execute the squash query for a given partition_id and persons to_override. - - As ClickHouse doesn't support an UPDATE ... FROM statement ala PostgreSQL, we must - do this in 4 basic steps: - - 1. Stop ingesting data into person_overrides. - 2. Build a DICTIONARY from person_overrides. - 3. Perform ALTER TABLE UPDATE using dictGet to query the DICTIONARY. - 4. Clean up the DICTIONARY once done. - """ - from django.conf import settings - - query = SQUASH_EVENTS_QUERY - - latest_created_at = inputs.latest_created_at.timestamp() if inputs.latest_created_at else inputs.latest_created_at - - for partition_id in inputs.partition_ids: - activity.logger.info("Executing squash query on partition %s", partition_id) - - parameters = { - "partition_id": partition_id, - "team_ids": inputs.team_ids, - # We pass this as a timestamp ourselves as clickhouse-driver will drop any microseconds from the datetime. - # This would cause the latest merge event to be ignored. - # See: https://github.com/mymarilyn/clickhouse-driver/issues/306 - "latest_created_at": latest_created_at, - } - - if inputs.dry_run is True: - activity.logger.info("This is a DRY RUN so nothing will be squashed.") - activity.logger.info("Would have run query: %s with parameters %s", query, parameters) - continue - - sync_execute( - query.format( - database=settings.CLICKHOUSE_DATABASE, - dictionary_name=inputs.dictionary_name, - team_id_filter="AND team_id in %(team_ids)s" if inputs.team_ids else "", - ), - parameters, - ) - - -@activity.defn -async def delete_squashed_person_overrides_from_clickhouse(inputs: QueryInputs) -> None: - """Execute the query to delete persons from ClickHouse that have been squashed.""" - from django.conf import settings - - activity.logger.info("Deleting squashed persons from ClickHouse") - - old_person_ids_to_delete = tuple(person.old_person_id for person in inputs.iter_person_overides_to_delete()) - activity.logger.debug("%s", old_person_ids_to_delete) - - query = DELETE_SQUASHED_PERSON_OVERRIDES_QUERY - latest_created_at = inputs.latest_created_at.timestamp() if inputs.latest_created_at else inputs.latest_created_at - parameters = { - "old_person_ids": old_person_ids_to_delete, - # We pass this as a timestamp ourselves as clickhouse-driver will drop any microseconds from the datetime. - # This would cause the latest merge event to be ignored. - # See: https://github.com/mymarilyn/clickhouse-driver/issues/306 - "latest_created_at": latest_created_at, - } - - if inputs.dry_run is True: - activity.logger.info("This is a DRY RUN so nothing will be deleted.") - activity.logger.info("Would have run query: %s with parameters %s", query, parameters) - return - - sync_execute(query.format(database=settings.CLICKHOUSE_DATABASE), parameters) - - -class PostgresPersonOverridesManager: - def __init__(self, connection): - self.connection = connection - - def fetchall(self, team_id: int): - with self.connection.cursor() as cursor: - cursor.execute( - """ - SELECT - override.team_id, - old_person.uuid, - override_person.uuid - FROM posthog_personoverride override - LEFT OUTER JOIN - posthog_personoverridemapping old_person - ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id - LEFT OUTER JOIN - posthog_personoverridemapping override_person - ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id - WHERE override.team_id = %(team_id)s - """, - {"team_id": team_id}, - ) - return cursor.fetchall() - - def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: - with self.connection.cursor() as cursor: - person_ids = [] - for person_uuid in (override_person_id, old_person_id): - cursor.execute( - """ - INSERT INTO posthog_personoverridemapping( - team_id, - uuid - ) - VALUES ( - %(team_id)s, - %(uuid)s - ) - ON CONFLICT("team_id", "uuid") DO NOTHING - RETURNING id - """, - {"team_id": team_id, "uuid": person_uuid}, - ) - person_ids.append(cursor.fetchone()) - - cursor.execute( - """ - INSERT INTO posthog_personoverride( - team_id, - old_person_id, - override_person_id, - oldest_event, - version - ) - VALUES ( - %(team_id)s, - %(old_person_id)s, - %(override_person_id)s, - NOW(), - 1 - ); - """, - { - "team_id": team_id, - "old_person_id": person_ids[1], - "override_person_id": person_ids[0], - }, - ) - - def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: - with self.connection.cursor() as cursor: - SELECT_ID_FROM_OVERRIDE_UUID = """ - SELECT - id - FROM - posthog_personoverridemapping - WHERE - team_id = %(team_id)s - AND uuid = %(uuid)s; - """ + if not isinstance(to_delete_rows, list): + # Could return None if no results or int if this were an insert. + # Mostly to appease type checker + return [] - cursor.execute( - SELECT_ID_FROM_OVERRIDE_UUID, + # We need to be absolutely sure which is the oldest event for a given person + # as we cannot delete persons that have events in the past that aren't being + # squashed by this workflow. + persons_to_delete = [] + older_persons_to_delete = [] + for row in to_delete_rows: + person_to_delete = PersonOverrideToDelete._make(row) + person_oldest_event_at = person_to_delete.oldest_event_at + + try: + absolute_oldest_event_at = sync_execute( + SELECT_CREATED_AT_FOR_PERSON_EVENT_QUERY.format(database=settings.CLICKHOUSE_DATABASE), { - "team_id": person_override.team_id, - "uuid": person_override.old_person_id, + "team_id": person_to_delete.team_id, + "old_person_id": person_to_delete.old_person_id, + "oldest_event_at": person_oldest_event_at, }, - ) - row = cursor.fetchone() - if not row: - return + )[0][0] - old_person_id = row[0] + except IndexError: + # Let's be safe and treat this as no rows found. + absolute_oldest_event_at = EPOCH - cursor.execute( - SELECT_ID_FROM_OVERRIDE_UUID, - { - "team_id": person_override.team_id, - "uuid": person_override.override_person_id, - }, + # ClickHouse min() likes to return the epoch when no rows found. + # Granted, I'm assuming that we were not ingesting events in 1970... + if absolute_oldest_event_at != EPOCH: + min_oldest_event_at = min( + person_oldest_event_at, + absolute_oldest_event_at, ) - row = cursor.fetchone() - if not row: - return + else: + min_oldest_event_at = person_oldest_event_at - override_person_id = row[0] + person_to_delete = person_to_delete._replace(oldest_event_at=min_oldest_event_at) - DELETE_FROM_PERSON_OVERRIDES = """ - DELETE FROM - posthog_personoverride - WHERE - team_id = %(team_id)s - AND old_person_id = %(old_person_id)s - AND override_person_id = %(override_person_id)s - AND version = %(latest_version)s - RETURNING - old_person_id; - """ + if person_to_delete.is_in_partitions(inputs.partition_ids): + persons_to_delete.append(person_to_delete._make_serializable()) + else: + older_persons_to_delete.append(person_to_delete) - parameters = { - "team_id": person_override.team_id, - "old_person_id": old_person_id, - "override_person_id": override_person_id, - "latest_version": person_override.latest_version, - } + # There could be older overrides that we haven't cleaned up yet. + # As the squash logic will always prefer new ones, there is no reason to keep the + # older ones around if we schedule the old ones to be deleted. + # So, let's delete those too. + persons_to_delete_ids = set(person.old_person_id for person in persons_to_delete) + persons_to_delete.extend( + ( + older_person._make_serializable() + for older_person in older_persons_to_delete + if older_person.old_person_id in persons_to_delete_ids + ) + ) - if dry_run is True: - activity.logger.info("This is a DRY RUN so nothing will be deleted.") - activity.logger.info( - "Would have run query: %s with parameters %s", - DELETE_FROM_PERSON_OVERRIDES, - parameters, - ) - return + return persons_to_delete - cursor.execute(DELETE_FROM_PERSON_OVERRIDES, parameters) - row = cursor.fetchone() - if not row: - # There is no existing mapping for this (old_person_id, override_person_id) pair. - # It could be that a newer one was added (with a later version). - return - deleted_id = row[0] +@activity.defn +async def squash_events_partition(inputs: QueryInputs) -> None: + """Execute the squash query for a given partition_id and persons to_override. - DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ - DELETE FROM - posthog_personoverridemapping - WHERE - id = %(deleted_id)s; - """ + As ClickHouse doesn't support an UPDATE ... FROM statement ala PostgreSQL, we must + do this in 4 basic steps: - cursor.execute( - DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, - { - "deleted_id": deleted_id, - }, - ) + 1. Stop ingesting data into person_overrides. + 2. Build a DICTIONARY from person_overrides. + 3. Perform ALTER TABLE UPDATE using dictGet to query the DICTIONARY. + 4. Clean up the DICTIONARY once done. + """ + from django.conf import settings - def clear(self, team_id: int) -> None: - with self.connection.cursor() as cursor: - cursor.execute( - "DELETE FROM posthog_personoverride WHERE team_id = %s", - [team_id], - ) - cursor.execute( - "DELETE FROM posthog_personoverridemapping WHERE team_id = %s", - [team_id], - ) + query = SQUASH_EVENTS_QUERY + latest_created_at = inputs.latest_created_at.timestamp() if inputs.latest_created_at else inputs.latest_created_at -class FlatPostgresPersonOverridesManager: - def __init__(self, connection): - self.connection = connection + for partition_id in inputs.partition_ids: + activity.logger.info("Executing squash query on partition %s", partition_id) - def fetchall(self, team_id: int): - with self.connection.cursor() as cursor: - cursor.execute( - """ - SELECT - team_id, - old_person_id, - override_person_id - FROM posthog_flatpersonoverride - WHERE team_id = %(team_id)s - """, - {"team_id": team_id}, - ) - return cursor.fetchall() + parameters = { + "partition_id": partition_id, + "team_ids": inputs.team_ids, + # We pass this as a timestamp ourselves as clickhouse-driver will drop any microseconds from the datetime. + # This would cause the latest merge event to be ignored. + # See: https://github.com/mymarilyn/clickhouse-driver/issues/306 + "latest_created_at": latest_created_at, + } - def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: - with self.connection.cursor() as cursor: - cursor.execute( - """ - INSERT INTO posthog_flatpersonoverride( - team_id, - old_person_id, - override_person_id, - oldest_event, - version - ) - VALUES ( - %(team_id)s, - %(old_person_id)s, - %(override_person_id)s, - NOW(), - 1 - ); - """, - { - "team_id": team_id, - "old_person_id": old_person_id, - "override_person_id": override_person_id, - }, - ) + if inputs.dry_run is True: + activity.logger.info("This is a DRY RUN so nothing will be squashed.") + activity.logger.info("Would have run query: %s with parameters %s", query, parameters) + continue - def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: - query = """ - DELETE FROM - posthog_flatpersonoverride - WHERE - team_id = %(team_id)s - AND old_person_id = %(old_person_id)s - AND override_person_id = %(override_person_id)s - AND version = %(latest_version)s - """ + sync_execute( + query.format( + database=settings.CLICKHOUSE_DATABASE, + dictionary_name=inputs.dictionary_name, + team_id_filter="AND team_id in %(team_ids)s" if inputs.team_ids else "", + ), + parameters, + ) - parameters = { - "team_id": person_override.team_id, - "old_person_id": person_override.old_person_id, - "override_person_id": person_override.override_person_id, - "latest_version": person_override.latest_version, - } - if dry_run is True: - activity.logger.info("This is a DRY RUN so nothing will be deleted.") - activity.logger.info( - "Would have run query: %s with parameters %s", - query, - parameters, - ) - return +@activity.defn +async def delete_squashed_person_overrides_from_clickhouse(inputs: QueryInputs) -> None: + """Execute the query to delete persons from ClickHouse that have been squashed.""" + from django.conf import settings - with self.connection.cursor() as cursor: - cursor.execute(query, parameters) + activity.logger.info("Deleting squashed persons from ClickHouse") - def clear(self, team_id: int) -> None: - with self.connection.cursor() as cursor: - cursor.execute( - "DELETE FROM posthog_flatpersonoverride WHERE team_id = %s", - [team_id], - ) + old_person_ids_to_delete = tuple(person.old_person_id for person in inputs.iter_person_overides_to_delete()) + activity.logger.debug("%s", old_person_ids_to_delete) + + query = DELETE_SQUASHED_PERSON_OVERRIDES_QUERY + latest_created_at = inputs.latest_created_at.timestamp() if inputs.latest_created_at else inputs.latest_created_at + parameters = { + "old_person_ids": old_person_ids_to_delete, + # We pass this as a timestamp ourselves as clickhouse-driver will drop any microseconds from the datetime. + # This would cause the latest merge event to be ignored. + # See: https://github.com/mymarilyn/clickhouse-driver/issues/306 + "latest_created_at": latest_created_at, + } + if inputs.dry_run is True: + activity.logger.info("This is a DRY RUN so nothing will be deleted.") + activity.logger.info("Would have run query: %s with parameters %s", query, parameters) + return -PERSON_OVERRIDES_MANAGERS = { - "mappings": PostgresPersonOverridesManager, - "flat": FlatPostgresPersonOverridesManager, -} + sync_execute(query.format(database=settings.CLICKHOUSE_DATABASE), parameters) @activity.defn @@ -689,10 +692,10 @@ async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> port=settings.DATABASES["default"]["PORT"], **settings.DATABASES["default"].get("SSL_OPTIONS", {}), ) as connection: - person_overrides_manager = inputs.get_overrides_manager(connection) + overrides_manager = inputs.get_postgres_person_overrides_manager(connection) for person_override_to_delete in inputs.iter_person_overides_to_delete(): activity.logger.debug("%s", person_override_to_delete) - person_overrides_manager.delete(person_override_to_delete, inputs.dry_run) + overrides_manager.delete(person_override_to_delete, inputs.dry_run) @contextlib.asynccontextmanager @@ -756,7 +759,7 @@ class SquashPersonOverridesInputs: dictionary_name: str = "person_overrides_join_dict" last_n_months: int = 1 dry_run: bool = True - overrides_manager: str = "mappings" + postgres_person_overrides_manager: str = DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER def iter_partition_ids(self) -> Iterator[str]: """Iterate over configured partition ids. @@ -876,7 +879,7 @@ async def run(self, inputs: SquashPersonOverridesInputs): dictionary_name=inputs.dictionary_name, team_ids=inputs.team_ids, dry_run=inputs.dry_run, - overrides_manager=inputs.overrides_manager, + postgres_person_overrides_manager=inputs.postgres_person_overrides_manager, ) async with person_overrides_dictionary( diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index f557c618d0a2f..b1bee479e5995 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -23,7 +23,7 @@ PERSON_OVERRIDES_CREATE_TABLE_SQL, ) from posthog.temporal.batch_exports.squash_person_overrides import ( - PERSON_OVERRIDES_MANAGERS, + POSTGRES_PERSON_OVERRIDES_MANAGERS, PostgresPersonOverridesManager, QueryInputs, SerializablePersonOverrideToDelete, @@ -939,7 +939,7 @@ def team_id(query_inputs, organization_uuid, pg_connection): cursor.execute("DELETE FROM posthog_team WHERE id = %s", [team_id]) -@pytest.fixture(params=PERSON_OVERRIDES_MANAGERS.keys()) +@pytest.fixture(params=POSTGRES_PERSON_OVERRIDES_MANAGERS.keys()) def overrides_manager(request): yield request.param @@ -959,7 +959,7 @@ def person_overrides(query_inputs, team_id, pg_connection, overrides_manager): person_override = PersonOverrideTuple(old_person_id, override_person_id) with pg_connection: - query_inputs.get_overrides_manager(pg_connection).insert( + query_inputs.get_postgres_person_overrides_manager(pg_connection).insert( team_id, old_person_id=person_override.old_person_id, override_person_id=person_override.override_person_id, @@ -968,7 +968,7 @@ def person_overrides(query_inputs, team_id, pg_connection, overrides_manager): yield person_override with pg_connection: - query_inputs.get_overrides_manager(pg_connection).clear(team_id) + query_inputs.get_postgres_person_overrides_manager(pg_connection).clear(team_id) @pytest.mark.django_db @@ -984,7 +984,7 @@ async def test_delete_squashed_person_overrides_from_postgres( # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - assert query_inputs.get_overrides_manager(pg_connection).fetchall(team_id) == [ + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ (team_id, person_overrides.old_person_id, person_overrides.override_person_id) ] @@ -1004,7 +1004,7 @@ async def test_delete_squashed_person_overrides_from_postgres( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - assert query_inputs.get_overrides_manager(pg_connection).fetchall(team_id) == [] + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [] @pytest.mark.django_db @@ -1016,7 +1016,7 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - assert query_inputs.get_overrides_manager(pg_connection).fetchall(team_id) == [ + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ (team_id, person_overrides.old_person_id, person_overrides.override_person_id) ] @@ -1036,7 +1036,7 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - assert query_inputs.get_overrides_manager(pg_connection).fetchall(team_id) == [ + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ (team_id, person_overrides.old_person_id, person_overrides.override_person_id) ] @@ -1055,8 +1055,9 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - if not isinstance(query_inputs.get_overrides_manager(pg_connection), PostgresPersonOverridesManager): - pytest.xfail("overrides manager not supported") + overrides_manager = query_inputs.get_postgres_person_overrides_manager(pg_connection) + if not isinstance(overrides_manager, PostgresPersonOverridesManager): + pytest.xfail(f"{overrides_manager!r} does not support mappings") with pg_connection.cursor() as cursor: cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") @@ -1151,7 +1152,7 @@ async def test_squash_person_overrides_workflow( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, - overrides_manager=overrides_manager, + postgres_person_overrides_manager=overrides_manager, ) async with Worker( @@ -1202,7 +1203,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, - overrides_manager=overrides_manager, + postgres_person_overrides_manager=overrides_manager, ) async with Worker( @@ -1246,7 +1247,7 @@ async def test_squash_person_overrides_workflow_with_limited_team_ids( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], team_ids=[random_team], - overrides_manager=overrides_manager, + postgres_person_overrides_manager=overrides_manager, dry_run=False, ) From 130a5784a9bd281774bc2969050d6a9abba1209b Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 17:35:17 -0800 Subject: [PATCH 09/16] Comment gross test behavior --- .../tests/test_squash_person_overrides_workflow.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index b1bee479e5995..a30a74cb745e8 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -945,14 +945,21 @@ def overrides_manager(request): @pytest.fixture -def person_overrides(query_inputs, team_id, pg_connection, overrides_manager): +def person_overrides(query_inputs: QueryInputs, team_id, pg_connection, overrides_manager): """Create a PersonOverrideMapping and a PersonOverride. We cannot use the Django ORM safely in an async context, so we INSERT INTO directly on the database. This means we need to clean up after ourselves, which we do after yielding. """ - query_inputs.overrides_manager = overrides_manager # XXX this is truly awful + # XXX: Several activity-based tests use this person overrides fixture and + # should vary their behavior to ensure that they work with both the old + # (mappings) and new (flat) approaches, but not all tests that use + # `query_inputs` need to be vary on the overrides manager type as many of + # them don't use Postgres overrides at all. To ensure that whenever Postgres + # overrides *are* used, we need to update the fixture here. This indirection + # isn't good, but this code should be short-lived, right? (... right???) + query_inputs.postgres_person_overrides_manager = overrides_manager old_person_id = uuid4() override_person_id = uuid4() From 88074ccccc562cf059b52d103da0661189c17dde Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 17:39:36 -0800 Subject: [PATCH 10/16] Tag the person override fixtures with a type --- .../test_squash_person_overrides_workflow.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index a30a74cb745e8..e317aeb474396 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -944,8 +944,11 @@ def overrides_manager(request): yield request.param +PostgresPersonOverrideFixtures = PersonOverrideTuple + + @pytest.fixture -def person_overrides(query_inputs: QueryInputs, team_id, pg_connection, overrides_manager): +def person_overrides(query_inputs: QueryInputs, team_id, pg_connection, overrides_manager) -> PersonOverrideTuple: """Create a PersonOverrideMapping and a PersonOverride. We cannot use the Django ORM safely in an async context, so we INSERT INTO directly @@ -981,7 +984,7 @@ def person_overrides(query_inputs: QueryInputs, team_id, pg_connection, override @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres( - query_inputs, activity_environment, team_id, person_overrides, pg_connection + query_inputs, activity_environment, team_id, person_overrides: PostgresPersonOverrideFixtures, pg_connection ): """Test we can delete person overrides that have already been squashed. @@ -1017,7 +1020,7 @@ async def test_delete_squashed_person_overrides_from_postgres( @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres_dry_run( - query_inputs, activity_environment, team_id, person_overrides, pg_connection + query_inputs, activity_environment, team_id, person_overrides: PostgresPersonOverrideFixtures, pg_connection ): """Test we do not delete person overrides when dry_run=True.""" # These are sanity checks to ensure the fixtures are working properly. @@ -1051,7 +1054,7 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres_with_newer_override( - query_inputs, activity_environment, team_id, person_overrides, pg_connection + query_inputs, activity_environment, team_id, person_overrides: PostgresPersonOverrideFixtures, pg_connection ): """Test we do not delete a newer mapping from Postgres. @@ -1145,7 +1148,7 @@ async def test_squash_person_overrides_workflow( query_inputs, events_to_override, person_overrides_data, - person_overrides, + person_overrides: PostgresPersonOverrideFixtures, person_overrides_table, overrides_manager, ): @@ -1196,7 +1199,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( query_inputs, events_to_override, person_overrides_data, - person_overrides, + person_overrides: PostgresPersonOverrideFixtures, newer_overrides, overrides_manager, ): @@ -1241,7 +1244,11 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( @pytest.mark.django_db @pytest.mark.asyncio async def test_squash_person_overrides_workflow_with_limited_team_ids( - query_inputs, events_to_override, person_overrides_data, person_overrides, overrides_manager + query_inputs, + events_to_override, + person_overrides_data, + person_overrides: PostgresPersonOverrideFixtures, + overrides_manager, ): """Test the squash_person_overrides workflow end-to-end.""" client = await Client.connect( From ddc4e37e4a91c117def42342ce579936a053fa1d Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 17:55:15 -0800 Subject: [PATCH 11/16] Tidy up tests --- .../test_squash_person_overrides_workflow.py | 73 +++++++++---------- 1 file changed, 36 insertions(+), 37 deletions(-) diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index e317aeb474396..77b0cec0457a1 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -2,7 +2,7 @@ import random from collections import defaultdict, namedtuple from datetime import datetime, timedelta -from typing import TypedDict +from typing import NamedTuple, TypedDict from uuid import UUID, uuid4 import psycopg2 @@ -939,16 +939,13 @@ def team_id(query_inputs, organization_uuid, pg_connection): cursor.execute("DELETE FROM posthog_team WHERE id = %s", [team_id]) -@pytest.fixture(params=POSTGRES_PERSON_OVERRIDES_MANAGERS.keys()) -def overrides_manager(request): - yield request.param - +class PostgresPersonOverrideFixtures(NamedTuple): + manager: str + override: PersonOverrideTuple -PostgresPersonOverrideFixtures = PersonOverrideTuple - -@pytest.fixture -def person_overrides(query_inputs: QueryInputs, team_id, pg_connection, overrides_manager) -> PersonOverrideTuple: +@pytest.fixture(params=POSTGRES_PERSON_OVERRIDES_MANAGERS.keys()) +def person_override_fixtures(request, query_inputs: QueryInputs, team_id, pg_connection) -> PersonOverrideTuple: """Create a PersonOverrideMapping and a PersonOverride. We cannot use the Django ORM safely in an async context, so we INSERT INTO directly @@ -962,7 +959,7 @@ def person_overrides(query_inputs: QueryInputs, team_id, pg_connection, override # them don't use Postgres overrides at all. To ensure that whenever Postgres # overrides *are* used, we need to update the fixture here. This indirection # isn't good, but this code should be short-lived, right? (... right???) - query_inputs.postgres_person_overrides_manager = overrides_manager + query_inputs.postgres_person_overrides_manager = request.param old_person_id = uuid4() override_person_id = uuid4() @@ -975,7 +972,7 @@ def person_overrides(query_inputs: QueryInputs, team_id, pg_connection, override override_person_id=person_override.override_person_id, ) - yield person_override + yield PostgresPersonOverrideFixtures(request.param, person_override) with pg_connection: query_inputs.get_postgres_person_overrides_manager(pg_connection).clear(team_id) @@ -984,25 +981,27 @@ def person_overrides(query_inputs: QueryInputs, team_id, pg_connection, override @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres( - query_inputs, activity_environment, team_id, person_overrides: PostgresPersonOverrideFixtures, pg_connection + query_inputs, activity_environment, team_id, person_override_fixtures: PostgresPersonOverrideFixtures, pg_connection ): """Test we can delete person overrides that have already been squashed. For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ + person_override = person_override_fixtures.override + # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ - (team_id, person_overrides.old_person_id, person_overrides.override_person_id) + (team_id, person_override.old_person_id, person_override.override_person_id) ] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( team_id, - person_overrides.old_person_id, - person_overrides.override_person_id, + person_override.old_person_id, + person_override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1020,21 +1019,23 @@ async def test_delete_squashed_person_overrides_from_postgres( @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres_dry_run( - query_inputs, activity_environment, team_id, person_overrides: PostgresPersonOverrideFixtures, pg_connection + query_inputs, activity_environment, team_id, person_override_fixtures: PostgresPersonOverrideFixtures, pg_connection ): """Test we do not delete person overrides when dry_run=True.""" + person_override = person_override_fixtures.override + # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ - (team_id, person_overrides.old_person_id, person_overrides.override_person_id) + (team_id, person_override.old_person_id, person_override.override_person_id) ] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( team_id, - person_overrides.old_person_id, - person_overrides.override_person_id, + person_override.old_person_id, + person_override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1047,20 +1048,21 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( with pg_connection: assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ - (team_id, person_overrides.old_person_id, person_overrides.override_person_id) + (team_id, person_override.old_person_id, person_override.override_person_id) ] @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres_with_newer_override( - query_inputs, activity_environment, team_id, person_overrides: PostgresPersonOverrideFixtures, pg_connection + query_inputs, activity_environment, team_id, person_override_fixtures: PostgresPersonOverrideFixtures, pg_connection ): """Test we do not delete a newer mapping from Postgres. For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ + person_override = person_override_fixtures.override # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. @@ -1093,7 +1095,7 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid { "team_id": team_id, "old_person_id": [ - mapping[0] for mapping in mappings if mapping[2] == person_overrides.old_person_id + mapping[0] for mapping in mappings if mapping[2] == person_override.old_person_id ][0], }, ) @@ -1102,8 +1104,8 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid # We are schedulling for deletion an override with lower version number, so nothing should happen. SerializablePersonOverrideToDelete( team_id, - person_overrides.old_person_id, - person_overrides.override_person_id, + person_override.old_person_id, + person_override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1121,8 +1123,8 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid # Nothing was deleted from mappings table assert len(mappings) == 2 - assert person_overrides.override_person_id in [mapping[2] for mapping in mappings] - assert person_overrides.old_person_id in [mapping[2] for mapping in mappings] + assert person_override.override_person_id in [mapping[2] for mapping in mappings] + assert person_override.old_person_id in [mapping[2] for mapping in mappings] cursor.execute("SELECT team_id, old_person_id, override_person_id, version FROM posthog_personoverride") overrides = cursor.fetchall() @@ -1133,11 +1135,11 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid team_id, old_person_id, override_person_id, version = overrides[0] assert team_id == team_id assert ( - old_person_id == [mapping[0] for mapping in mappings if mapping[2] == person_overrides.old_person_id][0] + old_person_id == [mapping[0] for mapping in mappings if mapping[2] == person_override.old_person_id][0] ) assert ( override_person_id - == [mapping[0] for mapping in mappings if mapping[2] == person_overrides.override_person_id][0] + == [mapping[0] for mapping in mappings if mapping[2] == person_override.override_person_id][0] ) assert version == 2 @@ -1148,9 +1150,8 @@ async def test_squash_person_overrides_workflow( query_inputs, events_to_override, person_overrides_data, - person_overrides: PostgresPersonOverrideFixtures, + person_override_fixtures: PostgresPersonOverrideFixtures, person_overrides_table, - overrides_manager, ): """Test the squash_person_overrides workflow end-to-end.""" client = await Client.connect( @@ -1162,7 +1163,7 @@ async def test_squash_person_overrides_workflow( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, - postgres_person_overrides_manager=overrides_manager, + postgres_person_overrides_manager=person_override_fixtures.manager, ) async with Worker( @@ -1199,9 +1200,8 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( query_inputs, events_to_override, person_overrides_data, - person_overrides: PostgresPersonOverrideFixtures, + person_override_fixtures: PostgresPersonOverrideFixtures, newer_overrides, - overrides_manager, ): """Test the squash_person_overrides workflow end-to-end with newer overrides.""" client = await Client.connect( @@ -1213,7 +1213,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, - postgres_person_overrides_manager=overrides_manager, + postgres_person_overrides_manager=person_override_fixtures.manager, ) async with Worker( @@ -1247,8 +1247,7 @@ async def test_squash_person_overrides_workflow_with_limited_team_ids( query_inputs, events_to_override, person_overrides_data, - person_overrides: PostgresPersonOverrideFixtures, - overrides_manager, + person_override_fixtures: PostgresPersonOverrideFixtures, ): """Test the squash_person_overrides workflow end-to-end.""" client = await Client.connect( @@ -1261,7 +1260,7 @@ async def test_squash_person_overrides_workflow_with_limited_team_ids( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], team_ids=[random_team], - postgres_person_overrides_manager=overrides_manager, + postgres_person_overrides_manager=person_override_fixtures.manager, dry_run=False, ) From b78e686787a202efde03de4b95e3ccbe57ad7653 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:06:27 -0800 Subject: [PATCH 12/16] Consolidate override data structures --- .../batch_exports/squash_person_overrides.py | 35 ++++++----- .../test_squash_person_overrides_workflow.py | 62 +++++++------------ 2 files changed, 41 insertions(+), 56 deletions(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 3c5fe2313ab67..89c85ae0eae0b 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -3,7 +3,7 @@ from collections.abc import Iterator from dataclasses import asdict, dataclass, field from datetime import datetime, timedelta, timezone -from typing import AsyncIterator, Iterable, NamedTuple +from typing import AsyncIterator, Iterable, NamedTuple, Sequence from uuid import UUID import psycopg2 @@ -132,16 +132,20 @@ class SerializablePersonOverrideToDelete(NamedTuple): oldest_event_at: str +class PersonOverrideTuple(NamedTuple): + old_person_id: UUID + override_person_id: UUID + + class PostgresPersonOverridesManager: def __init__(self, connection): self.connection = connection - def fetchall(self, team_id: int): + def fetchall(self, team_id: int) -> Sequence[PersonOverrideTuple]: with self.connection.cursor() as cursor: cursor.execute( """ SELECT - override.team_id, old_person.uuid, override_person.uuid FROM posthog_personoverride override @@ -155,12 +159,12 @@ def fetchall(self, team_id: int): """, {"team_id": team_id}, ) - return cursor.fetchall() + return [PersonOverrideTuple(*row) for row in cursor.fetchall()] - def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: + def insert(self, team_id: int, override: PersonOverrideTuple) -> None: with self.connection.cursor() as cursor: - person_ids = [] - for person_uuid in (override_person_id, old_person_id): + mapping_ids = [] + for person_uuid in (override.override_person_id, override.old_person_id): cursor.execute( """ INSERT INTO posthog_personoverridemapping( @@ -176,7 +180,7 @@ def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> """, {"team_id": team_id, "uuid": person_uuid}, ) - person_ids.append(cursor.fetchone()) + mapping_ids.append(cursor.fetchone()) cursor.execute( """ @@ -197,8 +201,8 @@ def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> """, { "team_id": team_id, - "old_person_id": person_ids[1], - "override_person_id": person_ids[0], + "old_person_id": mapping_ids[1], + "override_person_id": mapping_ids[0], }, ) @@ -307,12 +311,11 @@ class FlatPostgresPersonOverridesManager: def __init__(self, connection): self.connection = connection - def fetchall(self, team_id: int): + def fetchall(self, team_id: int) -> Sequence[PersonOverrideTuple]: with self.connection.cursor() as cursor: cursor.execute( """ SELECT - team_id, old_person_id, override_person_id FROM posthog_flatpersonoverride @@ -320,9 +323,9 @@ def fetchall(self, team_id: int): """, {"team_id": team_id}, ) - return cursor.fetchall() + return [PersonOverrideTuple(*row) for row in cursor.fetchall()] - def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> None: + def insert(self, team_id: int, override: PersonOverrideTuple) -> None: with self.connection.cursor() as cursor: cursor.execute( """ @@ -343,8 +346,8 @@ def insert(self, team_id: int, old_person_id: UUID, override_person_id: UUID) -> """, { "team_id": team_id, - "old_person_id": old_person_id, - "override_person_id": override_person_id, + "old_person_id": override.old_person_id, + "override_person_id": override.override_person_id, }, ) diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index 77b0cec0457a1..b70b294d1b786 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -1,6 +1,6 @@ import operator import random -from collections import defaultdict, namedtuple +from collections import defaultdict from datetime import datetime, timedelta from typing import NamedTuple, TypedDict from uuid import UUID, uuid4 @@ -24,6 +24,7 @@ ) from posthog.temporal.batch_exports.squash_person_overrides import ( POSTGRES_PERSON_OVERRIDES_MANAGERS, + PersonOverrideTuple, PostgresPersonOverridesManager, QueryInputs, SerializablePersonOverrideToDelete, @@ -85,9 +86,6 @@ def person_overrides_table(query_inputs): sync_execute("DROP TABLE person_overrides") -PersonOverrideTuple = namedtuple("PersonOverrideTuple", ("old_person_id", "override_person_id")) - - OVERRIDES_CREATED_AT = datetime.fromisoformat("2020-01-02T00:00:00.123123+00:00") OLDEST_EVENT_AT = OVERRIDES_CREATED_AT - timedelta(days=1) @@ -961,18 +959,12 @@ def person_override_fixtures(request, query_inputs: QueryInputs, team_id, pg_con # isn't good, but this code should be short-lived, right? (... right???) query_inputs.postgres_person_overrides_manager = request.param - old_person_id = uuid4() - override_person_id = uuid4() - person_override = PersonOverrideTuple(old_person_id, override_person_id) + override = PersonOverrideTuple(uuid4(), uuid4()) with pg_connection: - query_inputs.get_postgres_person_overrides_manager(pg_connection).insert( - team_id, - old_person_id=person_override.old_person_id, - override_person_id=person_override.override_person_id, - ) + query_inputs.get_postgres_person_overrides_manager(pg_connection).insert(team_id, override) - yield PostgresPersonOverrideFixtures(request.param, person_override) + yield PostgresPersonOverrideFixtures(request.param, override) with pg_connection: query_inputs.get_postgres_person_overrides_manager(pg_connection).clear(team_id) @@ -988,20 +980,18 @@ async def test_delete_squashed_person_overrides_from_postgres( For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ - person_override = person_override_fixtures.override + override = person_override_fixtures.override # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ - (team_id, person_override.old_person_id, person_override.override_person_id) - ] + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( team_id, - person_override.old_person_id, - person_override.override_person_id, + override.old_person_id, + override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1022,20 +1012,18 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( query_inputs, activity_environment, team_id, person_override_fixtures: PostgresPersonOverrideFixtures, pg_connection ): """Test we do not delete person overrides when dry_run=True.""" - person_override = person_override_fixtures.override + override = person_override_fixtures.override # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ - (team_id, person_override.old_person_id, person_override.override_person_id) - ] + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( team_id, - person_override.old_person_id, - person_override.override_person_id, + override.old_person_id, + override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1047,9 +1035,7 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [ - (team_id, person_override.old_person_id, person_override.override_person_id) - ] + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] @pytest.mark.django_db @@ -1062,7 +1048,7 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ - person_override = person_override_fixtures.override + override = person_override_fixtures.override # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. @@ -1094,9 +1080,7 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid """, { "team_id": team_id, - "old_person_id": [ - mapping[0] for mapping in mappings if mapping[2] == person_override.old_person_id - ][0], + "old_person_id": [mapping[0] for mapping in mappings if mapping[2] == override.old_person_id][0], }, ) @@ -1104,8 +1088,8 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid # We are schedulling for deletion an override with lower version number, so nothing should happen. SerializablePersonOverrideToDelete( team_id, - person_override.old_person_id, - person_override.override_person_id, + override.old_person_id, + override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1123,8 +1107,8 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid # Nothing was deleted from mappings table assert len(mappings) == 2 - assert person_override.override_person_id in [mapping[2] for mapping in mappings] - assert person_override.old_person_id in [mapping[2] for mapping in mappings] + assert override.override_person_id in [mapping[2] for mapping in mappings] + assert override.old_person_id in [mapping[2] for mapping in mappings] cursor.execute("SELECT team_id, old_person_id, override_person_id, version FROM posthog_personoverride") overrides = cursor.fetchall() @@ -1134,12 +1118,10 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid team_id, old_person_id, override_person_id, version = overrides[0] assert team_id == team_id - assert ( - old_person_id == [mapping[0] for mapping in mappings if mapping[2] == person_override.old_person_id][0] - ) + assert old_person_id == [mapping[0] for mapping in mappings if mapping[2] == override.old_person_id][0] assert ( override_person_id - == [mapping[0] for mapping in mappings if mapping[2] == person_override.override_person_id][0] + == [mapping[0] for mapping in mappings if mapping[2] == override.override_person_id][0] ) assert version == 2 From 36a30acc1a23d846d3e0bfe5664dc0e053082a9c Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:12:24 -0800 Subject: [PATCH 13/16] Remove some unused query input fixtures --- .../tests/test_squash_person_overrides_workflow.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index b70b294d1b786..94a01ac65d1c2 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -72,7 +72,7 @@ def activity_environment(): @pytest.fixture -def person_overrides_table(query_inputs): +def person_overrides_table(): """Manage person_overrides tables for testing.""" sync_execute(PERSON_OVERRIDES_CREATE_TABLE_SQL) sync_execute(KAFKA_PERSON_OVERRIDES_TABLE_SQL) @@ -129,9 +129,7 @@ def person_overrides_data(person_overrides_table): @pytest.fixture def query_inputs(): """A default set of QueryInputs to use in all tests.""" - query_inputs = QueryInputs() - - return query_inputs + return QueryInputs() @pytest.mark.django_db @@ -1129,7 +1127,6 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid @pytest.mark.django_db @pytest.mark.asyncio async def test_squash_person_overrides_workflow( - query_inputs, events_to_override, person_overrides_data, person_override_fixtures: PostgresPersonOverrideFixtures, @@ -1179,7 +1176,6 @@ async def test_squash_person_overrides_workflow( @pytest.mark.django_db @pytest.mark.asyncio async def test_squash_person_overrides_workflow_with_newer_overrides( - query_inputs, events_to_override, person_overrides_data, person_override_fixtures: PostgresPersonOverrideFixtures, @@ -1226,7 +1222,6 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( @pytest.mark.django_db @pytest.mark.asyncio async def test_squash_person_overrides_workflow_with_limited_team_ids( - query_inputs, events_to_override, person_overrides_data, person_override_fixtures: PostgresPersonOverrideFixtures, From 75ad916ed0766ec8fd6995969a196baa55d00957 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:13:41 -0800 Subject: [PATCH 14/16] Clean up more names --- .../test_squash_person_overrides_workflow.py | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index 94a01ac65d1c2..3b8ee34c40448 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -941,7 +941,9 @@ class PostgresPersonOverrideFixtures(NamedTuple): @pytest.fixture(params=POSTGRES_PERSON_OVERRIDES_MANAGERS.keys()) -def person_override_fixtures(request, query_inputs: QueryInputs, team_id, pg_connection) -> PersonOverrideTuple: +def postgres_person_override_fixtures( + request, query_inputs: QueryInputs, team_id, pg_connection +) -> PersonOverrideTuple: """Create a PersonOverrideMapping and a PersonOverride. We cannot use the Django ORM safely in an async context, so we INSERT INTO directly @@ -971,14 +973,18 @@ def person_override_fixtures(request, query_inputs: QueryInputs, team_id, pg_con @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres( - query_inputs, activity_environment, team_id, person_override_fixtures: PostgresPersonOverrideFixtures, pg_connection + query_inputs, + activity_environment, + team_id, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + pg_connection, ): """Test we can delete person overrides that have already been squashed. For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ - override = person_override_fixtures.override + override = postgres_person_override_fixtures.override # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. @@ -1007,10 +1013,14 @@ async def test_delete_squashed_person_overrides_from_postgres( @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres_dry_run( - query_inputs, activity_environment, team_id, person_override_fixtures: PostgresPersonOverrideFixtures, pg_connection + query_inputs, + activity_environment, + team_id, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + pg_connection, ): """Test we do not delete person overrides when dry_run=True.""" - override = person_override_fixtures.override + override = postgres_person_override_fixtures.override # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. @@ -1039,14 +1049,18 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres_with_newer_override( - query_inputs, activity_environment, team_id, person_override_fixtures: PostgresPersonOverrideFixtures, pg_connection + query_inputs, + activity_environment, + team_id, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + pg_connection, ): """Test we do not delete a newer mapping from Postgres. For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ - override = person_override_fixtures.override + override = postgres_person_override_fixtures.override # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. @@ -1129,7 +1143,7 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid async def test_squash_person_overrides_workflow( events_to_override, person_overrides_data, - person_override_fixtures: PostgresPersonOverrideFixtures, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, person_overrides_table, ): """Test the squash_person_overrides workflow end-to-end.""" @@ -1142,7 +1156,7 @@ async def test_squash_person_overrides_workflow( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, - postgres_person_overrides_manager=person_override_fixtures.manager, + postgres_person_overrides_manager=postgres_person_override_fixtures.manager, ) async with Worker( @@ -1178,7 +1192,7 @@ async def test_squash_person_overrides_workflow( async def test_squash_person_overrides_workflow_with_newer_overrides( events_to_override, person_overrides_data, - person_override_fixtures: PostgresPersonOverrideFixtures, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, newer_overrides, ): """Test the squash_person_overrides workflow end-to-end with newer overrides.""" @@ -1191,7 +1205,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, - postgres_person_overrides_manager=person_override_fixtures.manager, + postgres_person_overrides_manager=postgres_person_override_fixtures.manager, ) async with Worker( @@ -1224,7 +1238,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( async def test_squash_person_overrides_workflow_with_limited_team_ids( events_to_override, person_overrides_data, - person_override_fixtures: PostgresPersonOverrideFixtures, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, ): """Test the squash_person_overrides workflow end-to-end.""" client = await Client.connect( @@ -1237,7 +1251,7 @@ async def test_squash_person_overrides_workflow_with_limited_team_ids( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], team_ids=[random_team], - postgres_person_overrides_manager=person_override_fixtures.manager, + postgres_person_overrides_manager=postgres_person_override_fixtures.manager, dry_run=False, ) From 7d04c335dbba29e5f9171d92a413d59756ef7da6 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:47:07 -0800 Subject: [PATCH 15/16] Fix typing in tests --- .../temporal/tests/test_squash_person_overrides_workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index 3b8ee34c40448..4e90610914ef4 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -2,7 +2,7 @@ import random from collections import defaultdict from datetime import datetime, timedelta -from typing import NamedTuple, TypedDict +from typing import Iterator, NamedTuple, TypedDict from uuid import UUID, uuid4 import psycopg2 @@ -943,7 +943,7 @@ class PostgresPersonOverrideFixtures(NamedTuple): @pytest.fixture(params=POSTGRES_PERSON_OVERRIDES_MANAGERS.keys()) def postgres_person_override_fixtures( request, query_inputs: QueryInputs, team_id, pg_connection -) -> PersonOverrideTuple: +) -> Iterator[PostgresPersonOverrideFixtures]: """Create a PersonOverrideMapping and a PersonOverride. We cannot use the Django ORM safely in an async context, so we INSERT INTO directly From 5e177b9a5b4bb397c0cbc8428b8c2ac235af985c Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 2 Jan 2024 16:04:58 -0800 Subject: [PATCH 16/16] Use flat override path by default, see https://github.com/PostHog/charts/pull/623 --- posthog/temporal/batch_exports/squash_person_overrides.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 89c85ae0eae0b..6843131c60333 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -394,7 +394,7 @@ def clear(self, team_id: int) -> None: "flat": FlatPostgresPersonOverridesManager, } -DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER = "mappings" +DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER = "flat" assert DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER in POSTGRES_PERSON_OVERRIDES_MANAGERS