Skip to content

Commit

Permalink
refactor(plugin-server): Add deferred person override writer (#19007)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming authored and Twixes committed Dec 6, 2023
1 parent 2fccbe8 commit f311e6d
Show file tree
Hide file tree
Showing 7 changed files with 477 additions and 105 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0370_externaldatajob_workflow_id
posthog: 0371_pendingpersonoverride
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Person } from 'types'

import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { PersonState } from '../person-state'
import { PersonOverrideWriter, PersonState } from '../person-state'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner } from './runner'

Expand All @@ -28,7 +28,7 @@ export async function processPersonsStep(
String(event.distinct_id),
timestamp,
runner.hub.db,
runner.poEEmbraceJoin
runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined
).update()

return [event, person]
Expand Down
201 changes: 160 additions & 41 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as Sentry from '@sentry/node'
import { ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'
import { Counter } from 'prom-client'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'

import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics'
import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types'
Expand Down Expand Up @@ -90,15 +91,14 @@ export class PersonState {

private db: DB
public updateIsIdentified: boolean // TODO: remove this from the class and being hidden
private poEEmbraceJoin: boolean

constructor(
event: PluginEvent,
teamId: number,
distinctId: string,
timestamp: DateTime,
db: DB,
poEEmbraceJoin = false,
private personOverrideWriter?: PersonOverrideWriter | DeferredPersonOverrideWriter,
uuid: UUIDT | undefined = undefined,
maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS
) {
Expand All @@ -115,9 +115,6 @@ export class PersonState {
// If set to true, we'll update `is_identified` at the end of `updateProperties`
// :KLUDGE: This is an indirect communication channel between `handleIdentifyOrAlias` and `updateProperties`
this.updateIsIdentified = false

// For persons on events embrace the join gradual roll-out, remove after fully rolled out
this.poEEmbraceJoin = poEEmbraceJoin
}

async update(): Promise<Person> {
Expand Down Expand Up @@ -432,7 +429,7 @@ export class PersonState {
const properties: Properties = { ...otherPerson.properties, ...mergeInto.properties }
this.applyEventPropertyUpdates(properties)

if (this.poEEmbraceJoin) {
if (this.personOverrideWriter) {
// Optimize merging persons to keep using the person id that has longer history,
// which means we'll have less events to update during the squash later
if (otherPerson.created_at < mergeInto.created_at) {
Expand Down Expand Up @@ -467,7 +464,7 @@ export class PersonState {
call: this.event.event, // $identify, $create_alias or $merge_dangerously
oldPersonIdentified: String(otherPerson.is_identified),
newPersonIdentified: String(mergeInto.is_identified),
poEEmbraceJoin: String(this.poEEmbraceJoin),
poEEmbraceJoin: String(!!this.personOverrideWriter),
})
.inc()

Expand Down Expand Up @@ -499,12 +496,10 @@ export class PersonState {
const deletePersonMessages = await this.db.deletePerson(otherPerson, tx)

let personOverrideMessages: ProducerRecord[] = []
if (this.poEEmbraceJoin) {
personOverrideMessages = await new PersonOverrideWriter(this.db.postgres).addPersonOverride(
if (this.personOverrideWriter) {
personOverrideMessages = await this.personOverrideWriter.addPersonOverride(
tx,
this.teamId,
otherPerson,
mergeInto
getPersonOverrideDetails(this.teamId, otherPerson, mergeInto)
)
}

Expand All @@ -525,37 +520,64 @@ export class PersonState {
call: this.event.event, // $identify, $create_alias or $merge_dangerously
oldPersonIdentified: String(otherPerson.is_identified),
newPersonIdentified: String(mergeInto.is_identified),
poEEmbraceJoin: String(this.poEEmbraceJoin),
poEEmbraceJoin: String(!!this.personOverrideWriter),
})
.inc()
return result
}
}

class PersonOverrideWriter {
/**
* A record of a merge operation occurring.
*
* These property names need to be kept in sync with the ``PersonOverride``
* Django model (and ``posthog_personoverride`` table schema) as defined in
* ``posthog/models/person/person.py``.
*/
type PersonOverrideDetails = {
    // Team both persons belong to — merges never cross teams (enforced in getPersonOverrideDetails).
    team_id: number
    // UUID of the person being merged away; existing overrides pointing at it get re-targeted.
    old_person_id: string
    // UUID of the person that survives the merge and becomes the override target.
    override_person_id: string
    // Set from the override person's created_at (see getPersonOverrideDetails); a heuristic
    // used to optimise the later squash, not an exact event timestamp.
    oldest_event: DateTime
}

/**
 * Build the `PersonOverrideDetails` record describing a merge of `oldPerson`
 * into `overridePerson`.
 *
 * @param teamId team the merge is happening for; must match the `team_id` of
 *     both persons, since merges across teams are invalid
 * @param oldPerson the person being merged away
 * @param overridePerson the person surviving the merge
 * @returns the override details, with `oldest_event` taken from the override
 *     person's `created_at` (a squash-optimisation heuristic)
 * @throws Error if either person belongs to a team other than `teamId`
 */
function getPersonOverrideDetails(teamId: number, oldPerson: Person, overridePerson: Person): PersonOverrideDetails {
    // Strict inequality: all operands are numbers, so no coercion is desired here.
    if (teamId !== oldPerson.team_id || teamId !== overridePerson.team_id) {
        throw new Error('cannot merge persons across different teams')
    }
    return {
        team_id: teamId,
        old_person_id: oldPerson.uuid,
        override_person_id: overridePerson.uuid,
        oldest_event: overridePerson.created_at,
    }
}

export class PersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}

public async addPersonOverride(
tx: TransactionClient,
teamId: number,
oldPerson: Person,
overridePerson: Person
overrideDetails: PersonOverrideDetails
): Promise<ProducerRecord[]> {
if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) {
throw new Error('cannot merge persons across different teams')
}

const mergedAt = DateTime.now()
const oldestEvent = overridePerson.created_at
/**
We'll need to do 4 updates:
1. Add the persons involved to the helper table (2 of them)
2. Add an override from oldPerson to override person
3. Update any entries that have oldPerson as the override person to now also point to the new override person. Note that we don't update `oldest_event`, because it's a heuristic (used to optimise squashing) tied to the old_person and nothing changed about the old_person who's events need to get squashed.
*/
const oldPersonId = await this.addPersonOverrideMapping(tx, oldPerson)
const overridePersonId = await this.addPersonOverrideMapping(tx, overridePerson)
const oldPersonMappingId = await this.addPersonOverrideMapping(
tx,
overrideDetails.team_id,
overrideDetails.old_person_id
)
const overridePersonMappingId = await this.addPersonOverrideMapping(
tx,
overrideDetails.team_id,
overrideDetails.override_person_id
)

await this.postgres.query(
tx,
Expand All @@ -567,10 +589,10 @@ class PersonOverrideWriter {
oldest_event,
version
) VALUES (
${teamId},
${oldPersonId},
${overridePersonId},
${oldestEvent},
${overrideDetails.team_id},
${oldPersonMappingId},
${overridePersonMappingId},
${overrideDetails.oldest_event},
0
)
`,
Expand All @@ -587,9 +609,9 @@ class PersonOverrideWriter {
UPDATE
posthog_personoverride
SET
override_person_id = ${overridePersonId}, version = COALESCE(version, 0)::numeric + 1
override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1
WHERE
team_id = ${teamId} AND override_person_id = ${oldPersonId}
team_id = ${overrideDetails.team_id} AND override_person_id = ${oldPersonMappingId}
RETURNING
old_person_id,
version,
Expand Down Expand Up @@ -618,21 +640,21 @@ class PersonOverrideWriter {
messages: [
{
value: JSON.stringify({
team_id: teamId,
team_id: overrideDetails.team_id,
old_person_id: overrideDetails.old_person_id,
override_person_id: overrideDetails.override_person_id,
oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse),
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
override_person_id: overridePerson.uuid,
old_person_id: oldPerson.uuid,
oldest_event: castTimestampOrNow(oldestEvent, TimestampFormat.ClickHouse),
version: 0,
}),
},
...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({
value: JSON.stringify({
team_id: teamId,
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
override_person_id: overridePerson.uuid,
team_id: overrideDetails.team_id,
old_person_id: old_person_id,
override_person_id: overrideDetails.override_person_id,
oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse),
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
version: version,
}),
})),
Expand All @@ -643,7 +665,7 @@ class PersonOverrideWriter {
return personOverrideMessages
}

private async addPersonOverrideMapping(tx: TransactionClient, person: Person): Promise<number> {
private async addPersonOverrideMapping(tx: TransactionClient, teamId: number, personId: string): Promise<number> {
/**
Update the helper table that serves as a mapping between a serial ID and a Person UUID.
Expand All @@ -665,8 +687,8 @@ class PersonOverrideWriter {
uuid
)
VALUES (
${person.team_id},
'${person.uuid}'
${teamId},
'${personId}'
)
ON CONFLICT("team_id", "uuid") DO NOTHING
RETURNING id
Expand All @@ -675,7 +697,7 @@ class PersonOverrideWriter {
UNION ALL
SELECT id
FROM posthog_personoverridemapping
WHERE uuid = '${person.uuid}'
WHERE team_id = ${teamId} AND uuid = '${personId}'
`,
undefined,
'personOverrideMapping'
Expand All @@ -685,6 +707,103 @@ class PersonOverrideWriter {
}
}

export class DeferredPersonOverrideWriter {
    /**
     * Queues person overrides in a pending table for asynchronous application
     * (via `processPendingOverrides`) instead of writing them to the override
     * tables inline during event ingestion.
     *
     * @param lockId the lock identifier/key used to ensure that only one
     * process is updating the overrides at a time
     */
    constructor(private postgres: PostgresRouter, private lockId: number) {}

    /**
     * Enqueue an override for deferred processing.
     *
     * Unlike `PersonOverrideWriter.addPersonOverride`, no Kafka messages are
     * produced at enqueue time — publishing happens later, inside
     * `processPendingOverrides` — so this always returns an empty array.
     */
    public async addPersonOverride(
        tx: TransactionClient,
        overrideDetails: PersonOverrideDetails
    ): Promise<ProducerRecord[]> {
        await this.postgres.query(
            tx,
            SQL`
            INSERT INTO posthog_pendingpersonoverride (
                team_id,
                old_person_id,
                override_person_id,
                oldest_event
            ) VALUES (
                ${overrideDetails.team_id},
                ${overrideDetails.old_person_id},
                ${overrideDetails.override_person_id},
                ${overrideDetails.oldest_event}
            )`,
            undefined,
            'pendingPersonOverride'
        )

        return []
    }

    /**
     * Process all (or up to the given limit) pending overrides.
     *
     * An advisory lock is acquired prior to processing to ensure that this
     * function has exclusive access to the pending overrides during the update
     * process.
     *
     * @returns the number of overrides processed
     */
    public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper, limit?: number): Promise<number> {
        // The synchronous writer is reused to apply each queued override
        // within the transaction opened below.
        const writer = new PersonOverrideWriter(this.postgres)

        return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => {
            // pg_try_advisory_xact_lock is transaction-scoped, so the lock is
            // released automatically when this transaction commits or rolls back.
            const {
                rows: [{ acquired }],
            } = await this.postgres.query(
                tx,
                SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`,
                undefined,
                'processPendingOverrides'
            )
            if (!acquired) {
                throw new Error('could not acquire lock')
            }

            // n.b.: Ordering by id ensures we are processing in (roughly) FIFO order
            const { rows } = await this.postgres.query(
                tx,
                `SELECT * FROM posthog_pendingpersonoverride ORDER BY id` +
                    (limit !== undefined ? ` LIMIT ${limit}` : ''),
                undefined,
                'processPendingOverrides'
            )

            const messages: ProducerRecord[] = []
            for (const { id, ...mergeOperation } of rows) {
                // Apply the override, collect its Kafka messages, and remove the
                // row from the pending queue — all inside the same transaction,
                // so a failure anywhere leaves the queue untouched.
                messages.push(...(await writer.addPersonOverride(tx, mergeOperation)))
                await this.postgres.query(
                    tx,
                    SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`,
                    undefined,
                    'processPendingOverrides'
                )
            }

            // n.b.: We publish the messages here (and wait for acks) to ensure
            // that all of our override updates are sent to Kafka prior to
            // committing the transaction. If we're unable to publish, we should
            // discard updates and try again later when it's available -- not
            // doing so would cause the copy of this data in ClickHouse to
            // slowly drift out of sync with the copy in Postgres. This write is
            // safe to retry if we write to Kafka but then fail to commit to
            // Postgres for some reason -- the same row state should be
            // generated each call, and the receiving ReplacingMergeTree will
            // ensure we keep only the latest version after all writes settle.)
            await kafkaProducer.queueMessages(messages, true)

            return rows.length
        })
    }
}

function SQL(sqlParts: TemplateStringsArray, ...args: any[]): { text: string; values: any[] } {
// Generates a node-pq compatible query object given a tagged
// template literal. The intention is to remove the need to match up
Expand Down
19 changes: 19 additions & 0 deletions plugin-server/tests/helpers/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,22 @@ export function createPromise<T = void>(): MockPromise<T> {

return result as MockPromise<T>
}

/**
 * A simple one-shot synchronization primitive for tests: `wait()` returns a
 * promise that stays pending until `set()` is called, after which it (and all
 * subsequent `wait()` calls) resolve.
 */
export class WaitEvent {
    private promise: Promise<void>
    // Definite assignment assertion (`!`): the Promise executor runs
    // synchronously inside the constructor, so `resolve` is always assigned
    // before use, but the compiler cannot prove that under
    // strictPropertyInitialization.
    private resolve!: () => void

    constructor() {
        this.promise = new Promise((resolve) => {
            this.resolve = resolve
        })
    }

    /** Signal the event, releasing all current and future waiters. */
    public set() {
        this.resolve()
    }

    /** Resolve once `set()` has been called (immediately if it already was). */
    public async wait() {
        return this.promise
    }
}
Loading

0 comments on commit f311e6d

Please sign in to comment.