Skip to content

Commit

Permalink
refactor(plugin-server): Extract person override handling from `Perso…
Browse files Browse the repository at this point in the history
…nState` (#18974)
  • Loading branch information
tkaemming authored Nov 29, 2023
1 parent b0a9ce2 commit b9db624
Showing 1 changed file with 56 additions and 40 deletions.
96 changes: 56 additions & 40 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Counter } from 'prom-client'
import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics'
import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types'
import { DB } from '../../utils/db/db'
import { PostgresUse, TransactionClient } from '../../utils/db/postgres'
import { PostgresRouter, PostgresUse, TransactionClient } from '../../utils/db/postgres'
import { timeoutGuard } from '../../utils/db/utils'
import { promiseRetry } from '../../utils/retries'
import { status } from '../../utils/status'
Expand Down Expand Up @@ -519,7 +519,12 @@ export class PersonState {

let personOverrideMessages: ProducerRecord[] = []
if (this.poEEmbraceJoin) {
personOverrideMessages = [await this.addPersonOverride(otherPerson, mergeInto, tx)]
personOverrideMessages = await new PersonOverrideWriter(this.db.postgres).addPersonOverride(
tx,
this.teamId,
otherPerson,
mergeInto
)
}

return [
Expand All @@ -544,12 +549,21 @@ export class PersonState {
.inc()
return result
}
}

class PersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}

private async addPersonOverride(
public async addPersonOverride(
tx: TransactionClient,
teamId: number,
oldPerson: Person,
overridePerson: Person,
tx: TransactionClient
): Promise<ProducerRecord> {
overridePerson: Person
): Promise<ProducerRecord[]> {
if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) {
throw new Error('cannot merge persons across different teams')
}

const mergedAt = DateTime.now()
const oldestEvent = overridePerson.created_at
/**
Expand All @@ -559,10 +573,10 @@ export class PersonState {
2. Add an override from oldPerson to override person
3. Update any entries that have oldPerson as the override person to now also point to the new override person. Note that we don't update `oldest_event`, because it's a heuristic (used to optimise squashing) tied to the old_person and nothing changed about the old_person who's events need to get squashed.
*/
const oldPersonId = await this.addPersonOverrideMapping(oldPerson, tx)
const overridePersonId = await this.addPersonOverrideMapping(overridePerson, tx)
const oldPersonId = await this.addPersonOverrideMapping(tx, oldPerson)
const overridePersonId = await this.addPersonOverrideMapping(tx, overridePerson)

await this.db.postgres.query(
await this.postgres.query(
tx,
SQL`
INSERT INTO posthog_personoverride (
Expand All @@ -572,7 +586,7 @@ export class PersonState {
oldest_event,
version
) VALUES (
${this.teamId},
${teamId},
${oldPersonId},
${overridePersonId},
${oldestEvent},
Expand All @@ -585,7 +599,7 @@ export class PersonState {

// The follow-up JOIN is required as ClickHouse requires UUIDs, so we need to fetch the UUIDs
// of the IDs we updated from the mapping table.
const { rows: transitiveUpdates } = await this.db.postgres.query(
const { rows: transitiveUpdates } = await this.postgres.query(
tx,
SQL`
WITH updated_ids AS (
Expand All @@ -594,7 +608,7 @@ export class PersonState {
SET
override_person_id = ${overridePersonId}, version = COALESCE(version, 0)::numeric + 1
WHERE
team_id = ${this.teamId} AND override_person_id = ${oldPersonId}
team_id = ${teamId} AND override_person_id = ${oldPersonId}
RETURNING
old_person_id,
version,
Expand All @@ -617,36 +631,38 @@ export class PersonState {

status.debug('🔁', 'person_overrides_updated', { transitiveUpdates })

const personOverrideMessages: ProducerRecord = {
topic: KAFKA_PERSON_OVERRIDE,
messages: [
{
value: JSON.stringify({
team_id: oldPerson.team_id,
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
override_person_id: overridePerson.uuid,
old_person_id: oldPerson.uuid,
oldest_event: castTimestampOrNow(oldestEvent, TimestampFormat.ClickHouse),
version: 0,
}),
},
...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({
value: JSON.stringify({
team_id: oldPerson.team_id,
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
override_person_id: overridePerson.uuid,
old_person_id: old_person_id,
oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse),
version: version,
}),
})),
],
}
const personOverrideMessages: ProducerRecord[] = [
{
topic: KAFKA_PERSON_OVERRIDE,
messages: [
{
value: JSON.stringify({
team_id: teamId,
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
override_person_id: overridePerson.uuid,
old_person_id: oldPerson.uuid,
oldest_event: castTimestampOrNow(oldestEvent, TimestampFormat.ClickHouse),
version: 0,
}),
},
...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({
value: JSON.stringify({
team_id: teamId,
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
override_person_id: overridePerson.uuid,
old_person_id: old_person_id,
oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse),
version: version,
}),
})),
],
},
]

return personOverrideMessages
}

private async addPersonOverrideMapping(person: Person, tx: TransactionClient): Promise<number> {
private async addPersonOverrideMapping(tx: TransactionClient, person: Person): Promise<number> {
/**
Update the helper table that serves as a mapping between a serial ID and a Person UUID.
Expand All @@ -660,15 +676,15 @@ export class PersonState {
// as we map int ids to UUIDs (the latter not supported in exclusion contraints).
const {
rows: [{ id }],
} = await this.db.postgres.query(
} = await this.postgres.query(
tx,
`WITH insert_id AS (
INSERT INTO posthog_personoverridemapping(
team_id,
uuid
)
VALUES (
${this.teamId},
${person.team_id},
'${person.uuid}'
)
ON CONFLICT("team_id", "uuid") DO NOTHING
Expand Down

0 comments on commit b9db624

Please sign in to comment.