diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index a65ada8e072f9..07758813bb9b3 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -8,7 +8,7 @@ import { Counter } from 'prom-client' import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics' import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types' import { DB } from '../../utils/db/db' -import { PostgresUse, TransactionClient } from '../../utils/db/postgres' +import { PostgresRouter, PostgresUse, TransactionClient } from '../../utils/db/postgres' import { timeoutGuard } from '../../utils/db/utils' import { promiseRetry } from '../../utils/retries' import { status } from '../../utils/status' @@ -519,7 +519,12 @@ export class PersonState { let personOverrideMessages: ProducerRecord[] = [] if (this.poEEmbraceJoin) { - personOverrideMessages = [await this.addPersonOverride(otherPerson, mergeInto, tx)] + personOverrideMessages = await new PersonOverrideWriter(this.db.postgres).addPersonOverride( + tx, + this.teamId, + otherPerson, + mergeInto + ) } return [ @@ -544,12 +549,21 @@ export class PersonState { .inc() return result } +} + +class PersonOverrideWriter { + constructor(private postgres: PostgresRouter) {} - private async addPersonOverride( + public async addPersonOverride( + tx: TransactionClient, + teamId: number, oldPerson: Person, - overridePerson: Person, - tx: TransactionClient - ): Promise { + overridePerson: Person + ): Promise { + if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { + throw new Error('cannot merge persons across different teams') + } + const mergedAt = DateTime.now() const oldestEvent = overridePerson.created_at /** @@ -559,10 +573,10 @@ export class PersonState { 2. Add an override from oldPerson to override person 3. Update any entries that have oldPerson as the override person to now also point to the new override person. Note that we don't update `oldest_event`, because it's a heuristic (used to optimise squashing) tied to the old_person and nothing changed about the old_person who's events need to get squashed. */ - const oldPersonId = await this.addPersonOverrideMapping(oldPerson, tx) - const overridePersonId = await this.addPersonOverrideMapping(overridePerson, tx) + const oldPersonId = await this.addPersonOverrideMapping(tx, oldPerson) + const overridePersonId = await this.addPersonOverrideMapping(tx, overridePerson) - await this.db.postgres.query( + await this.postgres.query( tx, SQL` INSERT INTO posthog_personoverride ( @@ -572,7 +586,7 @@ export class PersonState { oldest_event, version ) VALUES ( - ${this.teamId}, + ${teamId}, ${oldPersonId}, ${overridePersonId}, ${oldestEvent}, @@ -585,7 +599,7 @@ export class PersonState { // The follow-up JOIN is required as ClickHouse requires UUIDs, so we need to fetch the UUIDs // of the IDs we updated from the mapping table. - const { rows: transitiveUpdates } = await this.db.postgres.query( + const { rows: transitiveUpdates } = await this.postgres.query( tx, SQL` WITH updated_ids AS ( @@ -594,7 +608,7 @@ export class PersonState { SET override_person_id = ${overridePersonId}, version = COALESCE(version, 0)::numeric + 1 WHERE - team_id = ${this.teamId} AND override_person_id = ${oldPersonId} + team_id = ${teamId} AND override_person_id = ${oldPersonId} RETURNING old_person_id, version, @@ -617,36 +631,38 @@ export class PersonState { status.debug('🔁', 'person_overrides_updated', { transitiveUpdates }) - const personOverrideMessages: ProducerRecord = { - topic: KAFKA_PERSON_OVERRIDE, - messages: [ - { - value: JSON.stringify({ - team_id: oldPerson.team_id, - merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - override_person_id: overridePerson.uuid, - old_person_id: oldPerson.uuid, - oldest_event: castTimestampOrNow(oldestEvent, TimestampFormat.ClickHouse), - version: 0, - }), - }, - ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ - value: JSON.stringify({ - team_id: oldPerson.team_id, - merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - override_person_id: overridePerson.uuid, - old_person_id: old_person_id, - oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), - version: version, - }), - })), - ], - } + const personOverrideMessages: ProducerRecord[] = [ + { + topic: KAFKA_PERSON_OVERRIDE, + messages: [ + { + value: JSON.stringify({ + team_id: teamId, + merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), + override_person_id: overridePerson.uuid, + old_person_id: oldPerson.uuid, + oldest_event: castTimestampOrNow(oldestEvent, TimestampFormat.ClickHouse), + version: 0, + }), + }, + ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ + value: JSON.stringify({ + team_id: teamId, + merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), + override_person_id: overridePerson.uuid, + old_person_id: old_person_id, + oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), + version: version, + }), + })), + ], + }, + ] return personOverrideMessages } - private async addPersonOverrideMapping(person: Person, tx: TransactionClient): Promise { + private async addPersonOverrideMapping(tx: TransactionClient, person: Person): Promise { /** Update the helper table that serves as a mapping between a serial ID and a Person UUID. @@ -660,7 +676,7 @@ export class PersonState { // as we map int ids to UUIDs (the latter not supported in exclusion contraints). const { rows: [{ id }], - } = await this.db.postgres.query( + } = await this.postgres.query( tx, `WITH insert_id AS ( INSERT INTO posthog_personoverridemapping( @@ -668,7 +684,7 @@ export class PersonState { uuid ) VALUES ( - ${this.teamId}, + ${person.team_id}, '${person.uuid}' ) ON CONFLICT("team_id", "uuid") DO NOTHING