diff --git a/posthog/temporal/batch_exports/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py index 943dd91192e21..6843131c60333 100644 --- a/posthog/temporal/batch_exports/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -3,7 +3,7 @@ from collections.abc import Iterator from dataclasses import asdict, dataclass, field from datetime import datetime, timedelta, timezone -from typing import AsyncIterator, Iterable, NamedTuple +from typing import AsyncIterator, Iterable, NamedTuple, Sequence from uuid import UUID import psycopg2 @@ -87,35 +87,6 @@ AND created_at >= 0; """ -SELECT_ID_FROM_OVERRIDE_UUID = """ -SELECT - id -FROM - posthog_personoverridemapping -WHERE - team_id = %(team_id)s - AND uuid = %(uuid)s; -""" - -DELETE_FROM_PERSON_OVERRIDES = """ -DELETE FROM - posthog_personoverride -WHERE - team_id = %(team_id)s - AND old_person_id = %(old_person_id)s - AND override_person_id = %(override_person_id)s - AND version = %(latest_version)s -RETURNING - old_person_id; -""" - -DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ -DELETE FROM - posthog_personoverridemapping -WHERE - id = %(id)s; -""" - class PersonOverrideToDelete(NamedTuple): """A person override that should be deleted after squashing. @@ -161,6 +132,272 @@ class SerializablePersonOverrideToDelete(NamedTuple): oldest_event_at: str +class PersonOverrideTuple(NamedTuple): + old_person_id: UUID + override_person_id: UUID + + +class PostgresPersonOverridesManager: + def __init__(self, connection): + self.connection = connection + + def fetchall(self, team_id: int) -> Sequence[PersonOverrideTuple]: + with self.connection.cursor() as cursor: + cursor.execute( + """ + SELECT + old_person.uuid, + override_person.uuid + FROM posthog_personoverride override + LEFT OUTER JOIN + posthog_personoverridemapping old_person + ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id + LEFT OUTER JOIN + posthog_personoverridemapping override_person + ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id + WHERE override.team_id = %(team_id)s + """, + {"team_id": team_id}, + ) + return [PersonOverrideTuple(*row) for row in cursor.fetchall()] + + def insert(self, team_id: int, override: PersonOverrideTuple) -> None: + with self.connection.cursor() as cursor: + mapping_ids = [] + for person_uuid in (override.override_person_id, override.old_person_id): + cursor.execute( + """ + INSERT INTO posthog_personoverridemapping( + team_id, + uuid + ) + VALUES ( + %(team_id)s, + %(uuid)s + ) + ON CONFLICT("team_id", "uuid") DO NOTHING + RETURNING id + """, + {"team_id": team_id, "uuid": person_uuid}, + ) + mapping_ids.append(cursor.fetchone()) + + cursor.execute( + """ + INSERT INTO posthog_personoverride( + team_id, + old_person_id, + override_person_id, + oldest_event, + version + ) + VALUES ( + %(team_id)s, + %(old_person_id)s, + %(override_person_id)s, + NOW(), + 1 + ); + """, + { + "team_id": team_id, + "old_person_id": mapping_ids[1], + "override_person_id": mapping_ids[0], + }, + ) + + def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: + with self.connection.cursor() as cursor: + SELECT_ID_FROM_OVERRIDE_UUID = """ + SELECT + id + FROM + posthog_personoverridemapping + WHERE + team_id = %(team_id)s + AND uuid = %(uuid)s; + """ + + cursor.execute( + SELECT_ID_FROM_OVERRIDE_UUID, + { + "team_id": person_override.team_id, + "uuid": person_override.old_person_id, + }, + ) + row = cursor.fetchone() + if not row: + return + + old_person_id = row[0] + + cursor.execute( + SELECT_ID_FROM_OVERRIDE_UUID, + { + "team_id": person_override.team_id, + "uuid": person_override.override_person_id, + }, + ) + row = cursor.fetchone() + if not row: + return + + override_person_id = row[0] + + DELETE_FROM_PERSON_OVERRIDES = """ + DELETE FROM + posthog_personoverride + WHERE + team_id = %(team_id)s + AND old_person_id = %(old_person_id)s + AND override_person_id = %(override_person_id)s + AND version = %(latest_version)s + RETURNING + old_person_id; + """ + + parameters = { + "team_id": person_override.team_id, + "old_person_id": old_person_id, + "override_person_id": override_person_id, + "latest_version": person_override.latest_version, + } + + if dry_run is True: + activity.logger.info("This is a DRY RUN so nothing will be deleted.") + activity.logger.info( + "Would have run query: %s with parameters %s", + DELETE_FROM_PERSON_OVERRIDES, + parameters, + ) + return + + cursor.execute(DELETE_FROM_PERSON_OVERRIDES, parameters) + row = cursor.fetchone() + if not row: + # There is no existing mapping for this (old_person_id, override_person_id) pair. + # It could be that a newer one was added (with a later version). + return + + deleted_id = row[0] + + DELETE_FROM_PERSON_OVERRIDE_MAPPINGS = """ + DELETE FROM + posthog_personoverridemapping + WHERE + id = %(deleted_id)s; + """ + + cursor.execute( + DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, + { + "deleted_id": deleted_id, + }, + ) + + def clear(self, team_id: int) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + "DELETE FROM posthog_personoverride WHERE team_id = %s", + [team_id], + ) + cursor.execute( + "DELETE FROM posthog_personoverridemapping WHERE team_id = %s", + [team_id], + ) + + +class FlatPostgresPersonOverridesManager: + def __init__(self, connection): + self.connection = connection + + def fetchall(self, team_id: int) -> Sequence[PersonOverrideTuple]: + with self.connection.cursor() as cursor: + cursor.execute( + """ + SELECT + old_person_id, + override_person_id + FROM posthog_flatpersonoverride + WHERE team_id = %(team_id)s + """, + {"team_id": team_id}, + ) + return [PersonOverrideTuple(*row) for row in cursor.fetchall()] + + def insert(self, team_id: int, override: PersonOverrideTuple) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + """ + INSERT INTO posthog_flatpersonoverride( + team_id, + old_person_id, + override_person_id, + oldest_event, + version + ) + VALUES ( + %(team_id)s, + %(old_person_id)s, + %(override_person_id)s, + NOW(), + 1 + ); + """, + { + "team_id": team_id, + "old_person_id": override.old_person_id, + "override_person_id": override.override_person_id, + }, + ) + + def delete(self, person_override: SerializablePersonOverrideToDelete, dry_run: bool = False) -> None: + query = """ + DELETE FROM + posthog_flatpersonoverride + WHERE + team_id = %(team_id)s + AND old_person_id = %(old_person_id)s + AND override_person_id = %(override_person_id)s + AND version = %(latest_version)s + """ + + parameters = { + "team_id": person_override.team_id, + "old_person_id": person_override.old_person_id, + "override_person_id": person_override.override_person_id, + "latest_version": person_override.latest_version, + } + + if dry_run is True: + activity.logger.info("This is a DRY RUN so nothing will be deleted.") + activity.logger.info( + "Would have run query: %s with parameters %s", + query, + parameters, + ) + return + + with self.connection.cursor() as cursor: + cursor.execute(query, parameters) + + def clear(self, team_id: int) -> None: + with self.connection.cursor() as cursor: + cursor.execute( + "DELETE FROM posthog_flatpersonoverride WHERE team_id = %s", + [team_id], + ) + + +POSTGRES_PERSON_OVERRIDES_MANAGERS = { + "mappings": PostgresPersonOverridesManager, + "flat": FlatPostgresPersonOverridesManager, +} + +DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER = "flat" +assert DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER in POSTGRES_PERSON_OVERRIDES_MANAGERS + + @dataclass class QueryInputs: """Inputs for activities that run queries in the SquashPersonOverrides workflow. @@ -184,6 +421,7 @@ class QueryInputs: dictionary_name: str = "person_overrides_join_dict" dry_run: bool = True _latest_created_at: str | datetime | None = None + postgres_person_overrides_manager: str = DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER def __post_init__(self) -> None: if isinstance(self._latest_created_at, datetime): @@ -212,6 +450,9 @@ def iter_person_overides_to_delete(self) -> Iterable[SerializablePersonOverrideT for person_override_to_delete in self.person_overrides_to_delete: yield SerializablePersonOverrideToDelete(*person_override_to_delete) + def get_postgres_person_overrides_manager(self, connection): + return POSTGRES_PERSON_OVERRIDES_MANAGERS[self.postgres_person_overrides_manager](connection) + @activity.defn async def prepare_person_overrides(inputs: QueryInputs) -> None: @@ -454,68 +695,10 @@ async def delete_squashed_person_overrides_from_postgres(inputs: QueryInputs) -> port=settings.DATABASES["default"]["PORT"], **settings.DATABASES["default"].get("SSL_OPTIONS", {}), ) as connection: - with connection.cursor() as cursor: - for person_override_to_delete in inputs.iter_person_overides_to_delete(): - activity.logger.debug("%s", person_override_to_delete) - - cursor.execute( - SELECT_ID_FROM_OVERRIDE_UUID, - { - "team_id": person_override_to_delete.team_id, - "uuid": person_override_to_delete.old_person_id, - }, - ) - - row = cursor.fetchone() - if not row: - continue - old_person_id = row[0] - - cursor.execute( - SELECT_ID_FROM_OVERRIDE_UUID, - { - "team_id": person_override_to_delete.team_id, - "uuid": person_override_to_delete.override_person_id, - }, - ) - - row = cursor.fetchone() - if not row: - continue - - override_person_id = row[0] - - parameters = { - "team_id": person_override_to_delete.team_id, - "old_person_id": old_person_id, - "override_person_id": override_person_id, - "latest_version": person_override_to_delete.latest_version, - } - - if inputs.dry_run is True: - activity.logger.info("This is a DRY RUN so nothing will be deleted.") - activity.logger.info( - "Would have run query: %s with parameters %s", - DELETE_FROM_PERSON_OVERRIDES, - parameters, - ) - continue - - cursor.execute(DELETE_FROM_PERSON_OVERRIDES, parameters) - - row = cursor.fetchone() - if not row: - # There is no existing mapping for this (old_person_id, override_person_id) pair. - # It could be that a newer one was added (with a later version). - continue - deleted_id = row[0] - - cursor.execute( - DELETE_FROM_PERSON_OVERRIDE_MAPPINGS, - { - "id": deleted_id, - }, - ) + overrides_manager = inputs.get_postgres_person_overrides_manager(connection) + for person_override_to_delete in inputs.iter_person_overides_to_delete(): + activity.logger.debug("%s", person_override_to_delete) + overrides_manager.delete(person_override_to_delete, inputs.dry_run) @contextlib.asynccontextmanager @@ -579,6 +762,7 @@ class SquashPersonOverridesInputs: dictionary_name: str = "person_overrides_join_dict" last_n_months: int = 1 dry_run: bool = True + postgres_person_overrides_manager: str = DEFAULT_POSTGRES_PERSON_OVERRIDES_MANAGER def iter_partition_ids(self) -> Iterator[str]: """Iterate over configured partition ids. @@ -698,6 +882,7 @@ async def run(self, inputs: SquashPersonOverridesInputs): dictionary_name=inputs.dictionary_name, team_ids=inputs.team_ids, dry_run=inputs.dry_run, + postgres_person_overrides_manager=inputs.postgres_person_overrides_manager, ) async with person_overrides_dictionary( diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index fa72cb585b6b5..4e90610914ef4 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -1,8 +1,8 @@ import operator import random -from collections import defaultdict, namedtuple +from collections import defaultdict from datetime import datetime, timedelta -from typing import TypedDict +from typing import Iterator, NamedTuple, TypedDict from uuid import UUID, uuid4 import psycopg2 @@ -23,6 +23,9 @@ PERSON_OVERRIDES_CREATE_TABLE_SQL, ) from posthog.temporal.batch_exports.squash_person_overrides import ( + POSTGRES_PERSON_OVERRIDES_MANAGERS, + PersonOverrideTuple, + PostgresPersonOverridesManager, QueryInputs, SerializablePersonOverrideToDelete, SquashPersonOverridesInputs, @@ -69,7 +72,7 @@ def activity_environment(): @pytest.fixture -def person_overrides_table(query_inputs): +def person_overrides_table(): """Manage person_overrides tables for testing.""" sync_execute(PERSON_OVERRIDES_CREATE_TABLE_SQL) sync_execute(KAFKA_PERSON_OVERRIDES_TABLE_SQL) @@ -83,9 +86,6 @@ def person_overrides_table(query_inputs): sync_execute("DROP TABLE person_overrides") -PersonOverrideTuple = namedtuple("PersonOverrideTuple", ("old_person_id", "override_person_id")) - - OVERRIDES_CREATED_AT = datetime.fromisoformat("2020-01-02T00:00:00.123123+00:00") OLDEST_EVENT_AT = OVERRIDES_CREATED_AT - timedelta(days=1) @@ -129,9 +129,7 @@ def person_overrides_data(person_overrides_table): @pytest.fixture def query_inputs(): """A default set of QueryInputs to use in all tests.""" - query_inputs = QueryInputs() - - return query_inputs + return QueryInputs() @pytest.mark.django_db @@ -928,7 +926,7 @@ def team_id(query_inputs, organization_uuid, pg_connection): """, {"organization_uuid": organization_uuid}, ) - team_id = cursor.fetchone() + [team_id] = cursor.fetchone() yield team_id @@ -937,104 +935,67 @@ def team_id(query_inputs, organization_uuid, pg_connection): cursor.execute("DELETE FROM posthog_team WHERE id = %s", [team_id]) -@pytest.fixture -def person_overrides(query_inputs, team_id, pg_connection): +class PostgresPersonOverrideFixtures(NamedTuple): + manager: str + override: PersonOverrideTuple + + +@pytest.fixture(params=POSTGRES_PERSON_OVERRIDES_MANAGERS.keys()) +def postgres_person_override_fixtures( + request, query_inputs: QueryInputs, team_id, pg_connection +) -> Iterator[PostgresPersonOverrideFixtures]: """Create a PersonOverrideMapping and a PersonOverride. We cannot use the Django ORM safely in an async context, so we INSERT INTO directly on the database. This means we need to clean up after ourselves, which we do after yielding. """ - old_person_id = uuid4() - override_person_id = uuid4() - person_override = PersonOverrideTuple(old_person_id, override_person_id) + # XXX: Several activity-based tests use this person overrides fixture and + # should vary their behavior to ensure that they work with both the old + # (mappings) and new (flat) approaches, but not all tests that use + # `query_inputs` need to be vary on the overrides manager type as many of + # them don't use Postgres overrides at all. To ensure that whenever Postgres + # overrides *are* used, we need to update the fixture here. This indirection + # isn't good, but this code should be short-lived, right? (... right???) + query_inputs.postgres_person_overrides_manager = request.param - with pg_connection: - with pg_connection.cursor() as cursor: - person_ids = [] - for person_uuid in (override_person_id, old_person_id): - cursor.execute( - """ - INSERT INTO posthog_personoverridemapping( - team_id, - uuid - ) - VALUES ( - %(team_id)s, - %(uuid)s - ) - ON CONFLICT("team_id", "uuid") DO NOTHING - RETURNING id - """, - {"team_id": team_id, "uuid": person_uuid}, - ) - person_ids.append(cursor.fetchone()) + override = PersonOverrideTuple(uuid4(), uuid4()) - cursor.execute( - """ - INSERT INTO posthog_personoverride( - team_id, - old_person_id, - override_person_id, - oldest_event, - version - ) - VALUES ( - %(team_id)s, - %(old_person_id)s, - %(override_person_id)s, - NOW(), - 1 - ); - """, - { - "team_id": team_id, - "old_person_id": person_ids[1], - "override_person_id": person_ids[0], - }, - ) + with pg_connection: + query_inputs.get_postgres_person_overrides_manager(pg_connection).insert(team_id, override) - yield person_override + yield PostgresPersonOverrideFixtures(request.param, override) with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute( - "DELETE FROM posthog_personoverride WHERE team_id = %s AND old_person_id = %s", - [team_id, person_ids[1]], - ) - cursor.execute( - "DELETE FROM posthog_personoverridemapping WHERE team_id = %s AND (uuid = %s OR uuid = %s)", - [team_id, old_person_id, override_person_id], - ) + query_inputs.get_postgres_person_overrides_manager(pg_connection).clear(team_id) @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres( - query_inputs, activity_environment, team_id, person_overrides, pg_connection + query_inputs, + activity_environment, + team_id, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + pg_connection, ): """Test we can delete person overrides that have already been squashed. For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ + override = postgres_person_override_fixtures.override + # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 2 - - cursor.execute("SELECT * FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 1 + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( team_id, - person_overrides.old_person_id, - person_overrides.override_person_id, + override.old_person_id, + override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1046,40 +1007,31 @@ async def test_delete_squashed_person_overrides_from_postgres( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 1 - assert mappings[0][1] == person_overrides.override_person_id - - cursor.execute("SELECT * FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 0 + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [] @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres_dry_run( - query_inputs, activity_environment, team_id, person_overrides, pg_connection + query_inputs, + activity_environment, + team_id, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + pg_connection, ): """Test we do not delete person overrides when dry_run=True.""" + override = postgres_person_override_fixtures.override + # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 2 - - cursor.execute("SELECT * FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 1 + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] person_overrides_to_delete = [ SerializablePersonOverrideToDelete( team_id, - person_overrides.old_person_id, - person_overrides.override_person_id, + override.old_person_id, + override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1091,30 +1043,32 @@ async def test_delete_squashed_person_overrides_from_postgres_dry_run( await activity_environment.run(delete_squashed_person_overrides_from_postgres, query_inputs) with pg_connection: - with pg_connection.cursor() as cursor: - cursor.execute("SELECT team_id, uuid FROM posthog_personoverridemapping") - mappings = cursor.fetchall() - assert len(mappings) == 2 - assert mappings[0][1] == person_overrides.override_person_id - - cursor.execute("SELECT * FROM posthog_personoverride") - overrides = cursor.fetchall() - assert len(overrides) == 1 + assert query_inputs.get_postgres_person_overrides_manager(pg_connection).fetchall(team_id) == [override] @pytest.mark.django_db @pytest.mark.asyncio async def test_delete_squashed_person_overrides_from_postgres_with_newer_override( - query_inputs, activity_environment, team_id, person_overrides, pg_connection + query_inputs, + activity_environment, + team_id, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, + pg_connection, ): """Test we do not delete a newer mapping from Postgres. For the purposes of this unit test, we take the person overrides as given. A comprehensive test will cover the entire worflow end-to-end. """ + override = postgres_person_override_fixtures.override + # These are sanity checks to ensure the fixtures are working properly. # If any assertions fail here, its likely a test setup issue. with pg_connection: + overrides_manager = query_inputs.get_postgres_person_overrides_manager(pg_connection) + if not isinstance(overrides_manager, PostgresPersonOverridesManager): + pytest.xfail(f"{overrides_manager!r} does not support mappings") + with pg_connection.cursor() as cursor: cursor.execute("SELECT id, team_id, uuid FROM posthog_personoverridemapping") mappings = cursor.fetchall() @@ -1138,9 +1092,7 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid """, { "team_id": team_id, - "old_person_id": [ - mapping[0] for mapping in mappings if mapping[2] == person_overrides.old_person_id - ][0], + "old_person_id": [mapping[0] for mapping in mappings if mapping[2] == override.old_person_id][0], }, ) @@ -1148,8 +1100,8 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid # We are schedulling for deletion an override with lower version number, so nothing should happen. SerializablePersonOverrideToDelete( team_id, - person_overrides.old_person_id, - person_overrides.override_person_id, + override.old_person_id, + override.override_person_id, OVERRIDES_CREATED_AT.isoformat(), 1, OLDEST_EVENT_AT.isoformat(), @@ -1167,8 +1119,8 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid # Nothing was deleted from mappings table assert len(mappings) == 2 - assert person_overrides.override_person_id in [mapping[2] for mapping in mappings] - assert person_overrides.old_person_id in [mapping[2] for mapping in mappings] + assert override.override_person_id in [mapping[2] for mapping in mappings] + assert override.old_person_id in [mapping[2] for mapping in mappings] cursor.execute("SELECT team_id, old_person_id, override_person_id, version FROM posthog_personoverride") overrides = cursor.fetchall() @@ -1178,12 +1130,10 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid team_id, old_person_id, override_person_id, version = overrides[0] assert team_id == team_id - assert ( - old_person_id == [mapping[0] for mapping in mappings if mapping[2] == person_overrides.old_person_id][0] - ) + assert old_person_id == [mapping[0] for mapping in mappings if mapping[2] == override.old_person_id][0] assert ( override_person_id - == [mapping[0] for mapping in mappings if mapping[2] == person_overrides.override_person_id][0] + == [mapping[0] for mapping in mappings if mapping[2] == override.override_person_id][0] ) assert version == 2 @@ -1191,10 +1141,9 @@ async def test_delete_squashed_person_overrides_from_postgres_with_newer_overrid @pytest.mark.django_db @pytest.mark.asyncio async def test_squash_person_overrides_workflow( - query_inputs, events_to_override, person_overrides_data, - person_overrides, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, person_overrides_table, ): """Test the squash_person_overrides workflow end-to-end.""" @@ -1207,6 +1156,7 @@ async def test_squash_person_overrides_workflow( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, + postgres_person_overrides_manager=postgres_person_override_fixtures.manager, ) async with Worker( @@ -1240,10 +1190,9 @@ async def test_squash_person_overrides_workflow( @pytest.mark.django_db @pytest.mark.asyncio async def test_squash_person_overrides_workflow_with_newer_overrides( - query_inputs, events_to_override, person_overrides_data, - person_overrides, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, newer_overrides, ): """Test the squash_person_overrides workflow end-to-end with newer overrides.""" @@ -1256,6 +1205,7 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], dry_run=False, + postgres_person_overrides_manager=postgres_person_override_fixtures.manager, ) async with Worker( @@ -1286,7 +1236,9 @@ async def test_squash_person_overrides_workflow_with_newer_overrides( @pytest.mark.django_db @pytest.mark.asyncio async def test_squash_person_overrides_workflow_with_limited_team_ids( - query_inputs, events_to_override, person_overrides_data, person_overrides + events_to_override, + person_overrides_data, + postgres_person_override_fixtures: PostgresPersonOverrideFixtures, ): """Test the squash_person_overrides workflow end-to-end.""" client = await Client.connect( @@ -1299,6 +1251,7 @@ async def test_squash_person_overrides_workflow_with_limited_team_ids( inputs = SquashPersonOverridesInputs( partition_ids=["202001"], team_ids=[random_team], + postgres_person_overrides_manager=postgres_person_override_fixtures.manager, dry_run=False, )