From f311e6d8fccdad8431cdc9ba7659c0dbd8470ba5 Mon Sep 17 00:00:00 2001 From: ted kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 5 Dec 2023 13:34:41 -0800 Subject: [PATCH] refactor(plugin-server): Add deferred person override writer (#19007) --- latest_migrations.manifest | 2 +- .../event-pipeline/processPersonsStep.ts | 4 +- .../src/worker/ingestion/person-state.ts | 201 ++++++++--- plugin-server/tests/helpers/promises.ts | 19 + .../worker/ingestion/person-state.test.ts | 326 ++++++++++++++---- .../migrations/0371_pendingpersonoverride.py | 22 ++ posthog/models/person/person.py | 8 + 7 files changed, 477 insertions(+), 105 deletions(-) create mode 100644 posthog/migrations/0371_pendingpersonoverride.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index def342b71e44bf..4dad3bbf643bf1 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0370_externaldatajob_workflow_id +posthog: 0371_pendingpersonoverride sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts index 6a4cd6c4874cae..cb5c7467332599 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts @@ -4,7 +4,7 @@ import { Person } from 'types' import { normalizeEvent } from '../../../utils/event' import { status } from '../../../utils/status' -import { PersonState } from '../person-state' +import { PersonOverrideWriter, PersonState } from '../person-state' import { parseEventTimestamp } from '../timestamps' import { EventPipelineRunner } from './runner' @@ -28,7 +28,7 @@ export async function processPersonsStep( String(event.distinct_id), timestamp, runner.hub.db, - runner.poEEmbraceJoin + runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined ).update() return [event, person] diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 250f95e4e67b7b..340fbfcb97d6eb 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -3,6 +3,7 @@ import * as Sentry from '@sentry/node' import { ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' import { Counter } from 'prom-client' +import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics' import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types' @@ -90,7 +91,6 @@ export class PersonState { private db: DB public updateIsIdentified: boolean // TODO: remove this from the class and being hidden - private poEEmbraceJoin: boolean constructor( event: PluginEvent, @@ -98,7 +98,7 @@ export class PersonState { distinctId: string, timestamp: DateTime, db: DB, - poEEmbraceJoin = false, + private personOverrideWriter?: PersonOverrideWriter | DeferredPersonOverrideWriter, uuid: UUIDT | undefined = undefined, maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS ) { @@ -115,9 +115,6 @@ export class PersonState { // If set to true, we'll update `is_identified` at the end of `updateProperties` // :KLUDGE: This is an indirect communication channel between `handleIdentifyOrAlias` and `updateProperties` this.updateIsIdentified = false - - // For persons on events embrace the join gradual roll-out, remove after fully rolled out - this.poEEmbraceJoin = poEEmbraceJoin } async update(): Promise { @@ -432,7 +429,7 @@ export class PersonState { const properties: Properties = { ...otherPerson.properties, ...mergeInto.properties } this.applyEventPropertyUpdates(properties) - if (this.poEEmbraceJoin) { + if (this.personOverrideWriter) { // Optimize merging persons to keep using the person id that has longer history, // which means we'll have less events to update during the squash later if (otherPerson.created_at < mergeInto.created_at) { @@ -467,7 +464,7 @@ export class PersonState { call: this.event.event, // $identify, $create_alias or $merge_dangerously oldPersonIdentified: String(otherPerson.is_identified), newPersonIdentified: String(mergeInto.is_identified), - poEEmbraceJoin: String(this.poEEmbraceJoin), + poEEmbraceJoin: String(!!this.personOverrideWriter), }) .inc() @@ -499,12 +496,10 @@ export class PersonState { const deletePersonMessages = await this.db.deletePerson(otherPerson, tx) let personOverrideMessages: ProducerRecord[] = [] - if (this.poEEmbraceJoin) { - personOverrideMessages = await new PersonOverrideWriter(this.db.postgres).addPersonOverride( + if (this.personOverrideWriter) { + personOverrideMessages = await this.personOverrideWriter.addPersonOverride( tx, - this.teamId, - otherPerson, - mergeInto + getPersonOverrideDetails(this.teamId, otherPerson, mergeInto) ) } @@ -525,28 +520,47 @@ export class PersonState { call: this.event.event, // $identify, $create_alias or $merge_dangerously oldPersonIdentified: String(otherPerson.is_identified), newPersonIdentified: String(mergeInto.is_identified), - poEEmbraceJoin: String(this.poEEmbraceJoin), + poEEmbraceJoin: String(!!this.personOverrideWriter), }) .inc() return result } } -class PersonOverrideWriter { +/** + * A record of a merge operation occurring. + * + * These property names need to be kept in sync with the ``PersonOverride`` + * Django model (and ``posthog_personoverride`` table schema) as defined in + * ``posthog/models/person/person.py``. + */ +type PersonOverrideDetails = { + team_id: number + old_person_id: string + override_person_id: string + oldest_event: DateTime +} + +function getPersonOverrideDetails(teamId: number, oldPerson: Person, overridePerson: Person): PersonOverrideDetails { + if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { + throw new Error('cannot merge persons across different teams') + } + return { + team_id: teamId, + old_person_id: oldPerson.uuid, + override_person_id: overridePerson.uuid, + oldest_event: overridePerson.created_at, + } +} + +export class PersonOverrideWriter { constructor(private postgres: PostgresRouter) {} public async addPersonOverride( tx: TransactionClient, - teamId: number, - oldPerson: Person, - overridePerson: Person + overrideDetails: PersonOverrideDetails ): Promise { - if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { - throw new Error('cannot merge persons across different teams') - } - const mergedAt = DateTime.now() - const oldestEvent = overridePerson.created_at /** We'll need to do 4 updates: @@ -554,8 +568,16 @@ class PersonOverrideWriter { 2. Add an override from oldPerson to override person 3. Update any entries that have oldPerson as the override person to now also point to the new override person. Note that we don't update `oldest_event`, because it's a heuristic (used to optimise squashing) tied to the old_person and nothing changed about the old_person who's events need to get squashed. */ - const oldPersonId = await this.addPersonOverrideMapping(tx, oldPerson) - const overridePersonId = await this.addPersonOverrideMapping(tx, overridePerson) + const oldPersonMappingId = await this.addPersonOverrideMapping( + tx, + overrideDetails.team_id, + overrideDetails.old_person_id + ) + const overridePersonMappingId = await this.addPersonOverrideMapping( + tx, + overrideDetails.team_id, + overrideDetails.override_person_id + ) await this.postgres.query( tx, @@ -567,10 +589,10 @@ class PersonOverrideWriter { oldest_event, version ) VALUES ( - ${teamId}, - ${oldPersonId}, - ${overridePersonId}, - ${oldestEvent}, + ${overrideDetails.team_id}, + ${oldPersonMappingId}, + ${overridePersonMappingId}, + ${overrideDetails.oldest_event}, 0 ) `, @@ -587,9 +609,9 @@ class PersonOverrideWriter { UPDATE posthog_personoverride SET - override_person_id = ${overridePersonId}, version = COALESCE(version, 0)::numeric + 1 + override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1 WHERE - team_id = ${teamId} AND override_person_id = ${oldPersonId} + team_id = ${overrideDetails.team_id} AND override_person_id = ${oldPersonMappingId} RETURNING old_person_id, version, @@ -618,21 +640,21 @@ class PersonOverrideWriter { messages: [ { value: JSON.stringify({ - team_id: teamId, + team_id: overrideDetails.team_id, + old_person_id: overrideDetails.old_person_id, + override_person_id: overrideDetails.override_person_id, + oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse), merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - override_person_id: overridePerson.uuid, - old_person_id: oldPerson.uuid, - oldest_event: castTimestampOrNow(oldestEvent, TimestampFormat.ClickHouse), version: 0, }), }, ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ value: JSON.stringify({ - team_id: teamId, - merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - override_person_id: overridePerson.uuid, + team_id: overrideDetails.team_id, old_person_id: old_person_id, + override_person_id: overrideDetails.override_person_id, oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), + merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), version: version, }), })), @@ -643,7 +665,7 @@ class PersonOverrideWriter { return personOverrideMessages } - private async addPersonOverrideMapping(tx: TransactionClient, person: Person): Promise { + private async addPersonOverrideMapping(tx: TransactionClient, teamId: number, personId: string): Promise { /** Update the helper table that serves as a mapping between a serial ID and a Person UUID. @@ -665,8 +687,8 @@ class PersonOverrideWriter { uuid ) VALUES ( - ${person.team_id}, - '${person.uuid}' + ${teamId}, + '${personId}' ) ON CONFLICT("team_id", "uuid") DO NOTHING RETURNING id @@ -675,7 +697,7 @@ class PersonOverrideWriter { UNION ALL SELECT id FROM posthog_personoverridemapping - WHERE uuid = '${person.uuid}' + WHERE team_id = ${teamId} AND uuid = '${personId}' `, undefined, 'personOverrideMapping' @@ -685,6 +707,103 @@ class PersonOverrideWriter { } } +export class DeferredPersonOverrideWriter { + /** + * @param lockId the lock identifier/key used to ensure that only one + * process is updating the overrides at a time + */ + constructor(private postgres: PostgresRouter, private lockId: number) {} + + /** + * Enqueue an override for deferred processing. + */ + public async addPersonOverride( + tx: TransactionClient, + overrideDetails: PersonOverrideDetails + ): Promise { + await this.postgres.query( + tx, + SQL` + INSERT INTO posthog_pendingpersonoverride ( + team_id, + old_person_id, + override_person_id, + oldest_event + ) VALUES ( + ${overrideDetails.team_id}, + ${overrideDetails.old_person_id}, + ${overrideDetails.override_person_id}, + ${overrideDetails.oldest_event} + )`, + undefined, + 'pendingPersonOverride' + ) + + return [] + } + + /** + * Process all (or up to the given limit) pending overrides. + * + * An advisory lock is acquired prior to processing to ensure that this + * function has exclusive access to the pending overrides during the update + * process. + * + * @returns the number of overrides processed + */ + public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper, limit?: number): Promise { + const writer = new PersonOverrideWriter(this.postgres) + + return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { + const { + rows: [{ acquired }], + } = await this.postgres.query( + tx, + SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`, + undefined, + 'processPendingOverrides' + ) + if (!acquired) { + throw new Error('could not acquire lock') + } + + // n.b.: Ordering by id ensures we are processing in (roughly) FIFO order + const { rows } = await this.postgres.query( + tx, + `SELECT * FROM posthog_pendingpersonoverride ORDER BY id` + + (limit !== undefined ? ` LIMIT ${limit}` : ''), + undefined, + 'processPendingOverrides' + ) + + const messages: ProducerRecord[] = [] + for (const { id, ...mergeOperation } of rows) { + messages.push(...(await writer.addPersonOverride(tx, mergeOperation))) + await this.postgres.query( + tx, + SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`, + undefined, + 'processPendingOverrides' + ) + } + + // n.b.: We publish the messages here (and wait for acks) to ensure + // that all of our override updates are sent to Kafka prior to + // committing the transaction. If we're unable to publish, we should + // discard updates and try again later when it's available -- not + // doing so would cause the copy of this data in ClickHouse to + // slowly drift out of sync with the copy in Postgres. This write is + // safe to retry if we write to Kafka but then fail to commit to + // Postgres for some reason -- the same row state should be + // generated each call, and the receiving ReplacingMergeTree will + // ensure we keep only the latest version after all writes settle.) + await kafkaProducer.queueMessages(messages, true) + + return rows.length + }) + } +} + function SQL(sqlParts: TemplateStringsArray, ...args: any[]): { text: string; values: any[] } { // Generates a node-pq compatible query object given a tagged // template literal. The intention is to remove the need to match up diff --git a/plugin-server/tests/helpers/promises.ts b/plugin-server/tests/helpers/promises.ts index 4d92371bdbbfb7..4daea9156aa02b 100644 --- a/plugin-server/tests/helpers/promises.ts +++ b/plugin-server/tests/helpers/promises.ts @@ -13,3 +13,22 @@ export function createPromise(): MockPromise { return result as MockPromise } + +export class WaitEvent { + private promise: Promise + private resolve: () => void + + constructor() { + this.promise = new Promise((resolve) => { + this.resolve = resolve + }) + } + + public set() { + this.resolve() + } + + public async wait() { + return this.promise + } +} diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 9c03e2d0584143..76652cb12d5495 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -1,14 +1,20 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' +import { waitForExpect } from '../../../functional_tests/expectations' import { Database, Hub, Person } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { defaultRetryConfig } from '../../../src/utils/retries' import { UUIDT } from '../../../src/utils/utils' -import { PersonState } from '../../../src/worker/ingestion/person-state' +import { + DeferredPersonOverrideWriter, + PersonOverrideWriter, + PersonState, +} from '../../../src/worker/ingestion/person-state' import { delayUntilEventIngested } from '../../helpers/clickhouse' +import { WaitEvent } from '../../helpers/promises' import { createOrganization, createTeam, fetchPostgresPersons, insertRow } from '../../helpers/sql' jest.setTimeout(5000) // 5 sec timeout @@ -17,6 +23,63 @@ const timestamp = DateTime.fromISO('2020-01-01T12:00:05.200Z').toUTC() const timestamp2 = DateTime.fromISO('2020-02-02T12:00:05.200Z').toUTC() const timestampch = '2020-01-01 12:00:05.000' +async function fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> { + const result = await hub.db.postgres.query( + PostgresUse.COMMON_WRITE, + ` + WITH overrides AS ( + SELECT id, old_person_id, override_person_id + FROM posthog_personoverride + WHERE team_id = ${teamId} + ORDER BY id + ) + SELECT + mapping.uuid AS old_person_id, + overrides_mapping.uuid AS override_person_id + FROM + overrides AS first + JOIN + posthog_personoverridemapping AS mapping ON first.old_person_id = mapping.id + JOIN ( + SELECT + second.id AS id, + uuid + FROM + overrides AS second + JOIN posthog_personoverridemapping AS mapping ON second.override_person_id = mapping.id + ) AS overrides_mapping ON overrides_mapping.id = first.id + `, + undefined, + 'fetchPersonIdOverrides' + ) + return result.rows.map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]).sort() as [ + string, + string + ][] +} + +interface PersonOverridesMode { + getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter + fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> +} + +const PersonOverridesModes: Record = { + disabled: undefined, + immediate: { + getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), + fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), + }, + deferred: { + // XXX: This is kind of a mess -- ideally it'd be preferable to just + // instantiate the writer once and share it + getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres, 456), + fetchPostgresPersonIdOverrides: async (hub, teamId) => { + await new DeferredPersonOverrideWriter(hub.db.postgres, 456).processPendingOverrides(hub.db.kafkaProducer) + return await fetchPostgresPersonIdOverrides(hub, teamId) + }, + }, +} + describe('PersonState.update()', () => { let hub: Hub let closeHub: () => Promise @@ -24,7 +87,7 @@ describe('PersonState.update()', () => { let uuid: UUIDT let uuid2: UUIDT let teamId: number - let poEEmbraceJoin: boolean + let overridesMode: PersonOverridesMode | undefined let organizationId: string beforeAll(async () => { @@ -35,7 +98,7 @@ describe('PersonState.update()', () => { }) beforeEach(async () => { - poEEmbraceJoin = false + overridesMode = undefined uuid = new UUIDT() uuid2 = new UUIDT() @@ -63,13 +126,14 @@ describe('PersonState.update()', () => { properties: {}, ...event, } + return new PersonState( fullEvent as any, teamId, event.distinct_id!, timestamp, customHub ? customHub.db : hub.db, - poEEmbraceJoin, + overridesMode?.getWriter(customHub ?? hub), uuid, maxMergeAttempts ?? 3 // the default ) @@ -453,12 +517,12 @@ describe('PersonState.update()', () => { }) }) - describe.each([[true], [false]])('on $identify event', (poEEmbraceJoinThis) => { + describe.each(Object.keys(PersonOverridesModes))('on $identify event', (useOverridesMode) => { beforeEach(() => { - poEEmbraceJoin = poEEmbraceJoinThis + overridesMode = PersonOverridesModes[useOverridesMode] // n.b. mutating outer scope here -- be careful }) - describe(`${poEEmbraceJoinThis ? 'PoE' : 'normal'}`, () => { + describe(`overrides: ${useOverridesMode}`, () => { it(`no-op when $anon_distinct_id not passed`, async () => { const person = await personState({ event: '$identify', @@ -1014,11 +1078,12 @@ describe('PersonState.update()', () => { }) }) - describe.each([[true], [false]])('on $merge_dangerously events', (poEEmbraceJoinThis) => { + describe.each(Object.keys(PersonOverridesModes))('on $merge_dangerously events', (useOverridesMode) => { beforeEach(() => { - poEEmbraceJoin = poEEmbraceJoinThis + overridesMode = PersonOverridesModes[useOverridesMode] // n.b. mutating outer scope here -- be careful }) - describe(`${poEEmbraceJoinThis ? 'PoE' : 'normal'}`, () => { + + describe(`overrides: ${useOverridesMode}`, () => { // only difference between $merge_dangerously and $identify it(`merge_dangerously can merge people when alias id user is identified`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, uuid.toString(), ['old-user']) @@ -1381,7 +1446,7 @@ describe('PersonState.update()', () => { ) }) }) - describe.each([[true], [false]])('on persons merges', (poEEmbraceJoinThis) => { + describe.each(Object.keys(PersonOverridesModes))('on persons merges', (useOverridesMode) => { // For some reason these tests failed if I ran them with a hub shared // with other tests, so I'm creating a new hub for each test. let hub: Hub @@ -1389,7 +1454,7 @@ describe('PersonState.update()', () => { beforeEach(async () => { ;[hub, closeHub] = await createHub({}) - poEEmbraceJoin = poEEmbraceJoinThis + overridesMode = PersonOverridesModes[useOverridesMode] // n.b. mutating outer scope here -- be careful jest.spyOn(hub.db, 'fetchPerson') jest.spyOn(hub.db, 'updatePersonDeprecated') @@ -1398,42 +1463,7 @@ describe('PersonState.update()', () => { afterEach(async () => { await closeHub() }) - - async function fetchPersonIdOverrides() { - const result = await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - ` - WITH overrides AS ( - SELECT id, old_person_id, override_person_id - FROM posthog_personoverride - WHERE team_id = ${teamId} - ORDER BY id - ) - SELECT - mapping.uuid AS old_person_id, - overrides_mapping.uuid AS override_person_id - FROM - overrides AS first - JOIN - posthog_personoverridemapping AS mapping ON first.old_person_id = mapping.id - JOIN ( - SELECT - second.id AS id, - uuid - FROM - overrides AS second - JOIN posthog_personoverridemapping AS mapping ON second.override_person_id = mapping.id - ) AS overrides_mapping ON overrides_mapping.id = first.id - `, - undefined, - 'fetchPersonIdOverrides' - ) - return result.rows - .map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]) - .sort() as [string, string][] - } - - describe(`${poEEmbraceJoinThis ? 'PoE' : 'normal'}`, () => { + describe(`overrides: ${useOverridesMode}`, () => { it(`no-op if persons already merged`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, uuid.toString(), [ 'first', @@ -1539,9 +1569,9 @@ describe('PersonState.update()', () => { const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(person) expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['first', 'second'])) - // verify Postgres person_id overrides - if (poEEmbraceJoin) { - const overrides = await fetchPersonIdOverrides() + // verify Postgres person_id overrides, if applicable + if (overridesMode) { + const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([[second.uuid, first.uuid]]) // & CH person overrides // TODO @@ -1704,8 +1734,8 @@ describe('PersonState.update()', () => { }) it(`does not commit partial transactions on override conflicts`, async () => { - if (!poEEmbraceJoin) { - return // this is only a PoE test + if (overridesMode !== PersonOverridesModes.immediate) { + return // this behavior is only supported with immediate overrides } const first: Person = await hub.db.createPerson( timestamp, @@ -1792,7 +1822,7 @@ describe('PersonState.update()', () => { expect(distinctIdsAfterFailure).toEqual(expect.arrayContaining([['first'], ['second']])) // verify Postgres person_id overrides - const overridesAfterFailure = await fetchPersonIdOverrides() + const overridesAfterFailure = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) expect(overridesAfterFailure).toEqual([]) // Now verify we successfully get to our target state if we do not have @@ -1827,7 +1857,7 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second'])) // verify Postgres person_id overrides - const overrides = await fetchPersonIdOverrides() + const overrides = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([[second.uuid, first.uuid]]) }) @@ -1966,9 +1996,9 @@ describe('PersonState.update()', () => { const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second', 'third'])) - if (poEEmbraceJoin) { - // verify Postgres person_id overrides - const overrides = await fetchPersonIdOverrides() + // verify Postgres person_id overrides, if applicable + if (overridesMode) { + const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([ [second.uuid, first.uuid], [third.uuid, first.uuid], @@ -2053,9 +2083,9 @@ describe('PersonState.update()', () => { const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second', 'third'])) - if (poEEmbraceJoin) { - // verify Postgres person_id overrides - const overrides = await fetchPersonIdOverrides() + // verify Postgres person_id overrides, if applicable + if (overridesMode) { + const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([ [second.uuid, first.uuid], [third.uuid, first.uuid], @@ -2065,3 +2095,177 @@ describe('PersonState.update()', () => { }) }) }) + +describe('DeferredPersonOverrideWriter', () => { + let hub: Hub + let closeHub: () => Promise + + // not always used, but used more often then not + let organizationId: string + let teamId: number + + const lockId = 456 + let writer: DeferredPersonOverrideWriter + + beforeAll(async () => { + ;[hub, closeHub] = await createHub({}) + organizationId = await createOrganization(hub.db.postgres) + writer = new DeferredPersonOverrideWriter(hub.db.postgres, lockId) + }) + + beforeEach(async () => { + teamId = await createTeam(hub.db.postgres, organizationId) + await hub.db.postgres.query( + PostgresUse.COMMON_WRITE, + 'TRUNCATE TABLE posthog_pendingpersonoverride', + undefined, + '' + ) + }) + + afterEach(() => { + jest.restoreAllMocks() + }) + + afterAll(async () => { + await closeHub() + }) + + const getPendingPersonOverrides = async () => { + const { rows } = await hub.db.postgres.query( + PostgresUse.COMMON_WRITE, + `SELECT old_person_id, override_person_id + FROM posthog_pendingpersonoverride + WHERE team_id = ${teamId}`, + undefined, + '' + ) + return rows + } + + it('moves overrides from the pending table to the overrides table', async () => { + const { postgres, kafkaProducer } = hub.db + + const override = { + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + } + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await writer.addPersonOverride(tx, { team_id: teamId, ...override, oldest_event: DateTime.fromMillis(0) }) + }) + + expect(await getPendingPersonOverrides()).toEqual([override]) + + expect(await writer.processPendingOverrides(kafkaProducer)).toEqual(1) + + expect(await getPendingPersonOverrides()).toMatchObject([]) + + expect(await fetchPostgresPersonIdOverrides(hub, teamId)).toEqual([ + [override.old_person_id, override.override_person_id], + ]) + + const clickhouseOverrides = await waitForExpect(async () => { + const { data } = await hub.db.clickhouse.querying( + ` + SELECT old_person_id, override_person_id + FROM person_overrides + WHERE team_id = ${teamId} + `, + { dataObjects: true } + ) + expect(data).toHaveLength(1) + return data + }) + expect(clickhouseOverrides).toEqual([override]) + }) + + it('rolls back on kafka producer error', async () => { + const { postgres, kafkaProducer } = hub.db + + const override = { + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + } + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await writer.addPersonOverride(tx, { team_id: teamId, ...override, oldest_event: DateTime.fromMillis(0) }) + }) + + expect(await getPendingPersonOverrides()).toEqual([override]) + + jest.spyOn(kafkaProducer, 'queueMessages').mockImplementation(() => { + throw new Error('something bad happened') + }) + + await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow() + + expect(await getPendingPersonOverrides()).toEqual([override]) + }) + + it('ensures advisory lock is held before processing', async () => { + const { postgres, kafkaProducer } = hub.db + + let acquiredLock: boolean + const tryLockComplete = new WaitEvent() + const readyToReleaseLock = new WaitEvent() + + const transactionHolder = postgres + .transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + const { rows } = await postgres.query( + tx, + `SELECT pg_try_advisory_lock(${lockId}) as acquired, pg_backend_pid()`, + undefined, + '' + ) + ;[{ acquired: acquiredLock }] = rows + tryLockComplete.set() + await readyToReleaseLock.wait() + }) + .then(() => { + acquiredLock = false + }) + + try { + await tryLockComplete.wait() + expect(acquiredLock!).toBe(true) + await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow(Error('could not acquire lock')) + } finally { + readyToReleaseLock.set() + await transactionHolder + } + + expect(acquiredLock!).toBe(false) + await expect(writer.processPendingOverrides(kafkaProducer)).resolves.toEqual(0) + }) + + it('respects limit if provided', async () => { + const { postgres, kafkaProducer } = hub.db + + const overrides = [...Array(3)].map(() => ({ + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + })) + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await Promise.all( + overrides.map( + async (override) => + await writer.addPersonOverride(tx, { + team_id: teamId, + ...override, + oldest_event: DateTime.fromMillis(0), + }) + ) + ) + }) + + expect(await getPendingPersonOverrides()).toEqual(overrides) + + expect(await writer.processPendingOverrides(kafkaProducer, 2)).toEqual(2) + expect(await getPendingPersonOverrides()).toMatchObject(overrides.slice(-1)) + + expect(await writer.processPendingOverrides(kafkaProducer, 2)).toEqual(1) + expect(await getPendingPersonOverrides()).toEqual([]) + }) +}) diff --git a/posthog/migrations/0371_pendingpersonoverride.py b/posthog/migrations/0371_pendingpersonoverride.py new file mode 100644 index 00000000000000..ce34c34829f266 --- /dev/null +++ b/posthog/migrations/0371_pendingpersonoverride.py @@ -0,0 +1,22 @@ +# Generated by Django 3.2.19 on 2023-11-29 23:46 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0370_externaldatajob_workflow_id"), + ] + + operations = [ + migrations.CreateModel( + name="PendingPersonOverride", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("team_id", models.BigIntegerField()), + ("old_person_id", models.UUIDField()), + ("override_person_id", models.UUIDField()), + ("oldest_event", models.DateTimeField()), + ], + ), + ] diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 92b6103f2c8b64..9ef4a58e18cbaa 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -134,6 +134,14 @@ class Meta: uuid = models.UUIDField() +class PendingPersonOverride(models.Model): + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") + team_id = models.BigIntegerField() + old_person_id = models.UUIDField() + override_person_id = models.UUIDField() + oldest_event = models.DateTimeField() + + class PersonOverride(models.Model): """A model of persons to be overriden in merge or merge-like events.