Skip to content

Commit

Permalink
fix: Add where predicate to alter table query (#21382)
Browse files Browse the repository at this point in the history
* fix: Fix deletion job by using join tables

* fix tests

* Update query snapshots

* drop table

* fix

* Update query snapshots

* remove test files

* fix

* Update query snapshots

* Update posthog/models/async_deletion/delete_events.py

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: James Greenhill <[email protected]>
  • Loading branch information
3 people authored Apr 9, 2024
1 parent 5c323f1 commit 9f68d55
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 50 deletions.
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ export class EventsProcessor {
}

let groupsColumns: Record<string, string | ClickHouseTimestamp> = {}
let eventPersonProperties: string = '{}'
let eventPersonProperties = '{}'
if (processPerson) {
const groupIdentifiers = this.getGroupIdentifiers(properties)
groupsColumns = await this.db.getGroupsColumns(teamId, groupIdentifiers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
let closeServer: () => Promise<void>
let queue: any

function createBatchWithMultipleEvents(events: any[], timestamp?: any, withKey: boolean = true): Message[] {
function createBatchWithMultipleEvents(events: any[], timestamp?: any, withKey = true): Message[] {
return events.map((event, i) => ({
partition: 0,
topic: KAFKA_EVENTS_PLUGIN_INGESTION,
Expand Down
19 changes: 13 additions & 6 deletions posthog/models/async_deletion/delete_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ def process(self, deletions: List[AsyncDeletion]):
logger.debug("No AsyncDeletion to perform")
return

team_ids = list({row.team_id for row in deletions})

logger.info(
"Starting AsyncDeletion on `events` table in ClickHouse",
{
"count": len(deletions),
"team_ids": list({row.team_id for row in deletions}),
"team_ids": team_ids,
},
)
temp_table_name = f"{CLICKHOUSE_DATABASE}.async_deletion_run"
Expand All @@ -49,10 +51,13 @@ def process(self, deletions: List[AsyncDeletion]):
ON CLUSTER '{CLICKHOUSE_CLUSTER}'
DELETE
WHERE
joinGet({temp_table_name}, 'id', team_id, 0, toString(team_id)) > 0 OR
joinGet({temp_table_name}, 'id', team_id, 1, toString(person_id)) > 0
team_id IN %(team_ids)s AND
(
joinGet({temp_table_name}, 'id', team_id, 0, toString(team_id)) > 0 OR
joinGet({temp_table_name}, 'id', team_id, 1, toString(person_id)) > 0
)
""",
{},
{"team_ids": team_ids},
workload=Workload.OFFLINE,
)

Expand All @@ -74,9 +79,11 @@ def process(self, deletions: List[AsyncDeletion]):
f"""
ALTER TABLE {table}
ON CLUSTER '{CLICKHOUSE_CLUSTER}'
DELETE WHERE joinGet({temp_table_name}, 'id', team_id, 0, toString(team_id)) > 0
DELETE WHERE
team_id IN %(team_ids)s AND
joinGet({temp_table_name}, 'id', team_id, 0, toString(team_id)) > 0
""",
{},
{"team_ids": [deletion.team_id for deletion in team_deletions]},
workload=Workload.OFFLINE,
)

Expand Down
Loading

0 comments on commit 9f68d55

Please sign in to comment.