From 64816547eb39115ffad1eff728e60dfbd45055ab Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 24 Nov 2023 20:41:28 -0800 Subject: [PATCH 01/25] Add `PendingPersonOverride` model. --- posthog/models/person/person.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 92b6103f2c8b6..9ef4a58e18cba 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -134,6 +134,14 @@ class Meta: uuid = models.UUIDField() +class PendingPersonOverride(models.Model): + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") + team_id = models.BigIntegerField() + old_person_id = models.UUIDField() + override_person_id = models.UUIDField() + oldest_event = models.DateTimeField() + + class PersonOverride(models.Model): """A model of persons to be overriden in merge or merge-like events. From 2c380304dd06bcab0bd9c3349168d28a5eb7a91c Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 29 Nov 2023 15:46:43 -0800 Subject: [PATCH 02/25] Add automatic migration. --- latest_migrations.manifest | 2 +- .../migrations/0368_pendingpersonoverride.py | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 posthog/migrations/0368_pendingpersonoverride.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index ba2f2a8333b42..831fab16cfdd2 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0367_job_inputs +posthog: 0368_pendingpersonoverride sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/migrations/0368_pendingpersonoverride.py b/posthog/migrations/0368_pendingpersonoverride.py new file mode 100644 index 0000000000000..e7b7ebab9c83a --- /dev/null +++ b/posthog/migrations/0368_pendingpersonoverride.py @@ -0,0 +1,22 @@ +# Generated by Django 3.2.19 on 2023-11-29 23:46 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0367_job_inputs"), + ] + + operations = [ + migrations.CreateModel( + name="PendingPersonOverride", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("team_id", models.BigIntegerField()), + ("old_person_id", models.UUIDField()), + ("override_person_id", models.UUIDField()), + ("oldest_event", models.DateTimeField()), + ], + ), + ] From 0486f2b52d339b95d4af2db6d0121f1cc446db5e Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 29 Nov 2023 16:15:24 -0800 Subject: [PATCH 03/25] Add unfortunate `MergeOperation` type to keep consistency between non-deferred and deferred overwrite implementations. --- .../src/worker/ingestion/person-state.ts | 81 +++++++++++-------- 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 07758813bb9b3..63fc8be02b5e1 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -521,9 +521,7 @@ export class PersonState { if (this.poEEmbraceJoin) { personOverrideMessages = await new PersonOverrideWriter(this.db.postgres).addPersonOverride( tx, - this.teamId, - otherPerson, - mergeInto + getMergeOperation(this.teamId, otherPerson, mergeInto) ) } @@ -551,21 +549,30 @@ export class PersonState { } } +type MergeOperation = { + team_id: number + old_person_id: string + override_person_id: string + oldest_event: DateTime +} + +function getMergeOperation(teamId: number, oldPerson: Person, overridePerson: Person): MergeOperation { + if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { + throw new Error('cannot merge persons across different teams') + } + return { + team_id: teamId, + old_person_id: oldPerson.uuid, + override_person_id: overridePerson.uuid, + oldest_event: overridePerson.created_at, + } +} + class PersonOverrideWriter { constructor(private postgres: PostgresRouter) {} - public async addPersonOverride( - tx: TransactionClient, - teamId: number, - oldPerson: Person, - overridePerson: Person - ): Promise { - if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { - throw new Error('cannot merge persons across different teams') - } - + public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise { const mergedAt = DateTime.now() - const oldestEvent = overridePerson.created_at /** We'll need to do 4 updates: @@ -573,8 +580,16 @@ class PersonOverrideWriter { 2. Add an override from oldPerson to override person 3. Update any entries that have oldPerson as the override person to now also point to the new override person. Note that we don't update `oldest_event`, because it's a heuristic (used to optimise squashing) tied to the old_person and nothing changed about the old_person who's events need to get squashed. */ - const oldPersonId = await this.addPersonOverrideMapping(tx, oldPerson) - const overridePersonId = await this.addPersonOverrideMapping(tx, overridePerson) + const oldPersonMappingId = await this.addPersonOverrideMapping( + tx, + mergeOperation.team_id, + mergeOperation.old_person_id + ) + const overridePersonMappingId = await this.addPersonOverrideMapping( + tx, + mergeOperation.team_id, + mergeOperation.override_person_id + ) await this.postgres.query( tx, @@ -586,10 +601,10 @@ class PersonOverrideWriter { oldest_event, version ) VALUES ( - ${teamId}, - ${oldPersonId}, - ${overridePersonId}, - ${oldestEvent}, + ${mergeOperation.team_id}, + ${oldPersonMappingId}, + ${overridePersonMappingId}, + ${mergeOperation.oldest_event}, 0 ) `, @@ -606,9 +621,9 @@ class PersonOverrideWriter { UPDATE posthog_personoverride SET - override_person_id = ${overridePersonId}, version = COALESCE(version, 0)::numeric + 1 + override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1 WHERE - team_id = ${teamId} AND override_person_id = ${oldPersonId} + team_id = ${mergeOperation.team_id} AND override_person_id = ${oldPersonMappingId} RETURNING old_person_id, version, @@ -637,21 +652,21 @@ class PersonOverrideWriter { messages: [ { value: JSON.stringify({ - team_id: teamId, + team_id: mergeOperation.team_id, + old_person_id: mergeOperation.old_person_id, + override_person_id: mergeOperation.override_person_id, + oldest_event: castTimestampOrNow(mergeOperation.oldest_event, TimestampFormat.ClickHouse), merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - override_person_id: overridePerson.uuid, - old_person_id: oldPerson.uuid, - oldest_event: castTimestampOrNow(oldestEvent, TimestampFormat.ClickHouse), version: 0, }), }, ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ value: JSON.stringify({ - team_id: teamId, - merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - override_person_id: overridePerson.uuid, + team_id: mergeOperation.team_id, old_person_id: old_person_id, + override_person_id: mergeOperation.override_person_id, oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), + merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), version: version, }), })), @@ -662,7 +677,7 @@ class PersonOverrideWriter { return personOverrideMessages } - private async addPersonOverrideMapping(tx: TransactionClient, person: Person): Promise { + private async addPersonOverrideMapping(tx: TransactionClient, teamId: number, personId: string): Promise { /** Update the helper table that serves as a mapping between a serial ID and a Person UUID. @@ -684,8 +699,8 @@ class PersonOverrideWriter { uuid ) VALUES ( - ${person.team_id}, - '${person.uuid}' + ${teamId}, + '${personId}' ) ON CONFLICT("team_id", "uuid") DO NOTHING RETURNING id @@ -694,7 +709,7 @@ class PersonOverrideWriter { UNION ALL SELECT id FROM posthog_personoverridemapping - WHERE uuid = '${person.uuid}' + WHERE uuid = '${personId}' `, undefined, 'personOverrideMapping' From be35915a48e31d3e5cdd0d9b0430b1807957eef2 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 29 Nov 2023 16:39:01 -0800 Subject: [PATCH 04/25] Add `DeferredPersonOverrideWriter`. --- .../src/worker/ingestion/person-state.ts | 56 ++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 63fc8be02b5e1..93677b0ba8c26 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -4,6 +4,7 @@ import { StatsD } from 'hot-shots' import { ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' import { Counter } from 'prom-client' +import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics' import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types' @@ -93,6 +94,7 @@ export class PersonState { private statsd: StatsD | undefined public updateIsIdentified: boolean // TODO: remove this from the class and being hidden private poEEmbraceJoin: boolean + private overrideWriter: PersonOverrideWriter | DeferredPersonOverrideWriter constructor( event: PluginEvent, @@ -122,6 +124,7 @@ export class PersonState { // For persons on events embrace the join gradual roll-out, remove after fully rolled out this.poEEmbraceJoin = poEEmbraceJoin + this.overrideWriter = new PersonOverrideWriter(db.postgres) } async update(): Promise { @@ -519,7 +522,7 @@ export class PersonState { let personOverrideMessages: ProducerRecord[] = [] if (this.poEEmbraceJoin) { - personOverrideMessages = await new PersonOverrideWriter(this.db.postgres).addPersonOverride( + personOverrideMessages = await this.overrideWriter.addPersonOverride( tx, getMergeOperation(this.teamId, otherPerson, mergeInto) ) @@ -719,6 +722,57 @@ class PersonOverrideWriter { } } +class DeferredPersonOverrideWriter { + constructor(private postgres: PostgresRouter) {} + + public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise { + await this.postgres.query( + tx, + SQL` + INSERT INTO posthog_pendingpersonoverride ( + team_id, + old_person_id, + override_person_id, + oldest_event + ) VALUES ( + ${mergeOperation.team_id}, + ${mergeOperation.old_person_id}, + ${mergeOperation.override_person_id}, + ${mergeOperation.oldest_event} + )`, + undefined, + 'pendingPersonOverride' + ) + + return [] + } + + public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper): Promise { + const writer = new PersonOverrideWriter(this.postgres) + + await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { + const { rows } = await this.postgres.query( + tx, + `SELECT * FROM posthog_pendingpersonoverride ORDER BY id`, + undefined, + 'processPendingOverrides' + ) + const results = rows.map(async ({ id, ...mergeOperation }) => { + const messages = await writer.addPersonOverride(tx, mergeOperation) + await this.postgres.query( + tx, + SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`, + undefined, + 'processPendingOverrides' + ) + return messages + }) + const messages = (await Promise.all(results)).flat() + await kafkaProducer.queueMessages(messages, true) + }) + } +} + export function ageInMonthsLowCardinality(timestamp: DateTime): number { const ageInMonths = Math.max(-Math.floor(timestamp.diffNow('months').months), 0) // for getting low cardinality for statsd metrics tags, which can cause issues in e.g. InfluxDB: From 69835f0f9a302df2c57301b0b3e61eaf49939823 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 30 Nov 2023 09:19:00 -0800 Subject: [PATCH 05/25] Start on dependency injection of override writer for better testing. --- .../event-pipeline/processPersonsStep.ts | 4 ++-- .../src/worker/ingestion/person-state.ts | 22 +++++++------------ .../worker/ingestion/person-state.test.ts | 8 +++++-- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts index 0ae5390cc2d93..f7dd4fa993718 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts @@ -4,7 +4,7 @@ import { Person } from 'types' import { normalizeEvent } from '../../../utils/event' import { status } from '../../../utils/status' -import { PersonState } from '../person-state' +import { PersonOverrideWriter, PersonState } from '../person-state' import { parseEventTimestamp } from '../timestamps' import { EventPipelineRunner } from './runner' @@ -29,7 +29,7 @@ export async function processPersonsStep( timestamp, runner.hub.db, runner.hub.statsd, - runner.poEEmbraceJoin + runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined ).update() return [event, person] diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 93677b0ba8c26..8a0e2e214161e 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -93,8 +93,6 @@ export class PersonState { private db: DB private statsd: StatsD | undefined public updateIsIdentified: boolean // TODO: remove this from the class and being hidden - private poEEmbraceJoin: boolean - private overrideWriter: PersonOverrideWriter | DeferredPersonOverrideWriter constructor( event: PluginEvent, @@ -103,7 +101,7 @@ export class PersonState { timestamp: DateTime, db: DB, statsd: StatsD | undefined = undefined, - poEEmbraceJoin = false, + private personOverrideWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined, uuid: UUIDT | undefined = undefined, maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS ) { @@ -121,10 +119,6 @@ export class PersonState { // If set to true, we'll update `is_identified` at the end of `updateProperties` // :KLUDGE: This is an indirect communication channel between `handleIdentifyOrAlias` and `updateProperties` this.updateIsIdentified = false - - // For persons on events embrace the join gradual roll-out, remove after fully rolled out - this.poEEmbraceJoin = poEEmbraceJoin - this.overrideWriter = new PersonOverrideWriter(db.postgres) } async update(): Promise { @@ -454,7 +448,7 @@ export class PersonState { const properties: Properties = { ...otherPerson.properties, ...mergeInto.properties } this.applyEventPropertyUpdates(properties) - if (this.poEEmbraceJoin) { + if (this.personOverrideWriter !== undefined) { // Optimize merging persons to keep using the person id that has longer history, // which means we'll have less events to update during the squash later if (otherPerson.created_at < mergeInto.created_at) { @@ -489,7 +483,7 @@ export class PersonState { call: this.event.event, // $identify, $create_alias or $merge_dangerously oldPersonIdentified: String(otherPerson.is_identified), newPersonIdentified: String(mergeInto.is_identified), - poEEmbraceJoin: String(this.poEEmbraceJoin), + poEEmbraceJoin: String(this.personOverrideWriter !== undefined), }) .inc() @@ -521,8 +515,8 @@ export class PersonState { const deletePersonMessages = await this.db.deletePerson(otherPerson, tx) let personOverrideMessages: ProducerRecord[] = [] - if (this.poEEmbraceJoin) { - personOverrideMessages = await this.overrideWriter.addPersonOverride( + if (this.personOverrideWriter !== undefined) { + personOverrideMessages = await this.personOverrideWriter.addPersonOverride( tx, getMergeOperation(this.teamId, otherPerson, mergeInto) ) @@ -545,7 +539,7 @@ export class PersonState { call: this.event.event, // $identify, $create_alias or $merge_dangerously oldPersonIdentified: String(otherPerson.is_identified), newPersonIdentified: String(mergeInto.is_identified), - poEEmbraceJoin: String(this.poEEmbraceJoin), + poEEmbraceJoin: String(this.personOverrideWriter !== undefined), }) .inc() return result @@ -571,7 +565,7 @@ function getMergeOperation(teamId: number, oldPerson: Person, overridePerson: Pe } } -class PersonOverrideWriter { +export class PersonOverrideWriter { constructor(private postgres: PostgresRouter) {} public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise { @@ -722,7 +716,7 @@ class PersonOverrideWriter { } } -class DeferredPersonOverrideWriter { +export class DeferredPersonOverrideWriter { constructor(private postgres: PostgresRouter) {} public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise { diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 5592b342e2f06..0ecec60f7097d 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -7,7 +7,11 @@ import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { defaultRetryConfig } from '../../../src/utils/retries' import { UUIDT } from '../../../src/utils/utils' -import { ageInMonthsLowCardinality, PersonState } from '../../../src/worker/ingestion/person-state' +import { + ageInMonthsLowCardinality, + PersonOverrideWriter, + PersonState, +} from '../../../src/worker/ingestion/person-state' import { delayUntilEventIngested } from '../../helpers/clickhouse' import { createOrganization, createTeam, fetchPostgresPersons, insertRow } from '../../helpers/sql' @@ -70,7 +74,7 @@ describe('PersonState.update()', () => { timestamp, customHub ? customHub.db : hub.db, customHub ? customHub.statsd : hub.statsd, - poEEmbraceJoin, + poEEmbraceJoin ? new PersonOverrideWriter(customHub ? customHub.db.postgres : hub.db.postgres) : undefined, uuid, maxMergeAttempts ?? 3 // the default ) From cf3a0967a9cf755e705b6e114b60e569ff714781 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 30 Nov 2023 10:18:11 -0800 Subject: [PATCH 06/25] Support disabled, immediate, and deferred overrides in tests. --- .../worker/ingestion/person-state.test.ts | 143 ++++++++++-------- 1 file changed, 83 insertions(+), 60 deletions(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 0ecec60f7097d..94f66e2b87892 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -9,6 +9,7 @@ import { defaultRetryConfig } from '../../../src/utils/retries' import { UUIDT } from '../../../src/utils/utils' import { ageInMonthsLowCardinality, + DeferredPersonOverrideWriter, PersonOverrideWriter, PersonState, } from '../../../src/worker/ingestion/person-state' @@ -21,6 +22,61 @@ const timestamp = DateTime.fromISO('2020-01-01T12:00:05.200Z').toUTC() const timestamp2 = DateTime.fromISO('2020-02-02T12:00:05.200Z').toUTC() const timestampch = '2020-01-01 12:00:05.000' +async function fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> { + const result = await hub.db.postgres.query( + PostgresUse.COMMON_WRITE, + ` + WITH overrides AS ( + SELECT id, old_person_id, override_person_id + FROM posthog_personoverride + WHERE team_id = ${teamId} + ORDER BY id + ) + SELECT + mapping.uuid AS old_person_id, + overrides_mapping.uuid AS override_person_id + FROM + overrides AS first + JOIN + posthog_personoverridemapping AS mapping ON first.old_person_id = mapping.id + JOIN ( + SELECT + second.id AS id, + uuid + FROM + overrides AS second + JOIN posthog_personoverridemapping AS mapping ON second.override_person_id = mapping.id + ) AS overrides_mapping ON overrides_mapping.id = first.id + `, + undefined, + 'fetchPersonIdOverrides' + ) + return result.rows.map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]).sort() as [ + string, + string + ][] +} + +interface PersonOverridesMode { + getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter + fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> +} + +const PersonOverridesModes: Record = { + disabled: undefined, + immediate: { + getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), + fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), + }, + deferred: { + getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), + fetchPostgresPersonIdOverrides: async (hub, teamId) => { + await new DeferredPersonOverrideWriter(hub.db.postgres).processPendingOverrides(hub.db.kafkaProducer) + return await fetchPostgresPersonIdOverrides(hub, teamId) + }, + }, +} + describe('PersonState.update()', () => { let hub: Hub let closeHub: () => Promise @@ -28,7 +84,7 @@ describe('PersonState.update()', () => { let uuid: UUIDT let uuid2: UUIDT let teamId: number - let poEEmbraceJoin: boolean + let overridesMode: PersonOverridesMode | undefined let organizationId: string beforeAll(async () => { @@ -39,7 +95,7 @@ describe('PersonState.update()', () => { }) beforeEach(async () => { - poEEmbraceJoin = false + overridesMode = undefined uuid = new UUIDT() uuid2 = new UUIDT() @@ -67,6 +123,7 @@ describe('PersonState.update()', () => { properties: {}, ...event, } + return new PersonState( fullEvent as any, teamId, @@ -74,7 +131,7 @@ describe('PersonState.update()', () => { timestamp, customHub ? customHub.db : hub.db, customHub ? customHub.statsd : hub.statsd, - poEEmbraceJoin ? new PersonOverrideWriter(customHub ? customHub.db.postgres : hub.db.postgres) : undefined, + overridesMode !== undefined ? overridesMode.getWriter(customHub ? customHub : hub) : undefined, uuid, maxMergeAttempts ?? 3 // the default ) @@ -458,12 +515,12 @@ describe('PersonState.update()', () => { }) }) - describe.each([[true], [false]])('on $identify event', (poEEmbraceJoinThis) => { + describe.each(Object.keys(PersonOverridesModes))('on $identify event', (useOverridesMode) => { beforeEach(() => { - poEEmbraceJoin = poEEmbraceJoinThis + overridesMode = PersonOverridesModes[useOverridesMode] // n.b. mutating outer scope here -- be careful }) - describe(`${poEEmbraceJoinThis ? 'PoE' : 'normal'}`, () => { + describe(`overrides: ${useOverridesMode}`, () => { it(`no-op when $anon_distinct_id not passed`, async () => { const person = await personState({ event: '$identify', @@ -1019,11 +1076,12 @@ describe('PersonState.update()', () => { }) }) - describe.each([[true], [false]])('on $merge_dangerously events', (poEEmbraceJoinThis) => { + describe.each(Object.keys(PersonOverridesModes))('on $merge_dangerously events', (useOverridesMode) => { beforeEach(() => { - poEEmbraceJoin = poEEmbraceJoinThis + overridesMode = PersonOverridesModes[useOverridesMode] // n.b. mutating outer scope here -- be careful }) - describe(`${poEEmbraceJoinThis ? 'PoE' : 'normal'}`, () => { + + describe(`overrides: ${useOverridesMode}`, () => { // only difference between $merge_dangerously and $identify it(`merge_dangerously can merge people when alias id user is identified`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, uuid.toString(), ['old-user']) @@ -1404,7 +1462,7 @@ describe('PersonState.update()', () => { ) }) }) - describe.each([[true], [false]])('on persons merges', (poEEmbraceJoinThis) => { + describe.each(Object.keys(PersonOverridesModes))('on persons merges', (useOverridesMode) => { // For some reason these tests failed if I ran them with a hub shared // with other tests, so I'm creating a new hub for each test. let hub: Hub @@ -1412,7 +1470,7 @@ describe('PersonState.update()', () => { beforeEach(async () => { ;[hub, closeHub] = await createHub({}) - poEEmbraceJoin = poEEmbraceJoinThis + overridesMode = PersonOverridesModes[useOverridesMode] // n.b. mutating outer scope here -- be careful jest.spyOn(hub.db, 'fetchPerson') jest.spyOn(hub.db, 'updatePersonDeprecated') @@ -1421,42 +1479,7 @@ describe('PersonState.update()', () => { afterEach(async () => { await closeHub() }) - - async function fetchPersonIdOverrides() { - const result = await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - ` - WITH overrides AS ( - SELECT id, old_person_id, override_person_id - FROM posthog_personoverride - WHERE team_id = ${teamId} - ORDER BY id - ) - SELECT - mapping.uuid AS old_person_id, - overrides_mapping.uuid AS override_person_id - FROM - overrides AS first - JOIN - posthog_personoverridemapping AS mapping ON first.old_person_id = mapping.id - JOIN ( - SELECT - second.id AS id, - uuid - FROM - overrides AS second - JOIN posthog_personoverridemapping AS mapping ON second.override_person_id = mapping.id - ) AS overrides_mapping ON overrides_mapping.id = first.id - `, - undefined, - 'fetchPersonIdOverrides' - ) - return result.rows - .map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]) - .sort() as [string, string][] - } - - describe(`${poEEmbraceJoinThis ? 'PoE' : 'normal'}`, () => { + describe(`overrides: ${useOverridesMode}`, () => { it(`no-op if persons already merged`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, uuid.toString(), [ 'first', @@ -1562,9 +1585,9 @@ describe('PersonState.update()', () => { const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(person) expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['first', 'second'])) - // verify Postgres person_id overrides - if (poEEmbraceJoin) { - const overrides = await fetchPersonIdOverrides() + // verify Postgres person_id overrides, if applicable + if (overridesMode !== undefined) { + const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([[second.uuid, first.uuid]]) // & CH person overrides // TODO @@ -1727,8 +1750,8 @@ describe('PersonState.update()', () => { }) it(`does not commit partial transactions on override conflicts`, async () => { - if (!poEEmbraceJoin) { - return // this is only a PoE test + if (overridesMode !== PersonOverridesModes.immediate) { + return // this behavior is only supported with immediate overrides } const first: Person = await hub.db.createPerson( timestamp, @@ -1815,7 +1838,7 @@ describe('PersonState.update()', () => { expect(distinctIdsAfterFailure).toEqual(expect.arrayContaining([['first'], ['second']])) // verify Postgres person_id overrides - const overridesAfterFailure = await fetchPersonIdOverrides() + const overridesAfterFailure = await fetchPostgresPersonIdOverrides(hub, teamId) expect(overridesAfterFailure).toEqual([]) // Now verify we successfully get to our target state if we do not have @@ -1850,7 +1873,7 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second'])) // verify Postgres person_id overrides - const overrides = await fetchPersonIdOverrides() + const overrides = await fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([[second.uuid, first.uuid]]) }) @@ -1989,9 +2012,9 @@ describe('PersonState.update()', () => { const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second', 'third'])) - if (poEEmbraceJoin) { - // verify Postgres person_id overrides - const overrides = await fetchPersonIdOverrides() + // verify Postgres person_id overrides, if applicable + if (overridesMode !== undefined) { + const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([ [second.uuid, first.uuid], [third.uuid, first.uuid], @@ -2076,9 +2099,9 @@ describe('PersonState.update()', () => { const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second', 'third'])) - if (poEEmbraceJoin) { - // verify Postgres person_id overrides - const overrides = await fetchPersonIdOverrides() + // verify Postgres person_id overrides, if applicable + if (overridesMode !== undefined) { + const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([ [second.uuid, first.uuid], [third.uuid, first.uuid], From 3c8fd4f9e93a6371ae78ed7d602b8b736355626a Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 30 Nov 2023 12:53:14 -0800 Subject: [PATCH 07/25] Minor improvements. --- plugin-server/src/worker/ingestion/person-state.ts | 8 ++++---- plugin-server/tests/worker/ingestion/person-state.test.ts | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 8a0e2e214161e..b68b106cd6d87 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -448,7 +448,7 @@ export class PersonState { const properties: Properties = { ...otherPerson.properties, ...mergeInto.properties } this.applyEventPropertyUpdates(properties) - if (this.personOverrideWriter !== undefined) { + if (this.personOverrideWriter) { // Optimize merging persons to keep using the person id that has longer history, // which means we'll have less events to update during the squash later if (otherPerson.created_at < mergeInto.created_at) { @@ -483,7 +483,7 @@ export class PersonState { call: this.event.event, // $identify, $create_alias or $merge_dangerously oldPersonIdentified: String(otherPerson.is_identified), newPersonIdentified: String(mergeInto.is_identified), - poEEmbraceJoin: String(this.personOverrideWriter !== undefined), + poEEmbraceJoin: String(!!this.personOverrideWriter), }) .inc() @@ -515,7 +515,7 @@ export class PersonState { const deletePersonMessages = await this.db.deletePerson(otherPerson, tx) let personOverrideMessages: ProducerRecord[] = [] - if (this.personOverrideWriter !== undefined) { + if (this.personOverrideWriter) { personOverrideMessages = await this.personOverrideWriter.addPersonOverride( tx, getMergeOperation(this.teamId, otherPerson, mergeInto) @@ -539,7 +539,7 @@ export class PersonState { call: this.event.event, // $identify, $create_alias or $merge_dangerously oldPersonIdentified: String(otherPerson.is_identified), newPersonIdentified: String(mergeInto.is_identified), - poEEmbraceJoin: String(this.personOverrideWriter !== undefined), + poEEmbraceJoin: String(!!this.personOverrideWriter), }) .inc() return result diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 94f66e2b87892..ccbe4c8af2d57 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -1586,7 +1586,7 @@ describe('PersonState.update()', () => { expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['first', 'second'])) // verify Postgres person_id overrides, if applicable - if (overridesMode !== undefined) { + if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([[second.uuid, first.uuid]]) // & CH person overrides @@ -2013,7 +2013,7 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second', 'third'])) // verify Postgres person_id overrides, if applicable - if (overridesMode !== undefined) { + if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([ [second.uuid, first.uuid], @@ -2100,7 +2100,7 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second', 'third'])) // verify Postgres person_id overrides, if applicable - if (overridesMode !== undefined) { + if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([ [second.uuid, first.uuid], From 2bc97ba36d11f54d7ddc8be72771cf76980b7f9f Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 30 Nov 2023 13:03:20 -0800 Subject: [PATCH 08/25] Syntactical sugar rush --- plugin-server/src/worker/ingestion/person-state.ts | 2 +- plugin-server/tests/worker/ingestion/person-state.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index b68b106cd6d87..5872837aca26e 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -101,7 +101,7 @@ export class PersonState { timestamp: DateTime, db: DB, statsd: StatsD | undefined = undefined, - private personOverrideWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined, + private personOverrideWriter?: PersonOverrideWriter | DeferredPersonOverrideWriter, uuid: UUIDT | undefined = undefined, maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS ) { diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index ccbe4c8af2d57..49966e8b77ae1 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -131,7 +131,7 @@ describe('PersonState.update()', () => { timestamp, customHub ? customHub.db : hub.db, customHub ? customHub.statsd : hub.statsd, - overridesMode !== undefined ? overridesMode.getWriter(customHub ? customHub : hub) : undefined, + overridesMode?.getWriter(customHub ?? hub), uuid, maxMergeAttempts ?? 3 // the default ) From 603090f7911699486257e99365b6c63224caabce Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 30 Nov 2023 17:36:44 -0800 Subject: [PATCH 09/25] Fix bug in `addPersonOverrideMapping` that was causing cross-team mapping references. --- plugin-server/src/worker/ingestion/person-state.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 5872837aca26e..bb21ad8f1a894 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -706,7 +706,7 @@ export class PersonOverrideWriter { UNION ALL SELECT id FROM posthog_personoverridemapping - WHERE uuid = '${personId}' + WHERE team_id = ${teamId} AND uuid = '${personId}' `, undefined, 'personOverrideMapping' From 9144472cd19b309911c7a1711bbd1eea58895770 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 30 Nov 2023 17:55:40 -0800 Subject: [PATCH 10/25] Ensure items are processed serially: not sure if this is absolutely necessary, but seems safer. --- plugin-server/src/worker/ingestion/person-state.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index bb21ad8f1a894..e8328181fed4e 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -751,17 +751,18 @@ export class DeferredPersonOverrideWriter { undefined, 'processPendingOverrides' ) - const results = rows.map(async ({ id, ...mergeOperation }) => { - const messages = await writer.addPersonOverride(tx, mergeOperation) + + const messages: ProducerRecord[] = [] + for (const { id, ...mergeOperation } of rows) { + messages.push(...(await writer.addPersonOverride(tx, mergeOperation))) await this.postgres.query( tx, SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`, undefined, 'processPendingOverrides' ) - return messages - }) - const messages = (await Promise.all(results)).flat() + } + await kafkaProducer.queueMessages(messages, true) }) } From 60e05e0b8427848e3b1dabfd6c36b8fa822d4223 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 30 Nov 2023 19:18:30 -0800 Subject: [PATCH 11/25] Add processing lock. --- plugin-server/src/worker/ingestion/person-state.ts | 14 +++++++++++++- .../tests/worker/ingestion/person-state.test.ts | 6 ++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index e8328181fed4e..50c81a0dfee78 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -717,7 +717,7 @@ export class PersonOverrideWriter { } export class DeferredPersonOverrideWriter { - constructor(private postgres: PostgresRouter) {} + constructor(private postgres: PostgresRouter, private lockId: number) {} public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise { await this.postgres.query( @@ -745,6 +745,18 @@ export class DeferredPersonOverrideWriter { const writer = new PersonOverrideWriter(this.postgres) await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { + const { + rows: [{ acquired }], + } = await this.postgres.query( + tx, + SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`, + undefined, + 'processPendingOverrides' + ) + if (!acquired) { + throw new Error('could not acquire lock') + } + const { rows } = await this.postgres.query( tx, `SELECT * FROM posthog_pendingpersonoverride ORDER BY id`, diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 49966e8b77ae1..b535250033a5c 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -69,9 +69,11 @@ const PersonOverridesModes: Record = { fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), }, deferred: { - getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), + // XXX: This is kind of a mess -- ideally it'd be preferable to just + // instantiate the writer once and share it + getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres, 456), fetchPostgresPersonIdOverrides: async (hub, teamId) => { - await new DeferredPersonOverrideWriter(hub.db.postgres).processPendingOverrides(hub.db.kafkaProducer) + await new DeferredPersonOverrideWriter(hub.db.postgres, 456).processPendingOverrides(hub.db.kafkaProducer) return await fetchPostgresPersonIdOverrides(hub, teamId) }, }, From 5bd9d732439b3a28abcc936bbb14e32e81966694 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 14:19:37 -0800 Subject: [PATCH 12/25] Add `Event` class for testing. --- plugin-server/tests/helpers/promises.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/plugin-server/tests/helpers/promises.ts b/plugin-server/tests/helpers/promises.ts index 4d92371bdbbfb..3abd8dc082553 100644 --- a/plugin-server/tests/helpers/promises.ts +++ b/plugin-server/tests/helpers/promises.ts @@ -13,3 +13,22 @@ export function createPromise(): MockPromise { return result as MockPromise } + +export class Event { + private promise: Promise + private resolve: () => void + + constructor() { + this.promise = new Promise((resolve) => { + this.resolve = resolve + }) + } + + public set() { + this.resolve() + } + + public async wait() { + return this.promise + } +} From 206e7871469b40b8ea59ce57d9ff18c518a7beb8 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:01:06 -0800 Subject: [PATCH 13/25] Ensure advisory lock is held before proceeding with processing overrides. --- plugin-server/tests/helpers/promises.ts | 2 +- .../worker/ingestion/person-state.test.ts | 54 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/plugin-server/tests/helpers/promises.ts b/plugin-server/tests/helpers/promises.ts index 3abd8dc082553..4daea9156aa02 100644 --- a/plugin-server/tests/helpers/promises.ts +++ b/plugin-server/tests/helpers/promises.ts @@ -14,7 +14,7 @@ export function createPromise(): MockPromise { return result as MockPromise } -export class Event { +export class WaitEvent { private promise: Promise private resolve: () => void diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index b535250033a5c..daf7c51fff579 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -14,6 +14,7 @@ import { PersonState, } from '../../../src/worker/ingestion/person-state' import { delayUntilEventIngested } from '../../helpers/clickhouse' +import { WaitEvent } from '../../helpers/promises' import { createOrganization, createTeam, fetchPostgresPersons, insertRow } from '../../helpers/sql' jest.setTimeout(5000) // 5 sec timeout @@ -2139,3 +2140,56 @@ describe('PersonState.update()', () => { }) }) }) + +describe('DeferredPersonOverrideWriter', () => { + let hub: Hub + let closeHub: () => Promise + + const lockId = 456 + let writer: DeferredPersonOverrideWriter + + beforeAll(async () => { + ;[hub, closeHub] = await createHub({}) + writer = new DeferredPersonOverrideWriter(hub.db.postgres, lockId) + }) + + afterAll(async () => { + await closeHub() + }) + + it('ensures advisory lock is held before processing', async () => { + const { postgres, kafkaProducer } = hub.db + + let acquiredLock: boolean + const tryLockComplete = new WaitEvent() + const readyToReleaseLock = new WaitEvent() + + const transactionHolder = postgres + .transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + const { rows } = await postgres.query( + tx, + `SELECT pg_try_advisory_lock(${lockId}) as acquired, pg_backend_pid()`, + undefined, + '' + ) + ;[{ acquired: acquiredLock }] = rows + tryLockComplete.set() + await readyToReleaseLock.wait() + }) + .then(() => { + acquiredLock = false + }) + + try { + await tryLockComplete.wait() + expect(acquiredLock!).toBe(true) + await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow(Error('could not acquire lock')) + } finally { + readyToReleaseLock.set() + await transactionHolder + } + + expect(acquiredLock!).toBe(false) + await expect(writer.processPendingOverrides(kafkaProducer)).resolves.not.toThrow() + }) +}) From 19c363a19e827743cb6f050bb0bd4671f1d7c83d Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:18:34 -0800 Subject: [PATCH 14/25] Add return value, docstring, slightly improve test. --- plugin-server/src/worker/ingestion/person-state.ts | 14 ++++++++++++-- .../tests/worker/ingestion/person-state.test.ts | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 50c81a0dfee78..bc7fca31a2dd1 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -741,10 +741,18 @@ export class DeferredPersonOverrideWriter { return [] } - public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper): Promise { + /** + * Process all pending overrides. An advisory lock is acquired prior to + * processing to ensure that this function has exclusive access to the + * pending overrides during the update process. + * + * @param kafkaProducer + * @returns the number of overrides processed + */ + public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper): Promise { const writer = new PersonOverrideWriter(this.postgres) - await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { + return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { const { rows: [{ acquired }], } = await this.postgres.query( @@ -776,6 +784,8 @@ export class DeferredPersonOverrideWriter { } await kafkaProducer.queueMessages(messages, true) + + return rows.length }) } } diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index daf7c51fff579..7436c1f7f4de2 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2190,6 +2190,6 @@ describe('DeferredPersonOverrideWriter', () => { } expect(acquiredLock!).toBe(false) - await expect(writer.processPendingOverrides(kafkaProducer)).resolves.not.toThrow() + await expect(writer.processPendingOverrides(kafkaProducer)).resolves.toEqual(0) }) }) From 2a963a3902346d9a3c67c962507a23d58ed77121 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:42:06 -0800 Subject: [PATCH 15/25] Add a basic test to shoveling data between tables. --- .../worker/ingestion/person-state.test.ts | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 7436c1f7f4de2..88e693d45b606 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2145,18 +2145,78 @@ describe('DeferredPersonOverrideWriter', () => { let hub: Hub let closeHub: () => Promise + // not always used, but used more often then not + let organizationId: string + let teamId: number + const lockId = 456 let writer: DeferredPersonOverrideWriter beforeAll(async () => { ;[hub, closeHub] = await createHub({}) + organizationId = await createOrganization(hub.db.postgres) writer = new DeferredPersonOverrideWriter(hub.db.postgres, lockId) }) + beforeEach(async () => { + teamId = await createTeam(hub.db.postgres, organizationId) + await hub.db.postgres.query( + PostgresUse.COMMON_WRITE, + 'TRUNCATE TABLE posthog_pendingpersonoverride', + undefined, + '' + ) + }) + afterAll(async () => { await closeHub() }) + it('moves overrides from the pending table to the overrides table', async () => { + const { postgres, kafkaProducer } = hub.db + + const override = { + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + } + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await writer.addPersonOverride(tx, { team_id: teamId, ...override, oldest_event: DateTime.fromMillis(0) }) + }) + + expect( + await postgres.query( + PostgresUse.COMMON_WRITE, + `SELECT old_person_id, override_person_id + FROM posthog_pendingpersonoverride + WHERE team_id = ${teamId}`, + undefined, + '' + ) + ).toMatchObject({ + rows: [override], + }) + + expect(await writer.processPendingOverrides(kafkaProducer)).toEqual(1) + + expect( + await postgres.query( + PostgresUse.COMMON_WRITE, + `SELECT old_person_id, override_person_id + FROM posthog_pendingpersonoverride + WHERE team_id = ${teamId}`, + undefined, + '' + ) + ).toMatchObject({ rows: [] }) + + expect(await fetchPostgresPersonIdOverrides(hub, teamId)).toEqual([ + [override.old_person_id, override.override_person_id], + ]) + + // TODO would also be good to check that this produces to kafka and/or clickhouse + }) + it('ensures advisory lock is held before processing', async () => { const { postgres, kafkaProducer } = hub.db From ee45b07994001fbcf045592909d7141e9c572163 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:52:35 -0800 Subject: [PATCH 16/25] Ensure that the transaction is rolled back in case of Kafka error. --- .../worker/ingestion/person-state.test.ts | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 88e693d45b606..02d63e753f32e 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2168,6 +2168,10 @@ describe('DeferredPersonOverrideWriter', () => { ) }) + afterEach(() => { + jest.restoreAllMocks() + }) + afterAll(async () => { await closeHub() }) @@ -2217,6 +2221,51 @@ describe('DeferredPersonOverrideWriter', () => { // TODO would also be good to check that this produces to kafka and/or clickhouse }) + it('rolls back on kafka producer error', async () => { + const { postgres, kafkaProducer } = hub.db + + const override = { + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + } + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await writer.addPersonOverride(tx, { team_id: teamId, ...override, oldest_event: DateTime.fromMillis(0) }) + }) + + expect( + await postgres.query( + PostgresUse.COMMON_WRITE, + `SELECT old_person_id, override_person_id + FROM posthog_pendingpersonoverride + WHERE team_id = ${teamId}`, + undefined, + '' + ) + ).toMatchObject({ + rows: [override], + }) + + jest.spyOn(kafkaProducer, 'queueMessages').mockImplementation(() => { + throw new Error('something bad happened') + }) + + await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow() + + expect( + await postgres.query( + PostgresUse.COMMON_WRITE, + `SELECT old_person_id, override_person_id + FROM posthog_pendingpersonoverride + WHERE team_id = ${teamId}`, + undefined, + '' + ) + ).toMatchObject({ + rows: [override], + }) + }) + it('ensures advisory lock is held before processing', async () => { const { postgres, kafkaProducer } = hub.db From 53179470e8407fb9cc03380d67581298cb953c21 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 17:01:59 -0800 Subject: [PATCH 17/25] Add comments to a couple of things that could use a bit more explanation --- plugin-server/src/worker/ingestion/person-state.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index bc7fca31a2dd1..f040877b2ce8e 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -717,8 +717,15 @@ export class PersonOverrideWriter { } export class DeferredPersonOverrideWriter { + /** + * @param lockId the lock identifier/key used to ensure that only one + * process is updating the overrides at a time + */ constructor(private postgres: PostgresRouter, private lockId: number) {} + /** + * Enqueue an override for deferred processing. + */ public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise { await this.postgres.query( tx, @@ -746,7 +753,6 @@ export class DeferredPersonOverrideWriter { * processing to ensure that this function has exclusive access to the * pending overrides during the update process. * - * @param kafkaProducer * @returns the number of overrides processed */ public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper): Promise { @@ -765,6 +771,7 @@ export class DeferredPersonOverrideWriter { throw new Error('could not acquire lock') } + // n.b.: Ordering by id ensures we are processing in (roughly) FIFO order const { rows } = await this.postgres.query( tx, `SELECT * FROM posthog_pendingpersonoverride ORDER BY id`, @@ -783,6 +790,10 @@ export class DeferredPersonOverrideWriter { ) } + // n.b.: We publish the messages here (and wait for acks) to ensure + // that all of our override updates are sent to Kafka before + // prior to committing the transaction. If we're unable to publish, + // we should discard updates and try again later when it's available await kafkaProducer.queueMessages(messages, true) return rows.length From a03780549b45d4089c2330d2a221abfb57ef5b76 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 17:30:55 -0800 Subject: [PATCH 18/25] Remove some duplication from tests. --- .../worker/ingestion/person-state.test.ts | 62 +++++-------------- 1 file changed, 16 insertions(+), 46 deletions(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 02d63e753f32e..d7478f480b099 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2176,6 +2176,18 @@ describe('DeferredPersonOverrideWriter', () => { await closeHub() }) + const getPendingPersonOverrides = async () => { + const { rows } = await hub.db.postgres.query( + PostgresUse.COMMON_WRITE, + `SELECT old_person_id, override_person_id + FROM posthog_pendingpersonoverride + WHERE team_id = ${teamId}`, + undefined, + '' + ) + return rows + } + it('moves overrides from the pending table to the overrides table', async () => { const { postgres, kafkaProducer } = hub.db @@ -2188,31 +2200,11 @@ describe('DeferredPersonOverrideWriter', () => { await writer.addPersonOverride(tx, { team_id: teamId, ...override, oldest_event: DateTime.fromMillis(0) }) }) - expect( - await postgres.query( - PostgresUse.COMMON_WRITE, - `SELECT old_person_id, override_person_id - FROM posthog_pendingpersonoverride - WHERE team_id = ${teamId}`, - undefined, - '' - ) - ).toMatchObject({ - rows: [override], - }) + expect(await getPendingPersonOverrides()).toEqual([override]) expect(await writer.processPendingOverrides(kafkaProducer)).toEqual(1) - expect( - await postgres.query( - PostgresUse.COMMON_WRITE, - `SELECT old_person_id, override_person_id - FROM posthog_pendingpersonoverride - WHERE team_id = ${teamId}`, - undefined, - '' - ) - ).toMatchObject({ rows: [] }) + expect(await getPendingPersonOverrides()).toMatchObject([]) expect(await fetchPostgresPersonIdOverrides(hub, teamId)).toEqual([ [override.old_person_id, override.override_person_id], @@ -2233,18 +2225,7 @@ describe('DeferredPersonOverrideWriter', () => { await writer.addPersonOverride(tx, { team_id: teamId, ...override, oldest_event: DateTime.fromMillis(0) }) }) - expect( - await postgres.query( - PostgresUse.COMMON_WRITE, - `SELECT old_person_id, override_person_id - FROM posthog_pendingpersonoverride - WHERE team_id = ${teamId}`, - undefined, - '' - ) - ).toMatchObject({ - rows: [override], - }) + expect(await getPendingPersonOverrides()).toEqual([override]) jest.spyOn(kafkaProducer, 'queueMessages').mockImplementation(() => { throw new Error('something bad happened') @@ -2252,18 +2233,7 @@ describe('DeferredPersonOverrideWriter', () => { await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow() - expect( - await postgres.query( - PostgresUse.COMMON_WRITE, - `SELECT old_person_id, override_person_id - FROM posthog_pendingpersonoverride - WHERE team_id = ${teamId}`, - undefined, - '' - ) - ).toMatchObject({ - rows: [override], - }) + expect(await getPendingPersonOverrides()).toEqual([override]) }) it('ensures advisory lock is held before processing', async () => { From 15b7f499573f9bdfb84ac8b8b967d8ed96bb73f4 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 17:36:44 -0800 Subject: [PATCH 19/25] Consistently use overridesMode (even if not absolutely required) --- plugin-server/tests/worker/ingestion/person-state.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index d7478f480b099..4a6a51367a2a0 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -1841,7 +1841,7 @@ describe('PersonState.update()', () => { expect(distinctIdsAfterFailure).toEqual(expect.arrayContaining([['first'], ['second']])) // verify Postgres person_id overrides - const overridesAfterFailure = await fetchPostgresPersonIdOverrides(hub, teamId) + const overridesAfterFailure = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) expect(overridesAfterFailure).toEqual([]) // Now verify we successfully get to our target state if we do not have @@ -1876,7 +1876,7 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second'])) // verify Postgres person_id overrides - const overrides = await fetchPostgresPersonIdOverrides(hub, teamId) + const overrides = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) expect(overrides).toEqual([[second.uuid, first.uuid]]) }) From f37f1c25fa9bc03668058fdc4746aaea12045ded Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 1 Dec 2023 18:37:21 -0800 Subject: [PATCH 20/25] Might as well make sure the data makes it to ClickHouse too. --- .../tests/worker/ingestion/person-state.test.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 4a6a51367a2a0..491ca82b59f71 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -1,6 +1,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' +import { waitForExpect } from '../../../functional_tests/expectations' import { Database, Hub, Person } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' import { createHub } from '../../../src/utils/db/hub' @@ -2210,7 +2211,19 @@ describe('DeferredPersonOverrideWriter', () => { [override.old_person_id, override.override_person_id], ]) - // TODO would also be good to check that this produces to kafka and/or clickhouse + const clickhouseOverrides = await waitForExpect(async () => { + const { data } = await hub.db.clickhouse.querying( + ` + SELECT old_person_id, override_person_id + FROM person_overrides + WHERE team_id = ${teamId} + `, + { dataObjects: true } + ) + expect(data).toHaveLength(1) + return data + }) + expect(clickhouseOverrides).toEqual([override]) }) it('rolls back on kafka producer error', async () => { From 0b14b861e1f8554f9fffc213b034da8eda33d5bf Mon Sep 17 00:00:00 2001 From: ted kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 4 Dec 2023 14:35:15 -0800 Subject: [PATCH 21/25] Apply suggestions from code review Co-authored-by: Brett Hoerner --- plugin-server/src/worker/ingestion/person-state.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index f3d33e24f0436..c8010d86894bb 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -789,7 +789,7 @@ export class DeferredPersonOverrideWriter { } // n.b.: We publish the messages here (and wait for acks) to ensure - // that all of our override updates are sent to Kafka before + // that all of our override updates are sent to Kafka // prior to committing the transaction. If we're unable to publish, // we should discard updates and try again later when it's available await kafkaProducer.queueMessages(messages, true) From cef6d03ad74fa5a14c759b826b14b9ad3ae5c5f1 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 4 Dec 2023 14:44:46 -0800 Subject: [PATCH 22/25] Better document ``MergeOperation``/``PersonOverride`` dependency. --- plugin-server/src/worker/ingestion/person-state.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index c8010d86894bb..4a1bb638804e0 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -544,6 +544,13 @@ export class PersonState { } } +/** + * A record of a merge operation occurring. + * + * These property names need to be kept in sync with the ``PersonOverride`` + * Django model (and ``posthog_personoverride`` table schema) as defined in + * ``posthog/models/person/person.py``. + */ type MergeOperation = { team_id: number old_person_id: string From 7fe68051b0135e10255c3de32a28f2c23ac43a52 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 4 Dec 2023 15:01:41 -0800 Subject: [PATCH 23/25] Add more detail around retry safety during deferred publish. --- plugin-server/src/worker/ingestion/person-state.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 4a1bb638804e0..882fcae896d20 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -796,9 +796,15 @@ export class DeferredPersonOverrideWriter { } // n.b.: We publish the messages here (and wait for acks) to ensure - // that all of our override updates are sent to Kafka - // prior to committing the transaction. If we're unable to publish, - // we should discard updates and try again later when it's available + // that all of our override updates are sent to Kafka prior to + // committing the transaction. If we're unable to publish, we should + // discard updates and try again later when it's available -- not + // doing so would cause the copy of this data in ClickHouse to + // slowly drift out of sync with the copy in Postgres. This write is + // safe to retry if we write to Kafka but then fail to commit to + // Postgres for some reason -- the same row state should be + // generated each call, and the receiving ReplacingMergeTree will + // ensure we keep only the latest version after all writes settle.) await kafkaProducer.queueMessages(messages, true) return rows.length From ff8aedab0ee579c6f8a0127e4d5a2b277fa5fbc3 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 4 Dec 2023 15:06:55 -0800 Subject: [PATCH 24/25] Rename `MergeOperation` to `PersonOverrideDetails` so that it's relationship to the model is a bit clearer. --- .../src/worker/ingestion/person-state.ts | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 882fcae896d20..38a05b72e848c 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -516,7 +516,7 @@ export class PersonState { if (this.personOverrideWriter) { personOverrideMessages = await this.personOverrideWriter.addPersonOverride( tx, - getMergeOperation(this.teamId, otherPerson, mergeInto) + getPersonOverrideDetails(this.teamId, otherPerson, mergeInto) ) } @@ -551,14 +551,14 @@ export class PersonState { * Django model (and ``posthog_personoverride`` table schema) as defined in * ``posthog/models/person/person.py``. */ -type MergeOperation = { +type PersonOverrideDetails = { team_id: number old_person_id: string override_person_id: string oldest_event: DateTime } -function getMergeOperation(teamId: number, oldPerson: Person, overridePerson: Person): MergeOperation { +function getPersonOverrideDetails(teamId: number, oldPerson: Person, overridePerson: Person): PersonOverrideDetails { if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { throw new Error('cannot merge persons across different teams') } @@ -573,7 +573,10 @@ function getMergeOperation(teamId: number, oldPerson: Person, overridePerson: Pe export class PersonOverrideWriter { constructor(private postgres: PostgresRouter) {} - public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise { + public async addPersonOverride( + tx: TransactionClient, + overrideDetails: PersonOverrideDetails + ): Promise { const mergedAt = DateTime.now() /** We'll need to do 4 updates: @@ -584,13 +587,13 @@ export class PersonOverrideWriter { */ const oldPersonMappingId = await this.addPersonOverrideMapping( tx, - mergeOperation.team_id, - mergeOperation.old_person_id + overrideDetails.team_id, + overrideDetails.old_person_id ) const overridePersonMappingId = await this.addPersonOverrideMapping( tx, - mergeOperation.team_id, - mergeOperation.override_person_id + overrideDetails.team_id, + overrideDetails.override_person_id ) await this.postgres.query( @@ -603,10 +606,10 @@ export class PersonOverrideWriter { oldest_event, version ) VALUES ( - ${mergeOperation.team_id}, + ${overrideDetails.team_id}, ${oldPersonMappingId}, ${overridePersonMappingId}, - ${mergeOperation.oldest_event}, + ${overrideDetails.oldest_event}, 0 ) `, @@ -625,7 +628,7 @@ export class PersonOverrideWriter { SET override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1 WHERE - team_id = ${mergeOperation.team_id} AND override_person_id = ${oldPersonMappingId} + team_id = ${overrideDetails.team_id} AND override_person_id = ${oldPersonMappingId} RETURNING old_person_id, version, @@ -654,19 +657,19 @@ export class PersonOverrideWriter { messages: [ { value: JSON.stringify({ - team_id: mergeOperation.team_id, - old_person_id: mergeOperation.old_person_id, - override_person_id: mergeOperation.override_person_id, - oldest_event: castTimestampOrNow(mergeOperation.oldest_event, TimestampFormat.ClickHouse), + team_id: overrideDetails.team_id, + old_person_id: overrideDetails.old_person_id, + override_person_id: overrideDetails.override_person_id, + oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse), merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), version: 0, }), }, ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ value: JSON.stringify({ - team_id: mergeOperation.team_id, + team_id: overrideDetails.team_id, old_person_id: old_person_id, - override_person_id: mergeOperation.override_person_id, + override_person_id: overrideDetails.override_person_id, oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), version: version, @@ -731,7 +734,10 @@ export class DeferredPersonOverrideWriter { /** * Enqueue an override for deferred processing. */ - public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise { + public async addPersonOverride( + tx: TransactionClient, + overrideDetails: PersonOverrideDetails + ): Promise { await this.postgres.query( tx, SQL` @@ -741,10 +747,10 @@ export class DeferredPersonOverrideWriter { override_person_id, oldest_event ) VALUES ( - ${mergeOperation.team_id}, - ${mergeOperation.old_person_id}, - ${mergeOperation.override_person_id}, - ${mergeOperation.oldest_event} + ${overrideDetails.team_id}, + ${overrideDetails.old_person_id}, + ${overrideDetails.override_person_id}, + ${overrideDetails.oldest_event} )`, undefined, 'pendingPersonOverride' From 01a172f69824dd717ac5ca2c8749dffd7c3df7cc Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 4 Dec 2023 16:27:54 -0800 Subject: [PATCH 25/25] Add the ability to pass a limit to `processPendingOverrides`. --- .../src/worker/ingestion/person-state.ts | 13 ++++---- .../worker/ingestion/person-state.test.ts | 30 +++++++++++++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 38a05b72e848c..4e41b5aa22ebb 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -760,13 +760,15 @@ export class DeferredPersonOverrideWriter { } /** - * Process all pending overrides. An advisory lock is acquired prior to - * processing to ensure that this function has exclusive access to the - * pending overrides during the update process. + * Process all (or up to the given limit) pending overrides. + * + * An advisory lock is acquired prior to processing to ensure that this + * function has exclusive access to the pending overrides during the update + * process. * * @returns the number of overrides processed */ - public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper): Promise { + public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper, limit?: number): Promise { const writer = new PersonOverrideWriter(this.postgres) return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { @@ -785,7 +787,8 @@ export class DeferredPersonOverrideWriter { // n.b.: Ordering by id ensures we are processing in (roughly) FIFO order const { rows } = await this.postgres.query( tx, - `SELECT * FROM posthog_pendingpersonoverride ORDER BY id`, + `SELECT * FROM posthog_pendingpersonoverride ORDER BY id` + + (limit !== undefined ? ` LIMIT ${limit}` : ''), undefined, 'processPendingOverrides' ) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 48e35edea06b9..fd1b5cada4104 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2270,4 +2270,34 @@ describe('DeferredPersonOverrideWriter', () => { expect(acquiredLock!).toBe(false) await expect(writer.processPendingOverrides(kafkaProducer)).resolves.toEqual(0) }) + + it('respects limit if provided', async () => { + const { postgres, kafkaProducer } = hub.db + + const overrides = [...Array(3)].map(() => ({ + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + })) + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await Promise.all( + overrides.map( + async (override) => + await writer.addPersonOverride(tx, { + team_id: teamId, + ...override, + oldest_event: DateTime.fromMillis(0), + }) + ) + ) + }) + + expect(await getPendingPersonOverrides()).toEqual(overrides) + + expect(await writer.processPendingOverrides(kafkaProducer, 2)).toEqual(2) + expect(await getPendingPersonOverrides()).toMatchObject(overrides.slice(-1)) + + expect(await writer.processPendingOverrides(kafkaProducer, 2)).toEqual(1) + expect(await getPendingPersonOverrides()).toEqual([]) + }) })