diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 6ac9f5e85697a..13510a650acfe 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0376_externaldataschema_last_synced_at +posthog: 0377_flatpersonoverride sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 \ No newline at end of file diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 40afd2c248481..973b9c99eba46 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -129,6 +129,7 @@ export function getDefaultConfig(): PluginsServerConfig { DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '', DROP_EVENTS_BY_TOKEN: '', POE_DEFERRED_WRITES_ENABLED: false, + POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: false, POE_EMBRACE_JOIN_FOR_TEAMS: '', RELOAD_PLUGIN_JITTER_MAX_MS: 60000, RUSTY_HOOK_FOR_TEAMS: '', diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index f59d196e93d24..5b94900e807f9 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -21,7 +21,11 @@ import { status } from '../utils/status' import { delay } from '../utils/utils' import { AppMetrics } from '../worker/ingestion/app-metrics' import { OrganizationManager } from '../worker/ingestion/organization-manager' -import { DeferredPersonOverrideWorker } from '../worker/ingestion/person-state' +import { + DeferredPersonOverrideWorker, + FlatPersonOverrideWriter, + PersonOverrideWriter, +} from '../worker/ingestion/person-state' import { TeamManager } from '../worker/ingestion/team-manager' import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina' import { GraphileWorker } from './graphile-worker/graphile-worker' @@ -437,7 +441,13 @@ export async function startPluginsServer( const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) - personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask(5000) + personOverridesPeriodicTask = new DeferredPersonOverrideWorker( + postgres, + kafkaProducer, + serverConfig.POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES + ? new FlatPersonOverrideWriter(postgres) + : new PersonOverrideWriter(postgres) + ).runTask(5000) personOverridesPeriodicTask.promise.catch(async () => { status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...') await closeJobs() diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 062735c2525b2..0d15899c84aa2 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -200,6 +200,7 @@ export interface PluginsServerConfig { DROP_EVENTS_BY_TOKEN: string POE_EMBRACE_JOIN_FOR_TEAMS: string POE_DEFERRED_WRITES_ENABLED: boolean + POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: boolean RELOAD_PLUGIN_JITTER_MAX_MS: number RUSTY_HOOK_FOR_TEAMS: string RUSTY_HOOK_URL: string diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index df9377a7df425..f49ee1ad334e9 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -706,13 +706,142 @@ export class PersonOverrideWriter { return id } + + public async getPersonOverrides(teamId: number): Promise { + const { rows } = await this.postgres.query( + PostgresUse.COMMON_WRITE, + SQL` + SELECT + override.team_id, + old_person.uuid as old_person_id, + override_person.uuid as override_person_id, + oldest_event + FROM posthog_personoverride override + LEFT OUTER JOIN posthog_personoverridemapping old_person + ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id + LEFT OUTER JOIN posthog_personoverridemapping override_person + ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id + WHERE override.team_id = ${teamId} + `, + undefined, + 'getPersonOverrides' + ) + return rows.map((row) => ({ + ...row, + oldest_event: DateTime.fromISO(row.oldest_event), + })) + } +} + +export class FlatPersonOverrideWriter { + constructor(private postgres: PostgresRouter) {} + + public async addPersonOverride( + tx: TransactionClient, + overrideDetails: PersonOverrideDetails + ): Promise { + const mergedAt = DateTime.now() + + await this.postgres.query( + tx, + SQL` + INSERT INTO posthog_flatpersonoverride ( + team_id, + old_person_id, + override_person_id, + oldest_event, + version + ) VALUES ( + ${overrideDetails.team_id}, + ${overrideDetails.old_person_id}, + ${overrideDetails.override_person_id}, + ${overrideDetails.oldest_event}, + 0 + ) + `, + undefined, + 'personOverride' + ) + + const { rows: transitiveUpdates } = await this.postgres.query( + tx, + SQL` + UPDATE + posthog_flatpersonoverride + SET + override_person_id = ${overrideDetails.override_person_id}, + version = COALESCE(version, 0)::numeric + 1 + WHERE + team_id = ${overrideDetails.team_id} AND override_person_id = ${overrideDetails.old_person_id} + RETURNING + old_person_id, + version, + oldest_event + `, + undefined, + 'transitivePersonOverrides' + ) + + status.debug('🔁', 'person_overrides_updated', { transitiveUpdates }) + + const personOverrideMessages: ProducerRecord[] = [ + { + topic: KAFKA_PERSON_OVERRIDE, + messages: [ + { + value: JSON.stringify({ + team_id: overrideDetails.team_id, + old_person_id: overrideDetails.old_person_id, + override_person_id: overrideDetails.override_person_id, + oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse), + merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), + version: 0, + }), + }, + ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ + value: JSON.stringify({ + team_id: overrideDetails.team_id, + old_person_id: old_person_id, + override_person_id: overrideDetails.override_person_id, + oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), + merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), + version: version, + }), + })), + ], + }, + ] + + return personOverrideMessages + } + + public async getPersonOverrides(teamId: number): Promise { + const { rows } = await this.postgres.query( + PostgresUse.COMMON_WRITE, + SQL` + SELECT + team_id, + old_person_id, + override_person_id, + oldest_event + FROM posthog_flatpersonoverride + WHERE team_id = ${teamId} + `, + undefined, + 'getPersonOverrides' + ) + return rows.map((row) => ({ + ...row, + team_id: parseInt(row.team_id), // XXX: pg returns bigint as str (reasonably so) + oldest_event: DateTime.fromISO(row.oldest_event), + })) + } } const deferredPersonOverridesWrittenCounter = new Counter({ name: 'deferred_person_overrides_written', help: 'Number of person overrides that have been written as pending', }) - export class DeferredPersonOverrideWriter { constructor(private postgres: PostgresRouter) {} @@ -759,11 +888,11 @@ export class DeferredPersonOverrideWorker { // it just needs to be consistent across all processes. public readonly lockId = 567 - private writer: PersonOverrideWriter - - constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) { - this.writer = new PersonOverrideWriter(this.postgres) - } + constructor( + private postgres: PostgresRouter, + private kafkaProducer: KafkaProducerWrapper, + private writer: PersonOverrideWriter | FlatPersonOverrideWriter + ) {} /** * Process all (or up to the given limit) pending overrides. diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 492beab70ca0d..318c3504d3e98 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -11,6 +11,7 @@ import { UUIDT } from '../../../src/utils/utils' import { DeferredPersonOverrideWorker, DeferredPersonOverrideWriter, + FlatPersonOverrideWriter, PersonOverrideWriter, PersonState, } from '../../../src/worker/ingestion/person-state' @@ -24,57 +25,64 @@ const timestamp = DateTime.fromISO('2020-01-01T12:00:05.200Z').toUTC() const timestamp2 = DateTime.fromISO('2020-02-02T12:00:05.200Z').toUTC() const timestampch = '2020-01-01 12:00:05.000' -async function fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> { - const result = await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - ` - WITH overrides AS ( - SELECT id, old_person_id, override_person_id - FROM posthog_personoverride - WHERE team_id = ${teamId} - ORDER BY id - ) - SELECT - mapping.uuid AS old_person_id, - overrides_mapping.uuid AS override_person_id - FROM - overrides AS first - JOIN - posthog_personoverridemapping AS mapping ON first.old_person_id = mapping.id - JOIN ( - SELECT - second.id AS id, - uuid - FROM - overrides AS second - JOIN posthog_personoverridemapping AS mapping ON second.override_person_id = mapping.id - ) AS overrides_mapping ON overrides_mapping.id = first.id - `, - undefined, - 'fetchPersonIdOverrides' - ) - return result.rows.map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]).sort() as [ - string, - string - ][] -} - interface PersonOverridesMode { + supportsSyncTransaction: boolean getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter - fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> + fetchPostgresPersonIdOverrides( + hub: Hub, + teamId: number + ): Promise> } const PersonOverridesModes: Record = { disabled: undefined, - immediate: { + 'immediate, with mappings': { + supportsSyncTransaction: true, getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), - fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), + fetchPostgresPersonIdOverrides: async (hub, teamId) => { + const writer = new PersonOverrideWriter(hub.db.postgres) // XXX: ideally would reference ``this``, not new instance + return new Set( + (await writer.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ + old_person_id, + override_person_id, + })) + ) + }, + }, + 'deferred, with mappings': { + supportsSyncTransaction: false, + getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), + fetchPostgresPersonIdOverrides: async (hub, teamId) => { + const syncWriter = new PersonOverrideWriter(hub.db.postgres) + await new DeferredPersonOverrideWorker( + hub.db.postgres, + hub.db.kafkaProducer, + syncWriter + ).processPendingOverrides() + return new Set( + (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ + old_person_id, + override_person_id, + })) + ) + }, }, - deferred: { + 'deferred, without mappings (flat)': { + supportsSyncTransaction: false, getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), fetchPostgresPersonIdOverrides: async (hub, teamId) => { - await new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer).processPendingOverrides() - return await fetchPostgresPersonIdOverrides(hub, teamId) + const syncWriter = new FlatPersonOverrideWriter(hub.db.postgres) + await new DeferredPersonOverrideWorker( + hub.db.postgres, + hub.db.kafkaProducer, + syncWriter + ).processPendingOverrides() + return new Set( + (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ + old_person_id, + override_person_id, + })) + ) }, }, } @@ -1571,7 +1579,7 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides, if applicable if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual([[second.uuid, first.uuid]]) + expect(overrides).toEqual(new Set([{ old_person_id: second.uuid, override_person_id: first.uuid }])) // & CH person overrides // TODO } @@ -1733,8 +1741,8 @@ describe('PersonState.update()', () => { }) it(`does not commit partial transactions on override conflicts`, async () => { - if (overridesMode !== PersonOverridesModes.immediate) { - return // this behavior is only supported with immediate overrides + if (!overridesMode?.supportsSyncTransaction) { + return } const first: Person = await hub.db.createPerson( timestamp, @@ -1822,7 +1830,7 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides const overridesAfterFailure = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overridesAfterFailure).toEqual([]) + expect(overridesAfterFailure).toEqual(new Set()) // Now verify we successfully get to our target state if we do not have // any db errors. @@ -1857,7 +1865,7 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides const overrides = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual([[second.uuid, first.uuid]]) + expect(overrides).toEqual(new Set([{ old_person_id: second.uuid, override_person_id: first.uuid }])) }) it(`handles a chain of overrides being applied concurrently`, async () => { @@ -1998,10 +2006,12 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides, if applicable if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual([ - [second.uuid, first.uuid], - [third.uuid, first.uuid], - ]) + expect(overrides).toEqual( + new Set([ + { old_person_id: second.uuid, override_person_id: first.uuid }, + { old_person_id: third.uuid, override_person_id: first.uuid }, + ]) + ) } }) @@ -2085,16 +2095,103 @@ describe('PersonState.update()', () => { // verify Postgres person_id overrides, if applicable if (overridesMode) { const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual([ - [second.uuid, first.uuid], - [third.uuid, first.uuid], - ]) + expect(overrides).toEqual( + new Set([ + { old_person_id: second.uuid, override_person_id: first.uuid }, + { old_person_id: third.uuid, override_person_id: first.uuid }, + ]) + ) } }) }) }) }) +const PersonOverridesWriterMode = { + mapping: (hub: Hub) => new PersonOverrideWriter(hub.db.postgres), + flat: (hub: Hub) => new FlatPersonOverrideWriter(hub.db.postgres), +} + +describe.each(Object.keys(PersonOverridesWriterMode))('person overrides writer: %s', (mode) => { + let hub: Hub + let closeHub: () => Promise + + let organizationId: string + let teamId: number + let writer: PersonOverrideWriter | FlatPersonOverrideWriter + + beforeAll(async () => { + ;[hub, closeHub] = await createHub({}) + organizationId = await createOrganization(hub.db.postgres) + writer = PersonOverridesWriterMode[mode](hub) + }) + + beforeEach(async () => { + teamId = await createTeam(hub.db.postgres, organizationId) + }) + + afterAll(async () => { + await closeHub() + }) + + it('handles direct overrides', async () => { + const { postgres } = hub.db + + const defaults = { + team_id: teamId, + oldest_event: DateTime.fromMillis(0), + } + + const override = { + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + } + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await writer.addPersonOverride(tx, { ...defaults, ...override }) + }) + + expect(await writer.getPersonOverrides(teamId)).toEqual([{ ...defaults, ...override }]) + }) + + it('handles transitive overrides', async () => { + const { postgres } = hub.db + + const defaults = { + team_id: teamId, + oldest_event: DateTime.fromMillis(0), + } + + const overrides = [ + { + old_person_id: new UUIDT().toString(), + override_person_id: new UUIDT().toString(), + }, + ] + + overrides.push({ + old_person_id: overrides[0].override_person_id, + override_person_id: new UUIDT().toString(), + }) + + await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + for (const override of overrides) { + await writer.addPersonOverride(tx, { ...defaults, ...override }) + } + }) + + expect(new Set(await writer.getPersonOverrides(teamId))).toEqual( + new Set( + overrides.map(({ old_person_id }) => ({ + old_person_id, + override_person_id: overrides.at(-1)!.override_person_id, + ...defaults, + })) + ) + ) + }) +}) + describe('deferred person overrides', () => { let hub: Hub let closeHub: () => Promise @@ -2104,13 +2201,15 @@ describe('deferred person overrides', () => { let teamId: number let writer: DeferredPersonOverrideWriter + let syncWriter: PersonOverrideWriter let worker: DeferredPersonOverrideWorker beforeAll(async () => { ;[hub, closeHub] = await createHub({}) organizationId = await createOrganization(hub.db.postgres) writer = new DeferredPersonOverrideWriter(hub.db.postgres) - worker = new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer) + syncWriter = new PersonOverrideWriter(hub.db.postgres) + worker = new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer, syncWriter) }) beforeEach(async () => { @@ -2161,9 +2260,12 @@ describe('deferred person overrides', () => { expect(await getPendingPersonOverrides()).toMatchObject([]) - expect(await fetchPostgresPersonIdOverrides(hub, teamId)).toEqual([ - [override.old_person_id, override.override_person_id], - ]) + expect( + (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => [ + old_person_id, + override_person_id, + ]) + ).toEqual([[override.old_person_id, override.override_person_id]]) const clickhouseOverrides = await waitForExpect(async () => { const { data } = await hub.db.clickhouse.querying( diff --git a/posthog/migrations/0377_flatpersonoverride.py b/posthog/migrations/0377_flatpersonoverride.py new file mode 100644 index 0000000000000..3ba865394fca4 --- /dev/null +++ b/posthog/migrations/0377_flatpersonoverride.py @@ -0,0 +1,43 @@ +# Generated by Django 3.2.19 on 2023-12-07 00:38 + +from django.db import migrations, models +import django.db.models.expressions + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0376_externaldataschema_last_synced_at"), + ] + + operations = [ + migrations.CreateModel( + name="FlatPersonOverride", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("team_id", models.BigIntegerField()), + ("old_person_id", models.UUIDField()), + ("override_person_id", models.UUIDField()), + ("oldest_event", models.DateTimeField()), + ("version", models.BigIntegerField(blank=True, null=True)), + ], + ), + migrations.AddIndex( + model_name="flatpersonoverride", + index=models.Index(fields=["team_id", "override_person_id"], name="posthog_fla_team_id_224253_idx"), + ), + migrations.AddConstraint( + model_name="flatpersonoverride", + constraint=models.UniqueConstraint( + fields=("team_id", "old_person_id"), name="flatpersonoverride_unique_old_person_by_team" + ), + ), + migrations.AddConstraint( + model_name="flatpersonoverride", + constraint=models.CheckConstraint( + check=models.Q( + ("old_person_id__exact", django.db.models.expressions.F("override_person_id")), _negated=True + ), + name="flatpersonoverride_check_circular_reference", + ), + ), + ] diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 9ef4a58e18cba..902742219ab0a 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -134,14 +134,6 @@ class Meta: uuid = models.UUIDField() -class PendingPersonOverride(models.Model): - id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") - team_id = models.BigIntegerField() - old_person_id = models.UUIDField() - override_person_id = models.UUIDField() - oldest_event = models.DateTimeField() - - class PersonOverride(models.Model): """A model of persons to be overriden in merge or merge-like events. @@ -187,6 +179,101 @@ class Meta: version: models.BigIntegerField = models.BigIntegerField(null=True, blank=True) +class PendingPersonOverride(models.Model): + """ + The pending person overrides model/table contains records of merges that + have occurred, but have not yet been integrated into the person overrides + table. + + This table should generally be considered as a log table or queue. When a + merge occurs, it is recorded to the log (added to the queue) as part of the + merge transaction. Later, another process comes along, reading from the + other end of the log (popping from the queue) and applying the necessary + updates to the person overrides table as part of secondary transaction. + + This approach allows us to decouple the set of operations that must occur as + part of an atomic transactional unit during person merging (moving distinct + IDs, merging properties, deleting the subsumed person, etc.) from those that + are more tolerant to eventual consistency (updating person overrides in + Postgres and subsequently relaying those updates to ClickHouse in various + forms to update the person associated with an event.) This decoupling helps + us to minimize the overhead of the primary merge transaction by reducing the + degree of contention within the ingestion pipeline caused by long-running + transactions. This decoupling also allows us to serialize the execution of + all updates to the person overrides table through a single writer, which + allows us to safely update the person overrides table while handling tricky + cases like applying transitive updates without the need for expensive table + constraints to ensure their validity. + """ + + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") + team_id = models.BigIntegerField() + old_person_id = models.UUIDField() + override_person_id = models.UUIDField() + oldest_event = models.DateTimeField() + + +class FlatPersonOverride(models.Model): + """ + The (flat) person overrides model/table contains a consolidated record of + all merges that have occurred, but have not yet been integrated into the + ClickHouse events table through a squash operation. Once the effects of a + merge have been integrated into the events table, the associated override + record can be deleted from this table. + + This table is in some sense a materialized view over the pending person + overrides table (i.e. the merge log.) It differs from that base table in + that it should be maintained during updates to account for the effects of + transitive merges. For example, if person A is merged into person B, and + then person B is merged into person C, we'd expect the first record (A->B) + to be updated to reflect that person A has been merged into person C (A->C, + eliding the intermediate step.) + + There are several important expectations about the nature of the data within + this table: + + * A person should only appear as an "old" person at most once for a given + team (as appearing more than once would imply they were merged into + multiple people.) + * A person cannot be merged into themselves (i.e. be both the "old" and + "override" person within a given row.) + * A person should only appear in a table as _either_ an "old" person or + "override" person for a given team -- but never both, as this would + indicate a failure to account for a transitive merge. + + The first two of these expectations can be enforced as constraints, but + unfortunately we've found the third to be too costly to enforce in practice. + Instead, we try to ensure that this invariant holds by serializing all + writes to this table through the ``PendingPersonOverride`` model above. + + The "flat" in the table name is used to distinguish this table from a prior + approach that required multiple tables to maintain the same state but + otherwise has little significance of its own. + """ + + id = models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID") + team_id = models.BigIntegerField() + old_person_id = models.UUIDField() + override_person_id = models.UUIDField() + oldest_event = models.DateTimeField() + version = models.BigIntegerField(null=True, blank=True) + + class Meta: + indexes = [ + models.Index(fields=["team_id", "override_person_id"]), + ] + constraints = [ + models.UniqueConstraint( + fields=["team_id", "old_person_id"], + name="flatpersonoverride_unique_old_person_by_team", + ), + models.CheckConstraint( + check=~Q(old_person_id__exact=F("override_person_id")), + name="flatpersonoverride_check_circular_reference", + ), + ] + + def get_distinct_ids_for_subquery(person: Person | None, team: Team) -> List[str]: """_summary_ Fetching distinct_ids for a person from CH is slow, so we