diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 4e4b0e6c63d9d..c92bcbf594b88 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0016_rolemembership_organization_member otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0431_externaldataschema_sync_type_payload +posthog: 0432_personlessdistinctid sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index de57b602725cc..4883c4fa97eb8 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -641,11 +641,17 @@ export class DB { isUserId: number | null, isIdentified: boolean, uuid: string, - distinctIds?: string[], - version = 0 + distinctIds?: { distinctId: string; version?: number }[] ): Promise { distinctIds ||= [] + for (const distinctId of distinctIds) { + distinctId.version ||= 0 + } + + // The Person is being created, and so we can hardcode version 0! + const personVersion = 0 + const { rows } = await this.postgres.query( PostgresUse.COMMON_WRITE, `WITH inserted_person AS ( @@ -662,7 +668,12 @@ export class DB { // `addDistinctIdPooled` (_, index) => `, distinct_id_${index} AS ( INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) - VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))` + VALUES ( + $${11 + index + distinctIds!.length - 1}, + (SELECT id FROM inserted_person), + $5, + $${10 + index}) + )` ) .join('') + `SELECT * FROM inserted_person;`, @@ -675,14 +686,21 @@ export class DB { isUserId, isIdentified, uuid, - version, + personVersion, // The copy and reverse here is to maintain compatability with pre-existing code // and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the // CTEs above, so we need to reverse the distinctIds to match the old behavior where // we would do a round trip for each INSERT. We shouldn't actually depend on the // `id` column of distinct_ids, so this is just a simple way to keeps tests exactly // the same and prove behavior is the same as before. - ...distinctIds.slice().reverse(), + ...distinctIds + .slice() + .reverse() + .map(({ version }) => version), + ...distinctIds + .slice() + .reverse() + .map(({ distinctId }) => distinctId), ], 'insertPerson' ) @@ -698,8 +716,8 @@ export class DB { value: JSON.stringify({ person_id: person.uuid, team_id: teamId, - distinct_id: distinctId, - version, + distinct_id: distinctId.distinctId, + version: distinctId.version, is_deleted: 0, }), }, @@ -830,8 +848,50 @@ export class DB { return personDistinctIds.map((pdi) => pdi.distinct_id) } - public async addDistinctId(person: InternalPerson, distinctId: string, version: number): Promise { - const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version) + public async addPersonlessDistinctId(teamId: number, distinctId: string): Promise { + const result = await this.postgres.query( + PostgresUse.COMMON_WRITE, + ` + INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at) + VALUES ($1, $2, false, now()) + ON CONFLICT (team_id, distinct_id) DO NOTHING + RETURNING is_merged + `, + [teamId, distinctId], + 'addPersonlessDistinctId' + ) + + return result.rows[0]['is_merged'] + } + + public async addPersonlessDistinctIdForMerge( + teamId: number, + distinctId: string, + tx?: TransactionClient + ): Promise { + const result = await this.postgres.query( + tx ?? PostgresUse.COMMON_WRITE, + ` + INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at) + VALUES ($1, $2, true, now()) + ON CONFLICT (team_id, distinct_id) DO UPDATE + SET is_merged = true + RETURNING (xmax = 0) AS inserted + `, + [teamId, distinctId], + 'addPersonlessDistinctIdForMerge' + ) + + return result.rows[0].inserted + } + + public async addDistinctId( + person: InternalPerson, + distinctId: string, + version: number, + tx?: TransactionClient + ): Promise { + const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version, tx) if (kafkaMessages.length) { await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) } diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index b0fd16fbde625..89472de4e345e 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -1,10 +1,12 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { ProducerRecord } from 'kafkajs' +import LRU from 'lru-cache' import { DateTime } from 'luxon' import { Counter } from 'prom-client' import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' +import { ONE_HOUR } from '../../config/constants' import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics' import { InternalPerson, Person, PropertyUpdateOperation, TimestampFormat } from '../../types' import { DB } from '../../utils/db/db' @@ -57,6 +59,16 @@ const BARE_CASE_INSENSITIVE_ILLEGAL_IDS = [ 'false', ] +// Tracks whether we know we've already inserted a `posthog_personlessdistinctid` for the given +// (team_id, distinct_id) pair. If we have, then we can skip the INSERT attempt. +// TODO: Move this out of module scope, we don't currently have a clean place (outside of the Hub) +// to stash longer lived objects like caches. For now it's not important. +const PERSONLESS_DISTINCT_ID_INSERTED_CACHE = new LRU({ + max: 10_000, + maxAge: ONE_HOUR * 24, // cache up to 24h + updateAgeOnGet: true, +}) + const BARE_CASE_SENSITIVE_ILLEGAL_IDS = ['[object Object]', 'NaN', 'None', 'none', 'null', '0', 'undefined'] const PERSON_EVENTS = new Set(['$identify', '$create_alias', '$merge_dangerously', '$set']) @@ -110,7 +122,34 @@ export class PersonState { async update(): Promise<[Person, Promise]> { if (!this.processPerson) { - const existingPerson = await this.db.fetchPerson(this.teamId, this.distinctId, { useReadReplica: true }) + let existingPerson = await this.db.fetchPerson(this.teamId, this.distinctId, { useReadReplica: true }) + + if (!existingPerson) { + // See the comment in `mergeDistinctIds`. We are inserting a row into `posthog_personlessdistinctid` + // to note that this Distinct ID has been used in "personless" mode. This is necessary + // so that later, during a merge, we can decide whether we need to write out an override + // or not. + + const personlessDistinctIdCacheKey = `${this.teamId}|${this.distinctId}` + if (!PERSONLESS_DISTINCT_ID_INSERTED_CACHE.get(personlessDistinctIdCacheKey)) { + const personIsMerged = await this.db.addPersonlessDistinctId(this.teamId, this.distinctId) + + // We know the row is in PG now, and so future events for this Distinct ID can + // skip the PG I/O. + PERSONLESS_DISTINCT_ID_INSERTED_CACHE.set(personlessDistinctIdCacheKey, true) + + if (personIsMerged) { + // If `personIsMerged` comes back `true`, it means the `posthog_personlessdistinctid` + // has been updated by a merge (either since we called `fetchPerson` above, plus + // replication lag). We need to check `fetchPerson` again (this time using the leader) + // so that we properly associate this event with the Person we got merged into. + existingPerson = await this.db.fetchPerson(this.teamId, this.distinctId, { + useReadReplica: false, + }) + } + } + } + if (existingPerson) { const person = existingPerson as Person @@ -204,7 +243,7 @@ export class PersonState { // :NOTE: This should never be set in this branch, but adding this for logical consistency this.updateIsIdentified, this.event.uuid, - [this.distinctId] + [{ distinctId: this.distinctId }] ) return [person, true] } @@ -217,13 +256,12 @@ export class PersonState { isUserId: number | null, isIdentified: boolean, creatorEventUuid: string, - distinctIds: string[], - version = 0 + distinctIds: { distinctId: string; version?: number }[] ): Promise { if (distinctIds.length < 1) { throw new Error('at least 1 distinctId is required in `createPerson`') } - const uuid = uuidFromDistinctId(teamId, distinctIds[0]) + const uuid = uuidFromDistinctId(teamId, distinctIds[0].distinctId) const props = { ...propertiesOnce, ...properties, ...{ $creator_event_uuid: creatorEventUuid } } const propertiesLastOperation: Record = {} @@ -246,8 +284,7 @@ export class PersonState { isUserId, isIdentified, uuid, - distinctIds, - version + distinctIds ) } @@ -450,57 +487,143 @@ export class PersonState { const otherPerson = await this.db.fetchPerson(teamId, otherPersonDistinctId) const mergeIntoPerson = await this.db.fetchPerson(teamId, mergeIntoDistinctId) + // A note about the `distinctIdVersion` logic you'll find below: + // // Historically, we always INSERT-ed new `posthog_persondistinctid` rows with `version=0`. // Overrides are only created when the version is > 0, see: // https://github.com/PostHog/posthog/blob/92e17ce307a577c4233d4ab252eebc6c2207a5ee/posthog/models/person/sql.py#L269-L287 // - // With the addition of optional person processing, we are no longer creating + // With the addition of optional person profile processing, we are no longer creating // `posthog_persondistinctid` and `posthog_person` rows when $process_person_profile=false. - // This means that: - // 1. At merge time, it's possible this `distinct_id` and its deterministically generated - // `person.uuid` has already been used for events in ClickHouse, but they have no - // corresponding rows in the `posthog_persondistinctid` or `posthog_person` tables - // 2. We need to assume the `distinct_id`/`person.uuid` have been used before (by - // `$process_person_profile=false` events) and create an override row for this - // `distinct_id` even though we're just now INSERT-ing it into Postgres/ClickHouse. We do - // this by starting with `version=1`, as if we had just deleted the old user and were - // updating the `distinct_id` row as part of the merge - const addDistinctIdVersion = 1 - - if (otherPerson && !mergeIntoPerson) { - await this.db.addDistinctId(otherPerson, mergeIntoDistinctId, addDistinctIdVersion) - return [otherPerson, Promise.resolve()] - } else if (!otherPerson && mergeIntoPerson) { - await this.db.addDistinctId(mergeIntoPerson, otherPersonDistinctId, addDistinctIdVersion) - return [mergeIntoPerson, Promise.resolve()] + // This means that at merge time, it's possible this `distinct_id` and its deterministically + // generated `person.uuid` has already been used for events in ClickHouse, but they have no + // corresponding rows in the `posthog_persondistinctid` or `posthog_person` tables. + // + // For this reason, $process_person_profile=false write to the `posthog_personlessdistinctid` + // table just to note that a given Distinct ID was used for "personless" mode. Then, during + // our merges transactions below, we do two things: + // 1. We check whether a row exists in `posthog_personlessdistinctid` for that Distinct ID, + // if so, we need to write out `posthog_persondistinctid` rows with `version=1` so that + // an override is created in ClickHouse which will associate the old "personless" events + // with the Person UUID they were merged into. + // 2. We insert and/or update the `posthog_personlessdistinctid` ourselves, to mark that + // the Distinct ID has been merged. This is important so that an event being processed + // concurrently for that Distinct ID doesn't emit an event and _miss_ that a different + // Person UUID needs to be used now. (See the `processPerson` code in `update` for more.) + + if ((otherPerson && !mergeIntoPerson) || (!otherPerson && mergeIntoPerson)) { + // Only one of the two Distinct IDs points at an existing Person + + const [existingPerson, distinctIdToAdd] = (() => { + if (otherPerson) { + return [otherPerson!, mergeIntoDistinctId] + } else { + return [mergeIntoPerson!, otherPersonDistinctId] + } + })() + + return await this.db.postgres.transaction( + PostgresUse.COMMON_WRITE, + 'mergeDistinctIds-OneExists', + async (tx) => { + // See comment above about `distinctIdVersion` + const _insertedDistinctId = await this.db.addPersonlessDistinctIdForMerge( + this.teamId, + distinctIdToAdd, + tx + ) + const distinctIdVersion = 1 // TODO: Once `posthog_personlessdistinctid` is backfilled: insertedDistinctId ? 0 : 1 + + await this.db.addDistinctId(existingPerson, distinctIdToAdd, distinctIdVersion, tx) + return [existingPerson, Promise.resolve()] + } + ) } else if (otherPerson && mergeIntoPerson) { + // Both Distinct IDs point at an existing Person + if (otherPerson.id == mergeIntoPerson.id) { + // Nothing to do, they are the same Person return [mergeIntoPerson, Promise.resolve()] } + return await this.mergePeople({ mergeInto: mergeIntoPerson, mergeIntoDistinctId: mergeIntoDistinctId, otherPerson: otherPerson, otherPersonDistinctId: otherPersonDistinctId, }) - } + } else { + // Neither Distinct ID points at an existing Person + + let distinctId1 = mergeIntoDistinctId + let distinctId2 = otherPersonDistinctId + + return await this.db.postgres.transaction( + PostgresUse.COMMON_WRITE, + 'mergeDistinctIds-NeitherExist', + async (tx) => { + // See comment above about `distinctIdVersion` + const insertedDistinctId1 = await this.db.addPersonlessDistinctIdForMerge( + this.teamId, + distinctId1, + tx + ) - // The last case: (!oldPerson && !newPerson) - return [ - await this.createPerson( - // TODO: in this case we could skip the properties updates later - timestamp, - this.eventProperties['$set'] || {}, - this.eventProperties['$set_once'] || {}, - teamId, - null, - true, - this.event.uuid, - [mergeIntoDistinctId, otherPersonDistinctId], - addDistinctIdVersion - ), - Promise.resolve(), - ] + // See comment above about `distinctIdVersion` + const insertedDistinctId2 = await this.db.addPersonlessDistinctIdForMerge( + this.teamId, + distinctId2, + tx + ) + + // `createPerson` uses the first Distinct ID provided to generate the Person + // UUID. That means the first Distinct ID definitely doesn't need an override, + // and can always use version 0. Below, we exhaust all of the options to decide + // whether we can optimize away an override by doing a swap, or whether we + // need to actually write an override. (But mostly we're being verbose for + // documentation purposes) + let distinctId2Version = 1 // TODO: Once `posthog_personlessdistinctid` is backfilled, this should be = 0 + if (insertedDistinctId1 && insertedDistinctId2) { + // We were the first to insert both (neither was used for Personless), so we + // can use either as the primary Person UUID and create no overrides. + } else if (insertedDistinctId1 && !insertedDistinctId2) { + // We created 1, but 2 was already used for Personless. Let's swap so + // that 2 can be the primary Person UUID and no override is needed. + ;[distinctId1, distinctId2] = [distinctId2, distinctId1] + } else if (!insertedDistinctId1 && insertedDistinctId2) { + // We created 2, but 1 was already used for Personless, so we want to + // use 1 as the primary Person UUID so that no override is needed. + } else if (!insertedDistinctId1 && !insertedDistinctId2) { + // Both were used in Personless mode, so there is no more-correct choice of + // primary Person UUID to make here, and we need to drop an override by + // using version = 1 for Distinct ID 2. + distinctId2Version = 1 + } + + // The first Distinct ID is used to create the new Person's UUID, and so it + // never needs an override. + const distinctId1Version = 0 + + return [ + await this.createPerson( + // TODO: in this case we could skip the properties updates later + timestamp, + this.eventProperties['$set'] || {}, + this.eventProperties['$set_once'] || {}, + teamId, + null, + true, + this.event.uuid, + [ + { distinctId: distinctId1, version: distinctId1Version }, + { distinctId: distinctId2, version: distinctId2Version }, + ] + ), + Promise.resolve(), + ] + } + ) + } } public async mergePeople({ diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index 8d419b0b9fdb1..670685f445ddc 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -294,7 +294,7 @@ describe('DB', () => { }) test('without properties', async () => { - const person = await db.createPerson(TIMESTAMP, {}, {}, {}, team.id, null, false, uuid, [distinctId]) + const person = await db.createPerson(TIMESTAMP, {}, {}, {}, team.id, null, false, uuid, [{ distinctId }]) const fetched_person = await fetchPersonByPersonId(team.id, person.id) expect(fetched_person!.is_identified).toEqual(false) @@ -306,7 +306,7 @@ describe('DB', () => { }) test('without properties indentified true', async () => { - const person = await db.createPerson(TIMESTAMP, {}, {}, {}, team.id, null, true, uuid, [distinctId]) + const person = await db.createPerson(TIMESTAMP, {}, {}, {}, team.id, null, true, uuid, [{ distinctId }]) const fetched_person = await fetchPersonByPersonId(team.id, person.id) expect(fetched_person!.is_identified).toEqual(true) expect(fetched_person!.properties).toEqual({}) @@ -326,7 +326,7 @@ describe('DB', () => { null, false, uuid, - [distinctId] + [{ distinctId }] ) const fetched_person = await fetchPersonByPersonId(team.id, person.id) expect(fetched_person!.is_identified).toEqual(false) @@ -354,7 +354,7 @@ describe('DB', () => { const distinctId = 'distinct_id1' // Note that we update the person badly in case of concurrent updates, but lets make sure we're consistent const personDbBefore = await db.createPerson(TIMESTAMP, { c: 'aaa' }, {}, {}, team.id, null, false, uuid, [ - distinctId, + { distinctId }, ]) const providedPersonTs = DateTime.fromISO('2000-04-04T11:42:06.502Z').toUTC() const personProvided = { ...personDbBefore, properties: { c: 'bbb' }, created_at: providedPersonTs } @@ -486,7 +486,7 @@ describe('DB', () => { const team = await getFirstTeam(hub) const uuid = new UUIDT().toString() const createdPerson = await db.createPerson(TIMESTAMP, { foo: 'bar' }, {}, {}, team.id, null, true, uuid, [ - 'some_id', + { distinctId: 'some_id' }, ]) const person = await db.fetchPerson(team.id, 'some_id') @@ -852,7 +852,7 @@ describe('DB', () => { null, false, new UUIDT().toString(), - ['source_person'] + [{ distinctId: 'source_person' }] ) const targetPerson = await db.createPerson( TIMESTAMP, @@ -863,7 +863,7 @@ describe('DB', () => { null, false, new UUIDT().toString(), - ['target_person'] + [{ distinctId: 'target_person' }] ) sourcePersonID = sourcePerson.id targetPersonID = targetPerson.id diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 72bb5879945a9..fa693a00b9651 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -50,7 +50,7 @@ export async function createPerson( null, false, new UUIDT().toString(), - distinctIds + distinctIds.map((distinctId) => ({ distinctId })) ) } diff --git a/plugin-server/tests/worker/ingestion/action-matcher.test.ts b/plugin-server/tests/worker/ingestion/action-matcher.test.ts index a66a8d03bb2be..d4a5770355a4b 100644 --- a/plugin-server/tests/worker/ingestion/action-matcher.test.ts +++ b/plugin-server/tests/worker/ingestion/action-matcher.test.ts @@ -741,7 +741,7 @@ describe('ActionMatcher', () => { null, true, new UUIDT().toString(), - ['random'] + [{ distinctId: 'random' }] ) const cohortPerson = await hub.db.createPerson( @@ -753,7 +753,7 @@ describe('ActionMatcher', () => { null, true, new UUIDT().toString(), - ['cohort'] + [{ distinctId: 'cohort' }] ) await hub.db.addPersonToCohort(testCohort.id, cohortPerson.id, testCohort.version) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts index 4c1467653f324..d09a149d44c8b 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts @@ -59,7 +59,7 @@ describe('prepareEventStep()', () => { // :KLUDGE: We test below whether kafka messages are produced, so make sure the person exists beforehand. await hub.db.createPerson(person.created_at, {}, {}, {}, pluginEvent.team_id, null, false, person.uuid, [ - 'my_id', + { distinctId: 'my_id' }, ]) hub.db.kafkaProducer!.queueMessage = jest.fn() diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index bed64d243e6c5..06bd8086ead84 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -222,39 +222,97 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining([])) }) - it('merging creates an override and force_upgrade works', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) + it('overrides are created only when distinct_id is in posthog_personlessdistinctid', async () => { + // oldUserDistinctId exists, and 'old2' will merge into it, but not create an override + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + + // newUserDistinctId exists, and 'new2' will merge into it, and will create an override + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) + await hub.db.addPersonlessDistinctId(teamId, 'new2') const hubParam = undefined - let processPerson = true + const processPerson = true const [_person, kafkaAcks] = await personState( + { + event: '$identify', + distinct_id: oldUserDistinctId, + properties: { + $anon_distinct_id: 'old2', + }, + }, + hubParam, + processPerson + ).update() + + const [_person2, kafkaAcks2] = await personState( { event: '$identify', distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: oldUserDistinctId, + $anon_distinct_id: 'new2', }, }, hubParam, processPerson ).update() + await hub.db.kafkaProducer.flush() await kafkaAcks + await kafkaAcks2 - await delayUntilEventIngested(() => fetchOverridesForDistinctId(newUserDistinctId)) - const chOverrides = await fetchOverridesForDistinctId(newUserDistinctId) + // new2 has an override, because it was in posthog_personlessdistinctid + await delayUntilEventIngested(() => fetchOverridesForDistinctId('new2')) + const chOverrides = await fetchOverridesForDistinctId('new2') expect(chOverrides.length).toEqual(1) - - // Override created for Person that never existed in the DB expect(chOverrides).toEqual( expect.arrayContaining([ expect.objectContaining({ - distinct_id: newUserDistinctId, + distinct_id: 'new2', + person_id: newUserUuid, + version: 1, + }), + ]) + ) + + // old2 does have an override, because we are temporarily writing out unnecessary + // overrides while we backfill `posthog_personlessdistinctid` + const chOverridesOld = await fetchOverridesForDistinctId('old2') + expect(chOverridesOld.length).toEqual(1) + expect(chOverridesOld).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + distinct_id: 'old2', person_id: oldUserUuid, version: 1, }), ]) ) + }) + + it('force_upgrade works', async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + + const hubParam = undefined + let processPerson = true + const [_person, kafkaAcks] = await personState( + { + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $anon_distinct_id: oldUserDistinctId, + }, + }, + hubParam, + processPerson + ).update() + await hub.db.kafkaProducer.flush() + await kafkaAcks // Using the `distinct_id` again with `processPerson=false` results in // `force_upgrade=true` and real Person `uuid` and `created_at` @@ -378,7 +436,9 @@ describe('PersonState.update()', () => { }) it('handles person being created in a race condition', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) jest.spyOn(hub.db, 'fetchPerson').mockImplementationOnce(() => { return Promise.resolve(undefined) @@ -415,7 +475,7 @@ describe('PersonState.update()', () => { it('handles person being created in a race condition updates properties if needed', async () => { await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, teamId, null, false, newUserUuid, [ - newUserDistinctId, + { distinctId: newUserDistinctId }, ]) jest.spyOn(hub.db, 'fetchPerson').mockImplementationOnce(() => { @@ -503,7 +563,7 @@ describe('PersonState.update()', () => { null, false, newUserUuid, - [newUserDistinctId] + [{ distinctId: newUserDistinctId }] ) const [person, kafkaAcks] = await personState({ @@ -539,7 +599,7 @@ describe('PersonState.update()', () => { it('updates person properties - no update if not needed', async () => { await hub.db.createPerson(timestamp, { $current_url: 123 }, {}, {}, teamId, null, false, newUserUuid, [ - newUserDistinctId, + { distinctId: newUserDistinctId }, ]) const [person, kafkaAcks] = await personState({ @@ -581,7 +641,7 @@ describe('PersonState.update()', () => { it('updates person properties - always update for person events', async () => { await hub.db.createPerson(timestamp, { $current_url: 123 }, {}, {}, teamId, null, false, newUserUuid, [ - newUserDistinctId, + { distinctId: newUserDistinctId }, ]) const [person, kafkaAcks] = await personState({ @@ -614,7 +674,9 @@ describe('PersonState.update()', () => { }) it('updates person properties - always update if undefined before', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) const [person, kafkaAcks] = await personState({ event: '$pageview', @@ -655,7 +717,7 @@ describe('PersonState.update()', () => { null, false, newUserUuid, - [newUserDistinctId] + [{ distinctId: newUserDistinctId }] ) const [person, kafkaAcks] = await personState({ @@ -697,7 +759,7 @@ describe('PersonState.update()', () => { null, false, newUserUuid, - [newUserDistinctId] + [{ distinctId: newUserDistinctId }] ) const personS = personState({ @@ -736,7 +798,7 @@ describe('PersonState.update()', () => { it('does not update person if not needed', async () => { await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, teamId, null, false, newUserUuid, [ - newUserDistinctId, + { distinctId: newUserDistinctId }, ]) const [person, kafkaAcks] = await personState({ @@ -771,7 +833,9 @@ describe('PersonState.update()', () => { }) it('marks user as is_identified', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) const personS = personState({ event: '$pageview', distinct_id: newUserDistinctId, @@ -823,8 +887,8 @@ describe('PersonState.update()', () => { properties_last_operation: {}, } await hub.db.createPerson(timestamp, { a: 6, c: 8 }, {}, {}, teamId, null, true, newUserUuid, [ - newUserDistinctId, - oldUserDistinctId, + { distinctId: newUserDistinctId }, + { distinctId: oldUserDistinctId }, ]) // the merged Person const personS = personState({ @@ -901,7 +965,7 @@ describe('PersonState.update()', () => { uuid: newUserUuid, properties: { foo: 'bar' }, created_at: timestamp, - version: 1, + version: 0, is_identified: true, }) ) @@ -920,8 +984,8 @@ describe('PersonState.update()', () => { it(`marks is_identified to be updated when no changes to distinct_ids but $anon_distinct_id passe`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - newUserDistinctId, - oldUserDistinctId, + { distinctId: newUserDistinctId }, + { distinctId: oldUserDistinctId }, ]) const personS = personState({ @@ -954,7 +1018,9 @@ describe('PersonState.update()', () => { }) it(`add distinct id and marks user is_identified when passed $anon_distinct_id person does not exists and distinct_id does`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) const personS = personState({ event: '$identify', @@ -990,7 +1056,9 @@ describe('PersonState.update()', () => { }) it(`add distinct id and marks user as is_identified when passed $anon_distinct_id person exists and distinct_id does not`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) const personS = personState({ event: '$identify', @@ -1027,8 +1095,12 @@ describe('PersonState.update()', () => { }) it(`merge into distinct_id person and marks user as is_identified when both persons have is_identified false`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) const [person, kafkaAcks] = await personState({ event: '$identify', @@ -1090,8 +1162,12 @@ describe('PersonState.update()', () => { }) it(`merge into distinct_id person and marks user as is_identified when distinct_id user is identified and $anon_distinct_id user is not`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) const [person, kafkaAcks] = await personState({ event: '$identify', @@ -1153,8 +1229,12 @@ describe('PersonState.update()', () => { }) it(`does not merge people when distinct_id user is not identified and $anon_distinct_id user is`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) const personS = personState({ event: '$identify', @@ -1202,8 +1282,12 @@ describe('PersonState.update()', () => { }) it(`does not merge people when both users are identified`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) const [person, kafkaAcks] = await personState({ event: '$identify', @@ -1250,10 +1334,10 @@ describe('PersonState.update()', () => { it(`merge into distinct_id person and updates properties with $set/$set_once`, async () => { await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, teamId, null, false, oldUserUuid, [ - oldUserDistinctId, + { distinctId: oldUserDistinctId }, ]) await hub.db.createPerson(timestamp2, { b: 3, c: 4, d: 5 }, {}, {}, teamId, null, false, newUserUuid, [ - newUserDistinctId, + { distinctId: newUserDistinctId }, ]) const [person, kafkaAcks] = await personState({ @@ -1318,7 +1402,9 @@ describe('PersonState.update()', () => { }) it(`handles race condition when other thread creates the user`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) // Fake the race by assuming createPerson was called before the addDistinctId creation above jest.spyOn(hub.db, 'addDistinctId').mockImplementation(async (person, distinctId) => { @@ -1331,7 +1417,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, distinctId), - [distinctId] + [{ distinctId }] ) await hub.db.addDistinctId(person, distinctId, 0) // this throws }) @@ -1435,8 +1521,12 @@ describe('PersonState.update()', () => { describe(`overrides: ${useOverridesMode}`, () => { // only difference between $merge_dangerously and $identify it(`merge_dangerously can merge people when alias id user is identified`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) const [person, kafkaAcks] = await personState({ event: '$merge_dangerously', @@ -1569,7 +1659,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'anonymous_id'), - ['anonymous_id'] + [{ distinctId: 'anonymous_id' }] ) const identifiedPerson = await hub.db.createPerson( timestamp, @@ -1580,7 +1670,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'new_distinct_id'), - ['new_distinct_id'] + [{ distinctId: 'new_distinct_id' }] ) // existing overrides @@ -1646,7 +1736,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'anonymous_id'), - ['anonymous_id'] + [{ distinctId: 'anonymous_id' }] ) const identifiedPerson = await hub.db.createPerson( timestamp, @@ -1657,7 +1747,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'new_distinct_id'), - ['new_distinct_id'] + [{ distinctId: 'new_distinct_id' }] ) // existing overrides for both anonPerson and identifiedPerson @@ -1731,7 +1821,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'anonymous_id'), - ['anonymous_id'] + [{ distinctId: 'anonymous_id' }] ) const identifiedPerson = await hub.db.createPerson( timestamp, @@ -1742,7 +1832,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'new_distinct_id'), - ['new_distinct_id'] + [{ distinctId: 'new_distinct_id' }] ) await insertRow(hub.db.postgres, 'posthog_featureflaghashkeyoverride', { @@ -1814,8 +1904,8 @@ describe('PersonState.update()', () => { describe(`overrides: ${useOverridesMode}`, () => { it(`no-op if persons already merged`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, firstUserUuid, [ - firstUserDistinctId, - secondUserDistinctId, + { distinctId: firstUserDistinctId }, + { distinctId: secondUserDistinctId }, ]) const state: PersonState = personState({}, hub) jest.spyOn(hub.db.kafkaProducer, 'queueMessages') @@ -1852,7 +1942,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [firstUserDistinctId] + [{ distinctId: firstUserDistinctId }] ) const second: InternalPerson = await hub.db.createPerson( timestamp, @@ -1863,7 +1953,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [secondUserDistinctId] + [{ distinctId: secondUserDistinctId }] ) const state: PersonState = personState({}, hub) @@ -1945,7 +2035,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [firstUserDistinctId] + [{ distinctId: firstUserDistinctId }] ) const second: InternalPerson = await hub.db.createPerson( timestamp, @@ -1956,7 +2046,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [secondUserDistinctId] + [{ distinctId: secondUserDistinctId }] ) const state: PersonState = personState({}, hub) @@ -2005,10 +2095,10 @@ describe('PersonState.update()', () => { it(`retries merges up to retry limit if postgres down`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ - firstUserDistinctId, + { distinctId: firstUserDistinctId }, ]) await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ - secondUserDistinctId, + { distinctId: secondUserDistinctId }, ]) const state: PersonState = personState({}, hub) @@ -2054,10 +2144,10 @@ describe('PersonState.update()', () => { it(`handleIdentifyOrAlias does not throw on merge failure`, async () => { // TODO: This the current state, we should probably change it await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ - firstUserDistinctId, + { distinctId: firstUserDistinctId }, ]) await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ - secondUserDistinctId, + { distinctId: secondUserDistinctId }, ]) const state: PersonState = personState( @@ -2117,7 +2207,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [firstUserDistinctId] + [{ distinctId: firstUserDistinctId }] ) const second: InternalPerson = await hub.db.createPerson( timestamp, @@ -2128,7 +2218,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [secondUserDistinctId] + [{ distinctId: secondUserDistinctId }] ) const state: PersonState = personState({}, hub) @@ -2245,7 +2335,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [firstUserDistinctId] + [{ distinctId: firstUserDistinctId }] ) const second: InternalPerson = await hub.db.createPerson( timestamp.plus({ minutes: 2 }), @@ -2256,7 +2346,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [secondUserDistinctId] + [{ distinctId: secondUserDistinctId }] ) const third: InternalPerson = await hub.db.createPerson( timestamp.plus({ minutes: 5 }), @@ -2267,7 +2357,7 @@ describe('PersonState.update()', () => { null, false, new UUIDT().toString(), - ['third'] + [{ distinctId: 'third' }] ) // We want to simulate a concurrent update to person_overrides. We do @@ -2393,7 +2483,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [firstUserDistinctId] + [{ distinctId: firstUserDistinctId }] ) const second: InternalPerson = await hub.db.createPerson( timestamp.plus({ minutes: 2 }), @@ -2404,7 +2494,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [secondUserDistinctId] + [{ distinctId: secondUserDistinctId }] ) const third: InternalPerson = await hub.db.createPerson( timestamp.plus({ minutes: 5 }), @@ -2415,7 +2505,7 @@ describe('PersonState.update()', () => { null, false, new UUIDT().toString(), - ['third'] + [{ distinctId: 'third' }] ) await personState( diff --git a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts index 2879328275478..632241251331d 100644 --- a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts +++ b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts @@ -78,7 +78,7 @@ describe('postgres parity', () => { null, true, uuid, - ['distinct1', 'distinct2'] + [{ distinctId: 'distinct1' }, { distinctId: 'distinct2' }] ) await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse)) await delayUntilEventIngested(() => hub.db.fetchDistinctIdValues(person, Database.ClickHouse), 2) @@ -170,7 +170,7 @@ describe('postgres parity', () => { null, false, uuid, - ['distinct1', 'distinct2'] + [{ distinctId: 'distinct1' }, { distinctId: 'distinct2' }] ) await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse)) await delayUntilEventIngested(() => hub.db.fetchDistinctIdValues(person, Database.ClickHouse), 2) @@ -251,7 +251,7 @@ describe('postgres parity', () => { null, true, uuid, - ['distinct1'] + [{ distinctId: 'distinct1' }] ) const anotherPerson = await hub.db.createPerson( DateTime.utc(), @@ -262,7 +262,7 @@ describe('postgres parity', () => { null, true, uuid2, - ['another_distinct_id'] + [{ distinctId: 'another_distinct_id' }] ) await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse)) const [postgresPerson] = await hub.db.fetchPersons(Database.Postgres) @@ -334,7 +334,7 @@ describe('postgres parity', () => { null, false, uuid, - ['distinct1'] + [{ distinctId: 'distinct1' }] ) const anotherPerson = await hub.db.createPerson( DateTime.utc(), @@ -345,7 +345,7 @@ describe('postgres parity', () => { null, true, uuid2, - ['another_distinct_id'] + [{ distinctId: 'another_distinct_id' }] ) await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse)) const [postgresPerson] = await hub.db.fetchPersons(Database.Postgres) diff --git a/plugin-server/tests/worker/ingestion/process-event.test.ts b/plugin-server/tests/worker/ingestion/process-event.test.ts index b9947bb7eec74..e4353bdfc36f0 100644 --- a/plugin-server/tests/worker/ingestion/process-event.test.ts +++ b/plugin-server/tests/worker/ingestion/process-event.test.ts @@ -63,7 +63,7 @@ describe('EventsProcessor#createEvent()', () => { null, false, personUuid, - ['my_id'] + [{ distinctId: 'my_id' }] ) }) diff --git a/plugin-server/tests/worker/ingestion/properties-updater.test.ts b/plugin-server/tests/worker/ingestion/properties-updater.test.ts index b5bc38d64d2d8..16cde1c7e84ab 100644 --- a/plugin-server/tests/worker/ingestion/properties-updater.test.ts +++ b/plugin-server/tests/worker/ingestion/properties-updater.test.ts @@ -29,7 +29,7 @@ describe('properties-updater', () => { db = hub.db team = await getFirstTeam(hub) - await db.createPerson(PAST_TIMESTAMP, {}, {}, {}, team.id, null, false, uuid, [distinctId]) + await db.createPerson(PAST_TIMESTAMP, {}, {}, {}, team.id, null, false, uuid, [{ distinctId }]) jest.spyOn(hub.db, 'updateGroup') jest.spyOn(hub.db, 'insertGroup') diff --git a/posthog/migrations/0432_personlessdistinctid.py b/posthog/migrations/0432_personlessdistinctid.py new file mode 100644 index 0000000000000..96c6bcfc8717b --- /dev/null +++ b/posthog/migrations/0432_personlessdistinctid.py @@ -0,0 +1,32 @@ +# Generated by Django 4.2.11 on 2024-07-01 16:50 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0431_externaldataschema_sync_type_payload"), + ] + + operations = [ + migrations.CreateModel( + name="PersonlessDistinctId", + fields=[ + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ("distinct_id", models.CharField(max_length=400)), + ("is_merged", models.BooleanField(default=False)), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "team", + models.ForeignKey(db_index=False, on_delete=django.db.models.deletion.CASCADE, to="posthog.team"), + ), + ], + ), + migrations.AddConstraint( + model_name="personlessdistinctid", + constraint=models.UniqueConstraint( + fields=("team", "distinct_id"), name="unique personless distinct_id for team" + ), + ), + ] diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 72a5bd7c79948..7ccd7acabc650 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -128,6 +128,19 @@ class Meta: version: models.BigIntegerField = models.BigIntegerField(null=True, blank=True) +class PersonlessDistinctId(models.Model): + class Meta: + constraints = [ + models.UniqueConstraint(fields=["team", "distinct_id"], name="unique personless distinct_id for team") + ] + + id: models.BigAutoField = models.BigAutoField(primary_key=True) + team: models.ForeignKey = models.ForeignKey("Team", on_delete=models.CASCADE, db_index=False) + distinct_id: models.CharField = models.CharField(max_length=400) + is_merged: models.BooleanField = models.BooleanField(default=False) + created_at: models.DateTimeField = models.DateTimeField(auto_now_add=True, blank=True) + + class PersonOverrideMapping(models.Model): """A model of persons to be overriden in merge or merge-like events."""