diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index a3e9f384848b2..8b57e26190046 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -218,7 +218,7 @@ export class EventsProcessor { } let groupsColumns: Record = {} - let eventPersonProperties: string = '{}' + let eventPersonProperties = '{}' if (processPerson) { const groupIdentifiers = this.getGroupIdentifiers(properties) groupsColumns = await this.db.getGroupsColumns(teamId, groupIdentifiers) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index 25470fc699765..0aac89c33b289 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -56,7 +56,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { let closeServer: () => Promise let queue: any - function createBatchWithMultipleEvents(events: any[], timestamp?: any, withKey: boolean = true): Message[] { + function createBatchWithMultipleEvents(events: any[], timestamp?: any, withKey = true): Message[] { return events.map((event, i) => ({ partition: 0, topic: KAFKA_EVENTS_PLUGIN_INGESTION, diff --git a/posthog/models/async_deletion/delete_events.py b/posthog/models/async_deletion/delete_events.py index de762db837cf3..2486043a5b871 100644 --- a/posthog/models/async_deletion/delete_events.py +++ b/posthog/models/async_deletion/delete_events.py @@ -27,11 +27,13 @@ def process(self, deletions: List[AsyncDeletion]): logger.debug("No AsyncDeletion to perform") return + team_ids = list({row.team_id for row in deletions}) + logger.info( "Starting AsyncDeletion on `events` table in ClickHouse", { "count": len(deletions), - "team_ids": list({row.team_id for row in deletions}), + "team_ids": team_ids, }, ) temp_table_name = f"{CLICKHOUSE_DATABASE}.async_deletion_run" @@ -49,10 +51,13 @@ def process(self, deletions: List[AsyncDeletion]): ON CLUSTER '{CLICKHOUSE_CLUSTER}' DELETE WHERE - joinGet({temp_table_name}, 'id', team_id, 0, toString(team_id)) > 0 OR - joinGet({temp_table_name}, 'id', team_id, 1, toString(person_id)) > 0 + team_id IN %(team_ids)s AND + ( + joinGet({temp_table_name}, 'id', team_id, 0, toString(team_id)) > 0 OR + joinGet({temp_table_name}, 'id', team_id, 1, toString(person_id)) > 0 + ) """, - {}, + {"team_ids": team_ids}, workload=Workload.OFFLINE, ) @@ -74,9 +79,11 @@ def process(self, deletions: List[AsyncDeletion]): f""" ALTER TABLE {table} ON CLUSTER '{CLICKHOUSE_CLUSTER}' - DELETE WHERE joinGet({temp_table_name}, 'id', team_id, 0, toString(team_id)) > 0 + DELETE WHERE + team_id IN %(team_ids)s AND + joinGet({temp_table_name}, 'id', team_id, 0, toString(team_id)) > 0 """, - {}, + {"team_ids": [deletion.team_id for deletion in team_deletions]}, workload=Workload.OFFLINE, ) diff --git a/posthog/models/test/__snapshots__/test_async_deletion_model.ambr b/posthog/models/test/__snapshots__/test_async_deletion_model.ambr index 3535fc5010a5c..2a19e48368d94 100644 --- a/posthog/models/test/__snapshots__/test_async_deletion_model.ambr +++ b/posthog/models/test/__snapshots__/test_async_deletion_model.ambr @@ -4,8 +4,9 @@ ALTER TABLE sharded_events ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 - OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND (joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0) ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team.1 @@ -13,7 +14,8 @@ ALTER TABLE person ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team.2 @@ -21,7 +23,8 @@ ALTER TABLE person_distinct_id ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team.3 @@ -29,7 +32,8 @@ ALTER TABLE person_distinct_id2 ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team.4 @@ -37,7 +41,8 @@ ALTER TABLE groups ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team.5 @@ -45,7 +50,8 @@ ALTER TABLE cohortpeople ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team.6 @@ -53,7 +59,8 @@ ALTER TABLE person_static_cohort ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team.7 @@ -61,7 +68,8 @@ ALTER TABLE plugin_log_entries ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team_unrelated @@ -69,8 +77,9 @@ ALTER TABLE sharded_events ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 - OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND (joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0) ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team_unrelated.1 @@ -78,7 +87,8 @@ ALTER TABLE person ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team_unrelated.2 @@ -86,7 +96,8 @@ ALTER TABLE person_distinct_id ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team_unrelated.3 @@ -94,7 +105,8 @@ ALTER TABLE person_distinct_id2 ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team_unrelated.4 @@ -102,7 +114,8 @@ ALTER TABLE groups ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team_unrelated.5 @@ -110,7 +123,8 @@ ALTER TABLE cohortpeople ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team_unrelated.6 @@ -118,7 +132,8 @@ ALTER TABLE person_static_cohort ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_auxilary_models_via_team_unrelated.7 @@ -126,7 +141,8 @@ ALTER TABLE plugin_log_entries ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_cohortpeople @@ -146,8 +162,9 @@ ALTER TABLE sharded_events ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 - OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND (joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0) ''' # --- # name: TestAsyncDeletion.test_delete_person_unrelated @@ -155,8 +172,9 @@ ALTER TABLE sharded_events ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 - OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND (joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0) ''' # --- # name: TestAsyncDeletion.test_delete_teams @@ -164,8 +182,9 @@ ALTER TABLE sharded_events ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 - OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND (joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0) ''' # --- # name: TestAsyncDeletion.test_delete_teams.1 @@ -173,7 +192,8 @@ ALTER TABLE person ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams.2 @@ -181,7 +201,8 @@ ALTER TABLE person_distinct_id ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams.3 @@ -189,7 +210,8 @@ ALTER TABLE person_distinct_id2 ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams.4 @@ -197,7 +219,8 @@ ALTER TABLE groups ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams.5 @@ -205,7 +228,8 @@ ALTER TABLE cohortpeople ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams.6 @@ -213,7 +237,8 @@ ALTER TABLE person_static_cohort ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams.7 @@ -221,7 +246,8 @@ ALTER TABLE plugin_log_entries ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams_unrelated @@ -229,8 +255,9 @@ ALTER TABLE sharded_events ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 - OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND (joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + OR joinGet(posthog_test.async_deletion_run, 'id', team_id, 1, toString(person_id)) > 0) ''' # --- # name: TestAsyncDeletion.test_delete_teams_unrelated.1 @@ -238,7 +265,8 @@ ALTER TABLE person ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams_unrelated.2 @@ -246,7 +274,8 @@ ALTER TABLE person_distinct_id ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams_unrelated.3 @@ -254,7 +283,8 @@ ALTER TABLE person_distinct_id2 ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams_unrelated.4 @@ -262,7 +292,8 @@ ALTER TABLE groups ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams_unrelated.5 @@ -270,7 +301,8 @@ ALTER TABLE cohortpeople ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams_unrelated.6 @@ -278,7 +310,8 @@ ALTER TABLE person_static_cohort ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_delete_teams_unrelated.7 @@ -286,7 +319,8 @@ ALTER TABLE plugin_log_entries ON CLUSTER 'posthog' DELETE - WHERE joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 + WHERE team_id IN [1, 2, 3, 4, 5 /* ... */] + AND joinGet(posthog_test.async_deletion_run, 'id', team_id, 0, toString(team_id)) > 0 ''' # --- # name: TestAsyncDeletion.test_mark_deletions_done_person diff --git a/posthog/models/test/test_async_deletion_model.py b/posthog/models/test/test_async_deletion_model.py index ff0302d941dec..2d276fa0cceb9 100644 --- a/posthog/models/test/test_async_deletion_model.py +++ b/posthog/models/test/test_async_deletion_model.py @@ -116,6 +116,7 @@ def test_mark_deletions_done_person_when_not_done(self): @snapshot_clickhouse_alter_queries def test_delete_teams(self): _create_event(event_uuid=uuid4(), event="event1", team=self.teams[0], distinct_id="1") + _create_event(event_uuid=uuid4(), event="event2", team=self.teams[1], distinct_id="2") AsyncDeletion.objects.create( deletion_type=DeletionType.Team, @@ -123,6 +124,12 @@ def test_delete_teams(self): key=str(self.teams[0].pk), created_by=self.user, ) + AsyncDeletion.objects.create( + deletion_type=DeletionType.Team, + team_id=self.teams[1].pk, + key=str(self.teams[1].pk), + created_by=self.user, + ) AsyncEventDeletion().run() diff --git a/posthog/test/base.py b/posthog/test/base.py index c7066e62c85cb..a6d6afcb9c4f2 100644 --- a/posthog/test/base.py +++ b/posthog/test/base.py @@ -481,7 +481,8 @@ def assertQueryMatchesSnapshot(self, query, params=None, replace_all_numbers=Fal # :TRICKY: team_id changes every test, avoid it messing with snapshots. if replace_all_numbers: query = re.sub(r"(\"?) = \d+", r"\1 = 2", query) - query = re.sub(r"(\"?) IN \(\d+(, \d+)*\)", r"\1 IN (1, 2, 3, 4, 5 /* ... */)", query) + query = re.sub(r"(\"?) IN \(\d+(, ?\d+)*\)", r"\1 IN (1, 2, 3, 4, 5 /* ... */)", query) + query = re.sub(r"(\"?) IN \[\d+(, ?\d+)*\]", r"\1 IN [1, 2, 3, 4, 5 /* ... */]", query) # replace "uuid" IN ('00000000-0000-4000-8000-000000000001'::uuid) effectively: query = re.sub( r"\"uuid\" IN \('[0-9a-f-]{36}'(::uuid)?(, '[0-9a-f-]{36}'(::uuid)?)*\)", @@ -974,7 +975,7 @@ def wrapped(self, *args, **kwargs): for query in queries: if "FROM system.columns" not in query: - self.assertQueryMatchesSnapshot(query) + self.assertQueryMatchesSnapshot(query, replace_all_numbers=True) return wrapped