From fa3fc2886c52f31e0b35b925cb4b0c29b9462a47 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 6 Dec 2023 15:34:33 -0800 Subject: [PATCH 01/20] Add basic tests for ensuring implementations remain in sync. --- .../src/worker/ingestion/person-state.ts | 25 ++++++ .../worker/ingestion/person-state.test.ts | 80 +++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index df9377a7df425..ff36bd4e6dd4f 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -706,6 +706,31 @@ export class PersonOverrideWriter { return id } + + public async getPersonOverrides(teamId: number): Promise { + const { rows } = await this.postgres.query( + PostgresUse.COMMON_WRITE, + SQL` + SELECT + override.team_id, + old_person.uuid as old_person_id, + override_person.uuid as override_person_id, + oldest_event + FROM posthog_personoverride override + LEFT OUTER JOIN posthog_personoverridemapping old_person + ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id + LEFT OUTER JOIN posthog_personoverridemapping override_person + ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id + WHERE override.team_id = ${teamId} + `, + undefined, + 'getPersonOverrides' + ) + return rows.map((row) => ({ + ...row, + oldest_event: DateTime.fromISO(row.oldest_event), + })) + } } const deferredPersonOverridesWrittenCounter = new Counter({ diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 492beab70ca0d..4a0c237d19c96 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2095,6 +2095,86 @@ describe('PersonState.update()', () => { }) }) +describe('person overrides', () => { + let hub: Hub + let closeHub: () => Promise + + let organizationId: string + let teamId: number + let writer: PersonOverrideWriter + + beforeAll(async () => { + ;[hub, closeHub] = await createHub({}) + organizationId = await createOrganization(hub.db.postgres) + writer = new PersonOverrideWriter(hub.db.postgres) + }) + + beforeEach(async () => { + teamId = await createTeam(hub.db.postgres, organizationId) + }) + + afterAll(async () => { + await closeHub() + }) + + it('handles direct overrides', async () => { + const { postgres } = hub.db + + const defaults = { + team_id: teamId, + oldest_event: DateTime.fromMillis(0), + } + + const override = { + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + } + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await writer.addPersonOverride(tx, { ...defaults, ...override }) + }) + + expect(await writer.getPersonOverrides(teamId)).toEqual([{ ...defaults, ...override }]) + }) + + it('handles transitive overrides', async () => { + const { postgres } = hub.db + + const defaults = { + team_id: teamId, + oldest_event: DateTime.fromMillis(0), + } + + const overrides = [ + { + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + }, + ] + + overrides.push({ + old_person_id: overrides[0].override_person_id, + override_person_id: new UUIDT().toString(), + }) + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + for (const override of overrides) { + await writer.addPersonOverride(tx, { ...defaults, ...override }) + } + }) + + expect(new Set(await writer.getPersonOverrides(teamId))).toEqual( + new Set( + overrides.map(({ old_person_id }) => ({ + old_person_id, + override_person_id: overrides.at(-1)!.override_person_id, + ...defaults, + })) + ) + ) + }) +}) + describe('deferred person overrides', () => { let hub: Hub let closeHub: () => Promise From 2ee74a39b08aa67f02e84caff1715dfa006d8b10 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 6 Dec 2023 16:12:29 -0800 Subject: [PATCH 02/20] Add flat table override definition. --- posthog/models/person/person.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 9ef4a58e18cba..603280d1e4ec9 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -142,6 +142,16 @@ class PendingPersonOverride(models.Model): oldest_event = models.DateTimeField() +class FlatPersonOverride(models.Model): + # TODO: What additional constraints here make sense (and are practical to implement?) + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") + team_id = models.BigIntegerField() # TODO: Foreign key or not? + old_person_id = models.UUIDField() + override_person_id = models.UUIDField() + oldest_event = models.DateTimeField() + version = models.BigIntegerField(null=True, blank=True) + + class PersonOverride(models.Model): """A model of persons to be overriden in merge or merge-like events. From 4f04b50c9d5c0d346fa39fdc4304f98cc0fbe8d9 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 6 Dec 2023 16:38:06 -0800 Subject: [PATCH 03/20] Add automatic migration. --- latest_migrations.manifest | 2 +- posthog/migrations/0376_flatpersonoverride.py | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) create mode 100644 posthog/migrations/0376_flatpersonoverride.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 0e2a72475b984..26bece2933a48 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0375_alter_survey_type +posthog: 0376_flatpersonoverride sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/migrations/0376_flatpersonoverride.py b/posthog/migrations/0376_flatpersonoverride.py new file mode 100644 index 0000000000000..c804beaae8035 --- /dev/null +++ b/posthog/migrations/0376_flatpersonoverride.py @@ -0,0 +1,23 @@ +# Generated by Django 3.2.19 on 2023-12-07 00:38 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0375_alter_survey_type"), + ] + + operations = [ + migrations.CreateModel( + name="FlatPersonOverride", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("team_id", models.BigIntegerField()), + ("old_person_id", models.UUIDField()), + ("override_person_id", models.UUIDField()), + ("oldest_event", models.DateTimeField()), + ("version", models.BigIntegerField(blank=True, null=True)), + ], + ), + ] From 9d758609a7bbd0827a2541d0253ecbb50dd50b7e Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 6 Dec 2023 16:28:04 -0800 Subject: [PATCH 04/20] Add flat person override writer and add to override writer test suite. --- .../src/worker/ingestion/person-state.ts | 106 +++++++++++++++++- .../worker/ingestion/person-state.test.ts | 16 ++- 2 files changed, 118 insertions(+), 4 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index ff36bd4e6dd4f..449bc8e3b8fcc 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -733,11 +733,115 @@ export class PersonOverrideWriter { } } +export class FlatPersonOverrideWriter { + constructor(private postgres: PostgresRouter) {} + + public async addPersonOverride( + tx: TransactionClient, + overrideDetails: PersonOverrideDetails + ): Promise { + const mergedAt = DateTime.now() + + await this.postgres.query( + tx, + SQL` + INSERT INTO posthog_flatpersonoverride ( + team_id, + old_person_id, + override_person_id, + oldest_event, + version + ) VALUES ( + ${overrideDetails.team_id}, + ${overrideDetails.old_person_id}, + ${overrideDetails.override_person_id}, + ${overrideDetails.oldest_event}, + 0 + ) + `, + undefined, + 'personOverride' + ) + + const { rows: transitiveUpdates } = await this.postgres.query( + tx, + SQL` + UPDATE + posthog_flatpersonoverride + SET + override_person_id = ${overrideDetails.override_person_id}, + version = COALESCE(version, 0)::numeric + 1 + WHERE + team_id = ${overrideDetails.team_id} AND override_person_id = ${overrideDetails.old_person_id} + RETURNING + old_person_id, + version, + oldest_event + `, + undefined, + 'transitivePersonOverrides' + ) + + status.debug('🔁', 'person_overrides_updated', { transitiveUpdates }) + + const personOverrideMessages: ProducerRecord[] = [ + { + topic: KAFKA_PERSON_OVERRIDE, + messages: [ + { + value: JSON.stringify({ + team_id: overrideDetails.team_id, + old_person_id: overrideDetails.old_person_id, + override_person_id: overrideDetails.override_person_id, + oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse), + merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), + version: 0, + }), + }, + ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ + value: JSON.stringify({ + team_id: overrideDetails.team_id, + old_person_id: old_person_id, + override_person_id: overrideDetails.override_person_id, + oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), + merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), + version: version, + }), + })), + ], + }, + ] + + return personOverrideMessages + } + + public async getPersonOverrides(teamId: number): Promise { + const { rows } = await this.postgres.query( + PostgresUse.COMMON_WRITE, + SQL` + SELECT + team_id, + old_person_id, + override_person_id, + oldest_event + FROM posthog_flatpersonoverride + WHERE team_id = ${teamId} + `, + undefined, + 'getPersonOverrides' + ) + return rows.map((row) => ({ + ...row, + team_id: parseInt(row.team_id), // XXX: pg returns bigint as str (reasonably so) + oldest_event: DateTime.fromISO(row.oldest_event), + })) + } +} + const deferredPersonOverridesWrittenCounter = new Counter({ name: 'deferred_person_overrides_written', help: 'Number of person overrides that have been written as pending', }) - export class DeferredPersonOverrideWriter { constructor(private postgres: PostgresRouter) {} diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 4a0c237d19c96..8d3e364f6709b 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -11,6 +11,7 @@ import { UUIDT } from '../../../src/utils/utils' import { DeferredPersonOverrideWorker, DeferredPersonOverrideWriter, + FlatPersonOverrideWriter, PersonOverrideWriter, PersonState, } from '../../../src/worker/ingestion/person-state' @@ -2095,18 +2096,27 @@ describe('PersonState.update()', () => { }) }) -describe('person overrides', () => { +describe.each(['flat', 'mapping'])('person overrides: %s', (mode) => { let hub: Hub let closeHub: () => Promise let organizationId: string let teamId: number - let writer: PersonOverrideWriter + let writer: PersonOverrideWriter | FlatPersonOverrideWriter beforeAll(async () => { ;[hub, closeHub] = await createHub({}) organizationId = await createOrganization(hub.db.postgres) - writer = new PersonOverrideWriter(hub.db.postgres) + switch (mode) { + case 'mapping': + writer = new PersonOverrideWriter(hub.db.postgres) + break + case 'flat': + writer = new FlatPersonOverrideWriter(hub.db.postgres) + break + default: + throw new Error('unexpected mode') + } }) beforeEach(async () => { From 55c0aafe687079568bf0eebee36c51beb5d214f7 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 6 Dec 2023 19:21:34 -0800 Subject: [PATCH 05/20] Improve test structure. --- .../worker/ingestion/person-state.test.ts | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 8d3e364f6709b..28f3f2dc3dc8f 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2096,7 +2096,12 @@ describe('PersonState.update()', () => { }) }) -describe.each(['flat', 'mapping'])('person overrides: %s', (mode) => { +const PersonOverridesWriterMode = { + mapping: (hub: Hub) => new PersonOverrideWriter(hub.db.postgres), + flat: (hub: Hub) => new FlatPersonOverrideWriter(hub.db.postgres), +} + +describe.each(Object.keys(PersonOverridesWriterMode))('person overrides: %s', (mode) => { let hub: Hub let closeHub: () => Promise @@ -2107,16 +2112,7 @@ describe.each(['flat', 'mapping'])('person overrides: %s', (mode) => { beforeAll(async () => { ;[hub, closeHub] = await createHub({}) organizationId = await createOrganization(hub.db.postgres) - switch (mode) { - case 'mapping': - writer = new PersonOverrideWriter(hub.db.postgres) - break - case 'flat': - writer = new FlatPersonOverrideWriter(hub.db.postgres) - break - default: - throw new Error('unexpected mode') - } + writer = PersonOverridesWriterMode[mode](hub) }) beforeEach(async () => { From 24dfd484c679e228c0e293b653798e07c65b97db Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 6 Dec 2023 19:25:08 -0800 Subject: [PATCH 06/20] Provide the override writer as a constructor argument to the override worker. --- plugin-server/src/main/pluginsServer.ts | 8 ++++++-- plugin-server/src/worker/ingestion/person-state.ts | 10 +++++----- .../tests/worker/ingestion/person-state.test.ts | 12 ++++++++++-- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index f59d196e93d24..c4f23b392bd43 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -21,7 +21,7 @@ import { status } from '../utils/status' import { delay } from '../utils/utils' import { AppMetrics } from '../worker/ingestion/app-metrics' import { OrganizationManager } from '../worker/ingestion/organization-manager' -import { DeferredPersonOverrideWorker } from '../worker/ingestion/person-state' +import { DeferredPersonOverrideWorker, PersonOverrideWriter } from '../worker/ingestion/person-state' import { TeamManager } from '../worker/ingestion/team-manager' import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina' import { GraphileWorker } from './graphile-worker/graphile-worker' @@ -437,7 +437,11 @@ export async function startPluginsServer( const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) - personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask(5000) + personOverridesPeriodicTask = new DeferredPersonOverrideWorker( + postgres, + kafkaProducer, + new PersonOverrideWriter(postgres) + ).runTask(5000) personOverridesPeriodicTask.promise.catch(async () => { status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...') await closeJobs() diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 449bc8e3b8fcc..f49ee1ad334e9 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -888,11 +888,11 @@ export class DeferredPersonOverrideWorker { // it just needs to be consistent across all processes. public readonly lockId = 567 - private writer: PersonOverrideWriter - - constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) { - this.writer = new PersonOverrideWriter(this.postgres) - } + constructor( + private postgres: PostgresRouter, + private kafkaProducer: KafkaProducerWrapper, + private writer: PersonOverrideWriter | FlatPersonOverrideWriter + ) {} /** * Process all (or up to the given limit) pending overrides. diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 28f3f2dc3dc8f..5fc83ac579086 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -74,7 +74,11 @@ const PersonOverridesModes: Record = { deferred: { getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), fetchPostgresPersonIdOverrides: async (hub, teamId) => { - await new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer).processPendingOverrides() + await new DeferredPersonOverrideWorker( + hub.db.postgres, + hub.db.kafkaProducer, + new PersonOverrideWriter(hub.db.postgres) + ).processPendingOverrides() return await fetchPostgresPersonIdOverrides(hub, teamId) }, }, @@ -2196,7 +2200,11 @@ describe('deferred person overrides', () => { ;[hub, closeHub] = await createHub({}) organizationId = await createOrganization(hub.db.postgres) writer = new DeferredPersonOverrideWriter(hub.db.postgres) - worker = new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer) + worker = new DeferredPersonOverrideWorker( + hub.db.postgres, + hub.db.kafkaProducer, + new PersonOverrideWriter(hub.db.postgres) + ) }) beforeEach(async () => { From 3834e981110c69edade3e15aa21abbd0e8124295 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 7 Dec 2023 13:24:00 -0800 Subject: [PATCH 07/20] Consolidate tests on the `getPersonOverrides` writer method for easier test variations. --- .../worker/ingestion/person-state.test.ts | 68 ++++++------------- 1 file changed, 22 insertions(+), 46 deletions(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 5fc83ac579086..adb3f7ab84616 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -25,41 +25,6 @@ const timestamp = DateTime.fromISO('2020-01-01T12:00:05.200Z').toUTC() const timestamp2 = DateTime.fromISO('2020-02-02T12:00:05.200Z').toUTC() const timestampch = '2020-01-01 12:00:05.000' -async function fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> { - const result = await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - ` - WITH overrides AS ( - SELECT id, old_person_id, override_person_id - FROM posthog_personoverride - WHERE team_id = ${teamId} - ORDER BY id - ) - SELECT - mapping.uuid AS old_person_id, - overrides_mapping.uuid AS override_person_id - FROM - overrides AS first - JOIN - posthog_personoverridemapping AS mapping ON first.old_person_id = mapping.id - JOIN ( - SELECT - second.id AS id, - uuid - FROM - overrides AS second - JOIN posthog_personoverridemapping AS mapping ON second.override_person_id = mapping.id - ) AS overrides_mapping ON overrides_mapping.id = first.id - `, - undefined, - 'fetchPersonIdOverrides' - ) - return result.rows.map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]).sort() as [ - string, - string - ][] -} - interface PersonOverridesMode { getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> @@ -69,17 +34,27 @@ const PersonOverridesModes: Record = { disabled: undefined, immediate: { getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), - fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), + fetchPostgresPersonIdOverrides: async (hub, teamId) => { + const writer = new PersonOverrideWriter(hub.db.postgres) // XXX: ideally would reference ``this``, not new instance + const overrides = await writer.getPersonOverrides(teamId) + return overrides + .map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]) + .sort() as [string, string][] + }, }, deferred: { getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), fetchPostgresPersonIdOverrides: async (hub, teamId) => { + const writer = new PersonOverrideWriter(hub.db.postgres) await new DeferredPersonOverrideWorker( hub.db.postgres, hub.db.kafkaProducer, - new PersonOverrideWriter(hub.db.postgres) + writer ).processPendingOverrides() - return await fetchPostgresPersonIdOverrides(hub, teamId) + const overrides = await writer.getPersonOverrides(teamId) + return overrides + .map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]) + .sort() as [string, string][] }, }, } @@ -2194,17 +2169,15 @@ describe('deferred person overrides', () => { let teamId: number let writer: DeferredPersonOverrideWriter + let syncWriter: PersonOverrideWriter let worker: DeferredPersonOverrideWorker beforeAll(async () => { ;[hub, closeHub] = await createHub({}) organizationId = await createOrganization(hub.db.postgres) writer = new DeferredPersonOverrideWriter(hub.db.postgres) - worker = new DeferredPersonOverrideWorker( - hub.db.postgres, - hub.db.kafkaProducer, - new PersonOverrideWriter(hub.db.postgres) - ) + syncWriter = new PersonOverrideWriter(hub.db.postgres) + worker = new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer, syncWriter) }) beforeEach(async () => { @@ -2255,9 +2228,12 @@ describe('deferred person overrides', () => { expect(await getPendingPersonOverrides()).toMatchObject([]) - expect(await fetchPostgresPersonIdOverrides(hub, teamId)).toEqual([ - [override.old_person_id, override.override_person_id], - ]) + expect( + (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => [ + old_person_id, + override_person_id, + ]) + ).toEqual([[override.old_person_id, override.override_person_id]]) const clickhouseOverrides = await waitForExpect(async () => { const { data } = await hub.db.clickhouse.querying( From 201f74d40cb38f0898916689c89e68f40b4d9eff Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 7 Dec 2023 13:29:57 -0800 Subject: [PATCH 08/20] Use set instead of sorted arrays. --- .../worker/ingestion/person-state.test.ts | 55 +++++++++++-------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index adb3f7ab84616..a202889968f43 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -27,7 +27,10 @@ const timestampch = '2020-01-01 12:00:05.000' interface PersonOverridesMode { getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter - fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> + fetchPostgresPersonIdOverrides( + hub: Hub, + teamId: number + ): Promise> } const PersonOverridesModes: Record = { @@ -36,25 +39,29 @@ const PersonOverridesModes: Record = { getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), fetchPostgresPersonIdOverrides: async (hub, teamId) => { const writer = new PersonOverrideWriter(hub.db.postgres) // XXX: ideally would reference ``this``, not new instance - const overrides = await writer.getPersonOverrides(teamId) - return overrides - .map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]) - .sort() as [string, string][] + return new Set( + (await writer.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ + old_person_id, + override_person_id, + })) + ) }, }, deferred: { getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), fetchPostgresPersonIdOverrides: async (hub, teamId) => { - const writer = new PersonOverrideWriter(hub.db.postgres) + const syncWriter = new PersonOverrideWriter(hub.db.postgres) await new DeferredPersonOverrideWorker( hub.db.postgres, hub.db.kafkaProducer, - writer + syncWriter ).processPendingOverrides() - const overrides = await writer.getPersonOverrides(teamId) - return overrides - .map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]) - .sort() as [string, string][] + return new Set( + (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ + old_person_id, + override_person_id, + })) + ) }, }, } @@ -1551,7 +1558,7 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides, if applicable if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual([[second.uuid, first.uuid]]) + expect(overrides).toEqual(new Set([{ old_person_id: second.uuid, override_person_id: first.uuid }])) // & CH person overrides // TODO } @@ -1802,7 +1809,7 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides const overridesAfterFailure = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overridesAfterFailure).toEqual([]) + expect(overridesAfterFailure).toEqual(new Set()) // Now verify we successfully get to our target state if we do not have // any db errors. @@ -1837,7 +1844,7 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides const overrides = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual([[second.uuid, first.uuid]]) + expect(overrides).toEqual(new Set([{ old_person_id: second.uuid, override_person_id: first.uuid }])) }) it(`handles a chain of overrides being applied concurrently`, async () => { @@ -1978,10 +1985,12 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides, if applicable if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual([ - [second.uuid, first.uuid], - [third.uuid, first.uuid], - ]) + expect(overrides).toEqual( + new Set([ + { old_person_id: second.uuid, override_person_id: first.uuid }, + { old_person_id: third.uuid, override_person_id: first.uuid }, + ]) + ) } }) @@ -2065,10 +2074,12 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides, if applicable if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual([ - [second.uuid, first.uuid], - [third.uuid, first.uuid], - ]) + expect(overrides).toEqual( + new Set([ + { old_person_id: second.uuid, override_person_id: first.uuid }, + { old_person_id: third.uuid, override_person_id: first.uuid }, + ]) + ) } }) }) From ce1365b5f44a4d0af23a0a2cb56bd21ed5107d18 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 7 Dec 2023 14:04:38 -0800 Subject: [PATCH 09/20] Add new test variant for flat overrides to the big test suite. --- .../worker/ingestion/person-state.test.ts | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index a202889968f43..7f912d73eaddf 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -26,6 +26,7 @@ const timestamp2 = DateTime.fromISO('2020-02-02T12:00:05.200Z').toUTC() const timestampch = '2020-01-01 12:00:05.000' interface PersonOverridesMode { + supportsSyncTransaction: boolean getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter fetchPostgresPersonIdOverrides( hub: Hub, @@ -35,7 +36,8 @@ interface PersonOverridesMode { const PersonOverridesModes: Record = { disabled: undefined, - immediate: { + 'immediate, with mappings': { + supportsSyncTransaction: true, getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), fetchPostgresPersonIdOverrides: async (hub, teamId) => { const writer = new PersonOverrideWriter(hub.db.postgres) // XXX: ideally would reference ``this``, not new instance @@ -47,7 +49,8 @@ const PersonOverridesModes: Record = { ) }, }, - deferred: { + 'deferred, with mappings': { + supportsSyncTransaction: false, getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), fetchPostgresPersonIdOverrides: async (hub, teamId) => { const syncWriter = new PersonOverrideWriter(hub.db.postgres) @@ -64,6 +67,24 @@ const PersonOverridesModes: Record = { ) }, }, + 'deferred, without mappings (flat)': { + supportsSyncTransaction: false, + getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), + fetchPostgresPersonIdOverrides: async (hub, teamId) => { + const syncWriter = new FlatPersonOverrideWriter(hub.db.postgres) + await new DeferredPersonOverrideWorker( + hub.db.postgres, + hub.db.kafkaProducer, + syncWriter + ).processPendingOverrides() + return new Set( + (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ + old_person_id, + override_person_id, + })) + ) + }, + }, } describe('PersonState.update()', () => { @@ -1720,8 +1741,8 @@ describe('PersonState.update()', () => { }) it(`does not commit partial transactions on override conflicts`, async () => { - if (overridesMode !== PersonOverridesModes.immediate) { - return // this behavior is only supported with immediate overrides + if (!overridesMode?.supportsSyncTransaction) { + return } const first: Person = await hub.db.createPerson( timestamp, From 922d11d36439dcb578b384da9f94ae4107f800ab Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 7 Dec 2023 14:10:06 -0800 Subject: [PATCH 10/20] Test twiddling --- plugin-server/tests/worker/ingestion/person-state.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 7f912d73eaddf..318c3504d3e98 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2112,7 +2112,7 @@ const PersonOverridesWriterMode = { flat: (hub: Hub) => new FlatPersonOverrideWriter(hub.db.postgres), } -describe.each(Object.keys(PersonOverridesWriterMode))('person overrides: %s', (mode) => { +describe.each(Object.keys(PersonOverridesWriterMode))('person overrides writer: %s', (mode) => { let hub: Hub let closeHub: () => Promise From 845cf509955831ec5d3826f6748ab3e8a26e0b44 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 7 Dec 2023 14:23:06 -0800 Subject: [PATCH 11/20] Add `POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES` setting. --- plugin-server/src/config/config.ts | 1 + plugin-server/src/main/pluginsServer.ts | 10 ++++++++-- plugin-server/src/types.ts | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 40afd2c248481..973b9c99eba46 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -129,6 +129,7 @@ export function getDefaultConfig(): PluginsServerConfig { DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '', DROP_EVENTS_BY_TOKEN: '', POE_DEFERRED_WRITES_ENABLED: false, + POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: false, POE_EMBRACE_JOIN_FOR_TEAMS: '', RELOAD_PLUGIN_JITTER_MAX_MS: 60000, RUSTY_HOOK_FOR_TEAMS: '', diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index c4f23b392bd43..74a1dff5289c7 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -21,7 +21,11 @@ import { status } from '../utils/status' import { delay } from '../utils/utils' import { AppMetrics } from '../worker/ingestion/app-metrics' import { OrganizationManager } from '../worker/ingestion/organization-manager' -import { DeferredPersonOverrideWorker, PersonOverrideWriter } from '../worker/ingestion/person-state' +import { + DeferredPersonOverrideWorker, + FlatPersonOverrideWriter, + PersonOverrideWriter, +} from '../worker/ingestion/person-state' import { TeamManager } from '../worker/ingestion/team-manager' import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina' import { GraphileWorker } from './graphile-worker/graphile-worker' @@ -440,7 +444,9 @@ export async function startPluginsServer( personOverridesPeriodicTask = new DeferredPersonOverrideWorker( postgres, kafkaProducer, - new PersonOverrideWriter(postgres) + hub?.POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES // XXX: should we ensure there is a valid hub instance here for config? + ? new FlatPersonOverrideWriter(postgres) + : new PersonOverrideWriter(postgres) ).runTask(5000) personOverridesPeriodicTask.promise.catch(async () => { status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...') diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 062735c2525b2..0d15899c84aa2 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -200,6 +200,7 @@ export interface PluginsServerConfig { DROP_EVENTS_BY_TOKEN: string POE_EMBRACE_JOIN_FOR_TEAMS: string POE_DEFERRED_WRITES_ENABLED: boolean + POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: boolean RELOAD_PLUGIN_JITTER_MAX_MS: number RUSTY_HOOK_FOR_TEAMS: string RUSTY_HOOK_URL: string From abe8f8ad8a08a908b7d07fa58d69792c0d724c41 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 7 Dec 2023 18:33:19 -0800 Subject: [PATCH 12/20] Remove comment about foreign key on team_id. --- posthog/models/person/person.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 603280d1e4ec9..ad3b9c16b04ba 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -145,7 +145,7 @@ class PendingPersonOverride(models.Model): class FlatPersonOverride(models.Model): # TODO: What additional constraints here make sense (and are practical to implement?) id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") - team_id = models.BigIntegerField() # TODO: Foreign key or not? + team_id = models.BigIntegerField() old_person_id = models.UUIDField() override_person_id = models.UUIDField() oldest_event = models.DateTimeField() From 61fb17e8af8746d605d8acfa563be35e8ecc763e Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 7 Dec 2023 18:37:40 -0800 Subject: [PATCH 13/20] Split up non-deferred from deferred overrides, remove another comment. --- posthog/models/person/person.py | 35 ++++++++++++++++----------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index ad3b9c16b04ba..364e76c7758f8 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -134,24 +134,6 @@ class Meta: uuid = models.UUIDField() -class PendingPersonOverride(models.Model): - id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") - team_id = models.BigIntegerField() - old_person_id = models.UUIDField() - override_person_id = models.UUIDField() - oldest_event = models.DateTimeField() - - -class FlatPersonOverride(models.Model): - # TODO: What additional constraints here make sense (and are practical to implement?) - id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") - team_id = models.BigIntegerField() - old_person_id = models.UUIDField() - override_person_id = models.UUIDField() - oldest_event = models.DateTimeField() - version = models.BigIntegerField(null=True, blank=True) - - class PersonOverride(models.Model): """A model of persons to be overriden in merge or merge-like events. @@ -197,6 +179,23 @@ class Meta: version: models.BigIntegerField = models.BigIntegerField(null=True, blank=True) +class PendingPersonOverride(models.Model): + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") + team_id = models.BigIntegerField() + old_person_id = models.UUIDField() + override_person_id = models.UUIDField() + oldest_event = models.DateTimeField() + + +class FlatPersonOverride(models.Model): + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") + team_id = models.BigIntegerField() + old_person_id = models.UUIDField() + override_person_id = models.UUIDField() + oldest_event = models.DateTimeField() + version = models.BigIntegerField(null=True, blank=True) + + def get_distinct_ids_for_subquery(person: Person | None, team: Team) -> List[str]: """_summary_ Fetching distinct_ids for a person from CH is slow, so we From 72e2c9c30bc55068b8c3ae982d761b873078d933 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 14 Dec 2023 20:18:40 -0800 Subject: [PATCH 14/20] Use `serverConfig` instead of optional `hub` when setting up --- plugin-server/src/main/pluginsServer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 74a1dff5289c7..5b94900e807f9 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -444,7 +444,7 @@ export async function startPluginsServer( personOverridesPeriodicTask = new DeferredPersonOverrideWorker( postgres, kafkaProducer, - hub?.POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES // XXX: should we ensure there is a valid hub instance here for config? + serverConfig.POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES ? new FlatPersonOverrideWriter(postgres) : new PersonOverrideWriter(postgres) ).runTask(5000) From 1a1d5a0f338b91d2f3e48632d11c7d422a1c7b4b Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 15 Dec 2023 14:32:17 -0800 Subject: [PATCH 15/20] Add index --- posthog/migrations/0376_flatpersonoverride.py | 4 ++++ posthog/models/person/person.py | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/posthog/migrations/0376_flatpersonoverride.py b/posthog/migrations/0376_flatpersonoverride.py index c804beaae8035..b7d918679b52b 100644 --- a/posthog/migrations/0376_flatpersonoverride.py +++ b/posthog/migrations/0376_flatpersonoverride.py @@ -20,4 +20,8 @@ class Migration(migrations.Migration): ("version", models.BigIntegerField(blank=True, null=True)), ], ), + migrations.AddIndex( + model_name="flatpersonoverride", + index=models.Index(fields=["team_id", "override_person_id"], name="posthog_fla_team_id_224253_idx"), + ), ] diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 364e76c7758f8..a970910a9b479 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -195,6 +195,11 @@ class FlatPersonOverride(models.Model): oldest_event = models.DateTimeField() version = models.BigIntegerField(null=True, blank=True) + class Meta: + indexes = [ + models.Index(fields=["team_id", "override_person_id"]), + ] + def get_distinct_ids_for_subquery(person: Person | None, team: Team) -> List[str]: """_summary_ From 796983f41f635d7eb5b6734af3e31ddc01345bd5 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 15 Dec 2023 15:13:00 -0800 Subject: [PATCH 16/20] Write comment for pending overrides. --- posthog/models/person/person.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index a970910a9b479..efd290c40e3d2 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -180,6 +180,33 @@ class Meta: class PendingPersonOverride(models.Model): + """ + The pending person overrides model/table contains records of merges that + have occurred, but have not yet been integrated into the person overrides + table. + + This table should generally be considered as log table or queue. When a + merge occurs, it is recorded to the log (added to the queue) as part of the + merge transaction. Later, another process comes along, reading from the + other end of the log (popping from the queue) and applying the necessary + updates to the person overrides table as part of secondary transaction that + does not adversely impact the ingestion pipeline. + + This approach allows us to decouple the set of operations that must occur as + part of an atomic transactional unit during person merging (moving distinct + IDs, merging properties, deleting the subsumed person, etc.) from those that + are more tolerant to eventual consistency (updating person overrides in + Postgres and subsequently relaying those updates to ClickHouse in various + forms to update the person associated with an event.) This decoupling helps + us to minimize the overhead of the primary merge transaction by reducing the + degree of contention within the ingestion pipeline caused by long-running + transactions. This decoupling also allows us to serialize the execution of + all updates to the person overrides table through a single writer, which + allows us to safely update the person overrides table while handling tricky + cases like applying transitive updates without the need for expensive table + constraints to ensure their validity. + """ + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") team_id = models.BigIntegerField() old_person_id = models.UUIDField() From 9515c372d32ac79e2cd8d7e10386e0b1f18034b3 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 15 Dec 2023 15:19:48 -0800 Subject: [PATCH 17/20] Tweak woring --- posthog/models/person/person.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index efd290c40e3d2..5fcd0f09e4caf 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -185,12 +185,11 @@ class PendingPersonOverride(models.Model): have occurred, but have not yet been integrated into the person overrides table. - This table should generally be considered as log table or queue. When a + This table should generally be considered as a log table or queue. When a merge occurs, it is recorded to the log (added to the queue) as part of the merge transaction. Later, another process comes along, reading from the other end of the log (popping from the queue) and applying the necessary - updates to the person overrides table as part of secondary transaction that - does not adversely impact the ingestion pipeline. + updates to the person overrides table as part of secondary transaction. This approach allows us to decouple the set of operations that must occur as part of an atomic transactional unit during person merging (moving distinct From fe8c88a2d6d12c06cab8e41220bc2faafe1cd157 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 15 Dec 2023 15:55:34 -0800 Subject: [PATCH 18/20] Add comment to flat person model. --- posthog/models/person/person.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 5fcd0f09e4caf..1085938628f37 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -214,6 +214,38 @@ class PendingPersonOverride(models.Model): class FlatPersonOverride(models.Model): + """ + The (flat) person overrides model/table contains a consolidated record of + all merges that have occurred, but have not yet been integrated into the + ClickHouse events table through a squash operation. Once the effects of a + merge have been integrated into the events table, the associated override + record can be deleted from this table. + + This table is in some sense a materialized view over the pending person + overrides table (i.e. the merge log.) It differs from that base table in + that it should be maintained during updates to account for the effects of + transitive merges. For example, if person A is merged into person B, and + then person B is merged into person C, we'd expect the first record (A->B) + to be updated to reflect that person A has been merged into person C (A->C, + eliding the intermediate step.) + + There are several important expectations about the nature of the data within + this table: + + * A person should only appear as an "old" person at most once for a given + team (as appearing more than once would imply they were merged into + multiple people.) + * A person cannot be merged into themselves (i.e. be both the "old" and . + "override" person within a given row.) + * A person should only appear in a table as _either_ an "old" person or + "override" person for a given team -- but never both, as this would + indicate a failure to account for a transitive merge. + + The "flat" in the table name is used to distinguish this table from a prior + approach that required multiple tables to maintain the same state but + otherwise has little significance of its own. + """ + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") team_id = models.BigIntegerField() old_person_id = models.UUIDField() From b042f2f520de7fe0ae07e244de0f2f96bc16bd05 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 15 Dec 2023 16:03:46 -0800 Subject: [PATCH 19/20] Remove errant punctuation --- posthog/models/person/person.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 1085938628f37..ccd0f6ab23957 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -235,7 +235,7 @@ class FlatPersonOverride(models.Model): * A person should only appear as an "old" person at most once for a given team (as appearing more than once would imply they were merged into multiple people.) - * A person cannot be merged into themselves (i.e. be both the "old" and . + * A person cannot be merged into themselves (i.e. be both the "old" and "override" person within a given row.) * A person should only appear in a table as _either_ an "old" person or "override" person for a given team -- but never both, as this would From 7b8176bde66d499af75130759fca69bf54892164 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 18 Dec 2023 11:34:51 -0800 Subject: [PATCH 20/20] Add back constraints. --- posthog/migrations/0376_flatpersonoverride.py | 16 ++++++++++++++++ posthog/models/person/person.py | 15 +++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/posthog/migrations/0376_flatpersonoverride.py b/posthog/migrations/0376_flatpersonoverride.py index b7d918679b52b..128e7409598a7 100644 --- a/posthog/migrations/0376_flatpersonoverride.py +++ b/posthog/migrations/0376_flatpersonoverride.py @@ -1,6 +1,7 @@ # Generated by Django 3.2.19 on 2023-12-07 00:38 from django.db import migrations, models +import django.db.models.expressions class Migration(migrations.Migration): @@ -24,4 +25,19 @@ class Migration(migrations.Migration): model_name="flatpersonoverride", index=models.Index(fields=["team_id", "override_person_id"], name="posthog_fla_team_id_224253_idx"), ), + migrations.AddConstraint( + model_name="flatpersonoverride", + constraint=models.UniqueConstraint( + fields=("team_id", "old_person_id"), name="flatpersonoverride_unique_old_person_by_team" + ), + ), + migrations.AddConstraint( + model_name="flatpersonoverride", + constraint=models.CheckConstraint( + check=models.Q( + ("old_person_id__exact", django.db.models.expressions.F("override_person_id")), _negated=True + ), + name="flatpersonoverride_check_circular_reference", + ), + ), ] diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index ccd0f6ab23957..902742219ab0a 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -241,6 +241,11 @@ class FlatPersonOverride(models.Model): "override" person for a given team -- but never both, as this would indicate a failure to account for a transitive merge. + The first two of these expectations can be enforced as constraints, but + unfortunately we've found the third to be too costly to enforce in practice. + Instead, we try to ensure that this invariant holds by serializing all + writes to this table through the ``PendingPersonOverride`` model above. + The "flat" in the table name is used to distinguish this table from a prior approach that required multiple tables to maintain the same state but otherwise has little significance of its own. @@ -257,6 +262,16 @@ class Meta: indexes = [ models.Index(fields=["team_id", "override_person_id"]), ] + constraints = [ + models.UniqueConstraint( + fields=["team_id", "old_person_id"], + name="flatpersonoverride_unique_old_person_by_team", + ), + models.CheckConstraint( + check=~Q(old_person_id__exact=F("override_person_id")), + name="flatpersonoverride_check_circular_reference", + ), + ] def get_distinct_ids_for_subquery(person: Person | None, team: Team) -> List[str]: