diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index 9501f5a58cbf54..de57b602725cce 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -641,20 +641,13 @@ export class DB { isUserId: number | null, isIdentified: boolean, uuid: string, - distinctIds?: { distinctId: string; version?: number }[], - tx?: TransactionClient + distinctIds?: string[], + version = 0 ): Promise { distinctIds ||= [] - for (const distinctId of distinctIds) { - distinctId.version ||= 0 - } - - // The Person is being created, and so we can hardcode version 0! - const personVersion = 0 - const { rows } = await this.postgres.query( - tx ?? PostgresUse.COMMON_WRITE, + PostgresUse.COMMON_WRITE, `WITH inserted_person AS ( INSERT INTO posthog_person ( created_at, properties, properties_last_updated_at, @@ -669,12 +662,7 @@ export class DB { // `addDistinctIdPooled` (_, index) => `, distinct_id_${index} AS ( INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) - VALUES ( - $${11 + index + distinctIds!.length - 1}, - (SELECT id FROM inserted_person), - $5, - $${10 + index}) - )` + VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))` ) .join('') + `SELECT * FROM inserted_person;`, @@ -687,21 +675,14 @@ export class DB { isUserId, isIdentified, uuid, - personVersion, + version, // The copy and reverse here is to maintain compatability with pre-existing code // and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the // CTEs above, so we need to reverse the distinctIds to match the old behavior where // we would do a round trip for each INSERT. We shouldn't actually depend on the // `id` column of distinct_ids, so this is just a simple way to keeps tests exactly // the same and prove behavior is the same as before. - ...distinctIds - .slice() - .reverse() - .map(({ version }) => version), - ...distinctIds - .slice() - .reverse() - .map(({ distinctId }) => distinctId), + ...distinctIds.slice().reverse(), ], 'insertPerson' ) @@ -717,8 +698,8 @@ export class DB { value: JSON.stringify({ person_id: person.uuid, team_id: teamId, - distinct_id: distinctId.distinctId, - version: distinctId.version, + distinct_id: distinctId, + version, is_deleted: 0, }), }, @@ -849,50 +830,8 @@ export class DB { return personDistinctIds.map((pdi) => pdi.distinct_id) } - public async addPersonlessDistinctId(teamId: number, distinctId: string): Promise { - const result = await this.postgres.query( - PostgresUse.COMMON_WRITE, - ` - INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at) - VALUES ($1, $2, false, now()) - ON CONFLICT (team_id, distinct_id) DO NOTHING - RETURNING is_merged - `, - [teamId, distinctId], - 'addPersonlessDistinctId' - ) - - return result.rows[0]['is_merged'] - } - - public async addPersonlessDistinctIdForMerge( - teamId: number, - distinctId: string, - tx?: TransactionClient - ): Promise { - const result = await this.postgres.query( - tx ?? PostgresUse.COMMON_WRITE, - ` - INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at) - VALUES ($1, $2, true, now()) - ON CONFLICT (team_id, distinct_id) DO UPDATE - SET is_merged = true - RETURNING (xmax = 0) AS inserted - `, - [teamId, distinctId], - 'addPersonlessDistinctIdForMerge' - ) - - return result.rows[0].inserted - } - - public async addDistinctId( - person: InternalPerson, - distinctId: string, - version: number, - tx?: TransactionClient - ): Promise { - const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version, tx) + public async addDistinctId(person: InternalPerson, distinctId: string, version: number): Promise { + const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version) if (kafkaMessages.length) { await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true }) } diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 3475bc669528a1..b0fd16fbde6257 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -1,12 +1,10 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' import { ProducerRecord } from 'kafkajs' -import LRU from 'lru-cache' import { DateTime } from 'luxon' import { Counter } from 'prom-client' import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' -import { ONE_HOUR } from '../../config/constants' import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics' import { InternalPerson, Person, PropertyUpdateOperation, TimestampFormat } from '../../types' import { DB } from '../../utils/db/db' @@ -59,16 +57,6 @@ const BARE_CASE_INSENSITIVE_ILLEGAL_IDS = [ 'false', ] -// Tracks whether we know we've already inserted a `posthog_personlessdistinctid` for the given -// (team_id, distinct_id) pair. If we have, then we can skip the INSERT attempt. -// TODO: Move this out of module scope, we don't currently have a clean place (outside of the Hub) -// to stash longer lived objects like caches. For now it's not important. -const PERSONLESS_DISTINCT_ID_INSERTED_CACHE = new LRU({ - max: 10_000, - maxAge: ONE_HOUR * 24, // cache up to 24h - updateAgeOnGet: true, -}) - const BARE_CASE_SENSITIVE_ILLEGAL_IDS = ['[object Object]', 'NaN', 'None', 'none', 'null', '0', 'undefined'] const PERSON_EVENTS = new Set(['$identify', '$create_alias', '$merge_dangerously', '$set']) @@ -122,34 +110,7 @@ export class PersonState { async update(): Promise<[Person, Promise]> { if (!this.processPerson) { - let existingPerson = await this.db.fetchPerson(this.teamId, this.distinctId, { useReadReplica: true }) - - if (!existingPerson) { - // See the comment in `mergeDistinctIds`. We are inserting a row into `posthog_personlessdistinctid` - // to note that this Distinct ID has been used in "personless" mode. This is necessary - // so that later, during a merge, we can decide whether we need to write out an override - // or not. - - const personlessDistinctIdCacheKey = `${this.teamId}|${this.distinctId}` - if (!PERSONLESS_DISTINCT_ID_INSERTED_CACHE.get(personlessDistinctIdCacheKey)) { - const personIsMerged = await this.db.addPersonlessDistinctId(this.teamId, this.distinctId) - - // We know the row is in PG now, and so future events for this Distinct ID can - // skip the PG I/O. - PERSONLESS_DISTINCT_ID_INSERTED_CACHE.set(personlessDistinctIdCacheKey, true) - - if (personIsMerged) { - // If `personIsMerged` comes back `true`, it means the `posthog_personlessdistinctid` - // has been updated by a merge (either since we called `fetchPerson` above, plus - // replication lag). We need to check `fetchPerson` again (this time using the leader) - // so that we properly associate this event with the Person we got merged into. - existingPerson = await this.db.fetchPerson(this.teamId, this.distinctId, { - useReadReplica: false, - }) - } - } - } - + const existingPerson = await this.db.fetchPerson(this.teamId, this.distinctId, { useReadReplica: true }) if (existingPerson) { const person = existingPerson as Person @@ -243,7 +204,7 @@ export class PersonState { // :NOTE: This should never be set in this branch, but adding this for logical consistency this.updateIsIdentified, this.event.uuid, - [{ distinctId: this.distinctId }] + [this.distinctId] ) return [person, true] } @@ -256,13 +217,13 @@ export class PersonState { isUserId: number | null, isIdentified: boolean, creatorEventUuid: string, - distinctIds: { distinctId: string; version?: number }[], - tx?: TransactionClient + distinctIds: string[], + version = 0 ): Promise { if (distinctIds.length < 1) { throw new Error('at least 1 distinctId is required in `createPerson`') } - const uuid = uuidFromDistinctId(teamId, distinctIds[0].distinctId) + const uuid = uuidFromDistinctId(teamId, distinctIds[0]) const props = { ...propertiesOnce, ...properties, ...{ $creator_event_uuid: creatorEventUuid } } const propertiesLastOperation: Record = {} @@ -286,7 +247,7 @@ export class PersonState { isIdentified, uuid, distinctIds, - tx + version ) } @@ -489,144 +450,57 @@ export class PersonState { const otherPerson = await this.db.fetchPerson(teamId, otherPersonDistinctId) const mergeIntoPerson = await this.db.fetchPerson(teamId, mergeIntoDistinctId) - // A note about the `distinctIdVersion` logic you'll find below: - // // Historically, we always INSERT-ed new `posthog_persondistinctid` rows with `version=0`. // Overrides are only created when the version is > 0, see: // https://github.com/PostHog/posthog/blob/92e17ce307a577c4233d4ab252eebc6c2207a5ee/posthog/models/person/sql.py#L269-L287 // - // With the addition of optional person profile processing, we are no longer creating + // With the addition of optional person processing, we are no longer creating // `posthog_persondistinctid` and `posthog_person` rows when $process_person_profile=false. - // This means that at merge time, it's possible this `distinct_id` and its deterministically - // generated `person.uuid` has already been used for events in ClickHouse, but they have no - // corresponding rows in the `posthog_persondistinctid` or `posthog_person` tables. - // - // For this reason, $process_person_profile=false write to the `posthog_personlessdistinctid` - // table just to note that a given Distinct ID was used for "personless" mode. Then, during - // our merges transactions below, we do two things: - // 1. We check whether a row exists in `posthog_personlessdistinctid` for that Distinct ID, - // if so, we need to write out `posthog_persondistinctid` rows with `version=1` so that - // an override is created in ClickHouse which will associate the old "personless" events - // with the Person UUID they were merged into. - // 2. We insert and/or update the `posthog_personlessdistinctid` ourselves, to mark that - // the Distinct ID has been merged. This is important so that an event being processed - // concurrently for that Distinct ID doesn't emit an event and _miss_ that a different - // Person UUID needs to be used now. (See the `processPerson` code in `update` for more.) - - if ((otherPerson && !mergeIntoPerson) || (!otherPerson && mergeIntoPerson)) { - // Only one of the two Distinct IDs points at an existing Person - - const [existingPerson, distinctIdToAdd] = (() => { - if (otherPerson) { - return [otherPerson!, mergeIntoDistinctId] - } else { - return [mergeIntoPerson!, otherPersonDistinctId] - } - })() - - return await this.db.postgres.transaction( - PostgresUse.COMMON_WRITE, - 'mergeDistinctIds-OneExists', - async (tx) => { - // See comment above about `distinctIdVersion` - const _insertedDistinctId = await this.db.addPersonlessDistinctIdForMerge( - this.teamId, - distinctIdToAdd, - tx - ) - const distinctIdVersion = 1 // TODO: Once `posthog_personlessdistinctid` is backfilled: insertedDistinctId ? 0 : 1 - - await this.db.addDistinctId(existingPerson, distinctIdToAdd, distinctIdVersion, tx) - return [existingPerson, Promise.resolve()] - } - ) + // This means that: + // 1. At merge time, it's possible this `distinct_id` and its deterministically generated + // `person.uuid` has already been used for events in ClickHouse, but they have no + // corresponding rows in the `posthog_persondistinctid` or `posthog_person` tables + // 2. We need to assume the `distinct_id`/`person.uuid` have been used before (by + // `$process_person_profile=false` events) and create an override row for this + // `distinct_id` even though we're just now INSERT-ing it into Postgres/ClickHouse. We do + // this by starting with `version=1`, as if we had just deleted the old user and were + // updating the `distinct_id` row as part of the merge + const addDistinctIdVersion = 1 + + if (otherPerson && !mergeIntoPerson) { + await this.db.addDistinctId(otherPerson, mergeIntoDistinctId, addDistinctIdVersion) + return [otherPerson, Promise.resolve()] + } else if (!otherPerson && mergeIntoPerson) { + await this.db.addDistinctId(mergeIntoPerson, otherPersonDistinctId, addDistinctIdVersion) + return [mergeIntoPerson, Promise.resolve()] } else if (otherPerson && mergeIntoPerson) { - // Both Distinct IDs point at an existing Person - if (otherPerson.id == mergeIntoPerson.id) { - // Nothing to do, they are the same Person return [mergeIntoPerson, Promise.resolve()] } - return await this.mergePeople({ mergeInto: mergeIntoPerson, mergeIntoDistinctId: mergeIntoDistinctId, otherPerson: otherPerson, otherPersonDistinctId: otherPersonDistinctId, }) - } else { - // Neither Distinct ID points at an existing Person - - let distinctId1 = mergeIntoDistinctId - let distinctId2 = otherPersonDistinctId - - return await this.db.postgres.transaction( - PostgresUse.COMMON_WRITE, - 'mergeDistinctIds-NeitherExist', - async (tx) => { - // See comment above about `distinctIdVersion` - const insertedDistinctId1 = await this.db.addPersonlessDistinctIdForMerge( - this.teamId, - distinctId1, - tx - ) - - // See comment above about `distinctIdVersion` - const insertedDistinctId2 = await this.db.addPersonlessDistinctIdForMerge( - this.teamId, - distinctId2, - tx - ) - - // `createPerson` uses the first Distinct ID provided to generate the Person - // UUID. That means the first Distinct ID definitely doesn't need an override, - // and can always use version 0. Below, we exhaust all of the options to decide - // whether we can optimize away an override by doing a swap, or whether we - // need to actually write an override. (But mostly we're being verbose for - // documentation purposes) - let distinctId2Version = 1 // TODO: Once `posthog_personlessdistinctid` is backfilled, this should be = 0 - if (insertedDistinctId1 && insertedDistinctId2) { - // We were the first to insert both (neither was used for Personless), so we - // can use either as the primary Person UUID and create no overrides. - } else if (insertedDistinctId1 && !insertedDistinctId2) { - // We created 1, but 2 was already used for Personless. Let's swap so - // that 2 can be the primary Person UUID and no override is needed. - ;[distinctId1, distinctId2] = [distinctId2, distinctId1] - } else if (!insertedDistinctId1 && insertedDistinctId2) { - // We created 2, but 1 was already used for Personless, so we want to - // use 1 as the primary Person UUID so that no override is needed. - } else if (!insertedDistinctId1 && !insertedDistinctId2) { - // Both were used in Personless mode, so there is no more-correct choice of - // primary Person UUID to make here, and we need to drop an override by - // using version = 1 for Distinct ID 2. - distinctId2Version = 1 - } - - // The first Distinct ID is used to create the new Person's UUID, and so it - // never needs an override. - const distinctId1Version = 0 - - return [ - await this.createPerson( - // TODO: in this case we could skip the properties updates later - timestamp, - this.eventProperties['$set'] || {}, - this.eventProperties['$set_once'] || {}, - teamId, - null, - true, - this.event.uuid, - [ - { distinctId: distinctId1, version: distinctId1Version }, - { distinctId: distinctId2, version: distinctId2Version }, - ], - tx - ), - Promise.resolve(), - ] - } - ) } + + // The last case: (!oldPerson && !newPerson) + return [ + await this.createPerson( + // TODO: in this case we could skip the properties updates later + timestamp, + this.eventProperties['$set'] || {}, + this.eventProperties['$set_once'] || {}, + teamId, + null, + true, + this.event.uuid, + [mergeIntoDistinctId, otherPersonDistinctId], + addDistinctIdVersion + ), + Promise.resolve(), + ] } public async mergePeople({ diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index 670685f445ddce..8d419b0b9fdb1e 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -294,7 +294,7 @@ describe('DB', () => { }) test('without properties', async () => { - const person = await db.createPerson(TIMESTAMP, {}, {}, {}, team.id, null, false, uuid, [{ distinctId }]) + const person = await db.createPerson(TIMESTAMP, {}, {}, {}, team.id, null, false, uuid, [distinctId]) const fetched_person = await fetchPersonByPersonId(team.id, person.id) expect(fetched_person!.is_identified).toEqual(false) @@ -306,7 +306,7 @@ describe('DB', () => { }) test('without properties indentified true', async () => { - const person = await db.createPerson(TIMESTAMP, {}, {}, {}, team.id, null, true, uuid, [{ distinctId }]) + const person = await db.createPerson(TIMESTAMP, {}, {}, {}, team.id, null, true, uuid, [distinctId]) const fetched_person = await fetchPersonByPersonId(team.id, person.id) expect(fetched_person!.is_identified).toEqual(true) expect(fetched_person!.properties).toEqual({}) @@ -326,7 +326,7 @@ describe('DB', () => { null, false, uuid, - [{ distinctId }] + [distinctId] ) const fetched_person = await fetchPersonByPersonId(team.id, person.id) expect(fetched_person!.is_identified).toEqual(false) @@ -354,7 +354,7 @@ describe('DB', () => { const distinctId = 'distinct_id1' // Note that we update the person badly in case of concurrent updates, but lets make sure we're consistent const personDbBefore = await db.createPerson(TIMESTAMP, { c: 'aaa' }, {}, {}, team.id, null, false, uuid, [ - { distinctId }, + distinctId, ]) const providedPersonTs = DateTime.fromISO('2000-04-04T11:42:06.502Z').toUTC() const personProvided = { ...personDbBefore, properties: { c: 'bbb' }, created_at: providedPersonTs } @@ -486,7 +486,7 @@ describe('DB', () => { const team = await getFirstTeam(hub) const uuid = new UUIDT().toString() const createdPerson = await db.createPerson(TIMESTAMP, { foo: 'bar' }, {}, {}, team.id, null, true, uuid, [ - { distinctId: 'some_id' }, + 'some_id', ]) const person = await db.fetchPerson(team.id, 'some_id') @@ -852,7 +852,7 @@ describe('DB', () => { null, false, new UUIDT().toString(), - [{ distinctId: 'source_person' }] + ['source_person'] ) const targetPerson = await db.createPerson( TIMESTAMP, @@ -863,7 +863,7 @@ describe('DB', () => { null, false, new UUIDT().toString(), - [{ distinctId: 'target_person' }] + ['target_person'] ) sourcePersonID = sourcePerson.id targetPersonID = targetPerson.id diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 9d9056ce8c3809..0cad2f8b075563 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -50,7 +50,7 @@ export async function createPerson( null, false, new UUIDT().toString(), - distinctIds.map((distinctId) => ({ distinctId })) + distinctIds ) } diff --git a/plugin-server/tests/worker/ingestion/action-matcher.test.ts b/plugin-server/tests/worker/ingestion/action-matcher.test.ts index d4a5770355a4bb..a66a8d03bb2be4 100644 --- a/plugin-server/tests/worker/ingestion/action-matcher.test.ts +++ b/plugin-server/tests/worker/ingestion/action-matcher.test.ts @@ -741,7 +741,7 @@ describe('ActionMatcher', () => { null, true, new UUIDT().toString(), - [{ distinctId: 'random' }] + ['random'] ) const cohortPerson = await hub.db.createPerson( @@ -753,7 +753,7 @@ describe('ActionMatcher', () => { null, true, new UUIDT().toString(), - [{ distinctId: 'cohort' }] + ['cohort'] ) await hub.db.addPersonToCohort(testCohort.id, cohortPerson.id, testCohort.version) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts index d09a149d44c8b6..4c1467653f3245 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/prepareEventStep.test.ts @@ -59,7 +59,7 @@ describe('prepareEventStep()', () => { // :KLUDGE: We test below whether kafka messages are produced, so make sure the person exists beforehand. await hub.db.createPerson(person.created_at, {}, {}, {}, pluginEvent.team_id, null, false, person.uuid, [ - { distinctId: 'my_id' }, + 'my_id', ]) hub.db.kafkaProducer!.queueMessage = jest.fn() diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 06bd8086ead840..bed64d243e6c56 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -222,97 +222,39 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining([])) }) - it('overrides are created only when distinct_id is in posthog_personlessdistinctid', async () => { - // oldUserDistinctId exists, and 'old2' will merge into it, but not create an override - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - - // newUserDistinctId exists, and 'new2' will merge into it, and will create an override - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) - await hub.db.addPersonlessDistinctId(teamId, 'new2') + it('merging creates an override and force_upgrade works', async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) const hubParam = undefined - const processPerson = true + let processPerson = true const [_person, kafkaAcks] = await personState( - { - event: '$identify', - distinct_id: oldUserDistinctId, - properties: { - $anon_distinct_id: 'old2', - }, - }, - hubParam, - processPerson - ).update() - - const [_person2, kafkaAcks2] = await personState( { event: '$identify', distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'new2', + $anon_distinct_id: oldUserDistinctId, }, }, hubParam, processPerson ).update() - await hub.db.kafkaProducer.flush() await kafkaAcks - await kafkaAcks2 - // new2 has an override, because it was in posthog_personlessdistinctid - await delayUntilEventIngested(() => fetchOverridesForDistinctId('new2')) - const chOverrides = await fetchOverridesForDistinctId('new2') + await delayUntilEventIngested(() => fetchOverridesForDistinctId(newUserDistinctId)) + const chOverrides = await fetchOverridesForDistinctId(newUserDistinctId) expect(chOverrides.length).toEqual(1) - expect(chOverrides).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - distinct_id: 'new2', - person_id: newUserUuid, - version: 1, - }), - ]) - ) - // old2 does have an override, because we are temporarily writing out unnecessary - // overrides while we backfill `posthog_personlessdistinctid` - const chOverridesOld = await fetchOverridesForDistinctId('old2') - expect(chOverridesOld.length).toEqual(1) - expect(chOverridesOld).toEqual( + // Override created for Person that never existed in the DB + expect(chOverrides).toEqual( expect.arrayContaining([ expect.objectContaining({ - distinct_id: 'old2', + distinct_id: newUserDistinctId, person_id: oldUserUuid, version: 1, }), ]) ) - }) - - it('force_upgrade works', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - - const hubParam = undefined - let processPerson = true - const [_person, kafkaAcks] = await personState( - { - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $anon_distinct_id: oldUserDistinctId, - }, - }, - hubParam, - processPerson - ).update() - await hub.db.kafkaProducer.flush() - await kafkaAcks // Using the `distinct_id` again with `processPerson=false` results in // `force_upgrade=true` and real Person `uuid` and `created_at` @@ -436,9 +378,7 @@ describe('PersonState.update()', () => { }) it('handles person being created in a race condition', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) jest.spyOn(hub.db, 'fetchPerson').mockImplementationOnce(() => { return Promise.resolve(undefined) @@ -475,7 +415,7 @@ describe('PersonState.update()', () => { it('handles person being created in a race condition updates properties if needed', async () => { await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, + newUserDistinctId, ]) jest.spyOn(hub.db, 'fetchPerson').mockImplementationOnce(() => { @@ -563,7 +503,7 @@ describe('PersonState.update()', () => { null, false, newUserUuid, - [{ distinctId: newUserDistinctId }] + [newUserDistinctId] ) const [person, kafkaAcks] = await personState({ @@ -599,7 +539,7 @@ describe('PersonState.update()', () => { it('updates person properties - no update if not needed', async () => { await hub.db.createPerson(timestamp, { $current_url: 123 }, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, + newUserDistinctId, ]) const [person, kafkaAcks] = await personState({ @@ -641,7 +581,7 @@ describe('PersonState.update()', () => { it('updates person properties - always update for person events', async () => { await hub.db.createPerson(timestamp, { $current_url: 123 }, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, + newUserDistinctId, ]) const [person, kafkaAcks] = await personState({ @@ -674,9 +614,7 @@ describe('PersonState.update()', () => { }) it('updates person properties - always update if undefined before', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const [person, kafkaAcks] = await personState({ event: '$pageview', @@ -717,7 +655,7 @@ describe('PersonState.update()', () => { null, false, newUserUuid, - [{ distinctId: newUserDistinctId }] + [newUserDistinctId] ) const [person, kafkaAcks] = await personState({ @@ -759,7 +697,7 @@ describe('PersonState.update()', () => { null, false, newUserUuid, - [{ distinctId: newUserDistinctId }] + [newUserDistinctId] ) const personS = personState({ @@ -798,7 +736,7 @@ describe('PersonState.update()', () => { it('does not update person if not needed', async () => { await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, + newUserDistinctId, ]) const [person, kafkaAcks] = await personState({ @@ -833,9 +771,7 @@ describe('PersonState.update()', () => { }) it('marks user as is_identified', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const personS = personState({ event: '$pageview', distinct_id: newUserDistinctId, @@ -887,8 +823,8 @@ describe('PersonState.update()', () => { properties_last_operation: {}, } await hub.db.createPerson(timestamp, { a: 6, c: 8 }, {}, {}, teamId, null, true, newUserUuid, [ - { distinctId: newUserDistinctId }, - { distinctId: oldUserDistinctId }, + newUserDistinctId, + oldUserDistinctId, ]) // the merged Person const personS = personState({ @@ -965,7 +901,7 @@ describe('PersonState.update()', () => { uuid: newUserUuid, properties: { foo: 'bar' }, created_at: timestamp, - version: 0, + version: 1, is_identified: true, }) ) @@ -984,8 +920,8 @@ describe('PersonState.update()', () => { it(`marks is_identified to be updated when no changes to distinct_ids but $anon_distinct_id passe`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - { distinctId: oldUserDistinctId }, + newUserDistinctId, + oldUserDistinctId, ]) const personS = personState({ @@ -1018,9 +954,7 @@ describe('PersonState.update()', () => { }) it(`add distinct id and marks user is_identified when passed $anon_distinct_id person does not exists and distinct_id does`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const personS = personState({ event: '$identify', @@ -1056,9 +990,7 @@ describe('PersonState.update()', () => { }) it(`add distinct id and marks user as is_identified when passed $anon_distinct_id person exists and distinct_id does not`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) const personS = personState({ event: '$identify', @@ -1095,12 +1027,8 @@ describe('PersonState.update()', () => { }) it(`merge into distinct_id person and marks user as is_identified when both persons have is_identified false`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const [person, kafkaAcks] = await personState({ event: '$identify', @@ -1162,12 +1090,8 @@ describe('PersonState.update()', () => { }) it(`merge into distinct_id person and marks user as is_identified when distinct_id user is identified and $anon_distinct_id user is not`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) const [person, kafkaAcks] = await personState({ event: '$identify', @@ -1229,12 +1153,8 @@ describe('PersonState.update()', () => { }) it(`does not merge people when distinct_id user is not identified and $anon_distinct_id user is`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const personS = personState({ event: '$identify', @@ -1282,12 +1202,8 @@ describe('PersonState.update()', () => { }) it(`does not merge people when both users are identified`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) const [person, kafkaAcks] = await personState({ event: '$identify', @@ -1334,10 +1250,10 @@ describe('PersonState.update()', () => { it(`merge into distinct_id person and updates properties with $set/$set_once`, async () => { await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, + oldUserDistinctId, ]) await hub.db.createPerson(timestamp2, { b: 3, c: 4, d: 5 }, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, + newUserDistinctId, ]) const [person, kafkaAcks] = await personState({ @@ -1402,9 +1318,7 @@ describe('PersonState.update()', () => { }) it(`handles race condition when other thread creates the user`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) // Fake the race by assuming createPerson was called before the addDistinctId creation above jest.spyOn(hub.db, 'addDistinctId').mockImplementation(async (person, distinctId) => { @@ -1417,7 +1331,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, distinctId), - [{ distinctId }] + [distinctId] ) await hub.db.addDistinctId(person, distinctId, 0) // this throws }) @@ -1521,12 +1435,8 @@ describe('PersonState.update()', () => { describe(`overrides: ${useOverridesMode}`, () => { // only difference between $merge_dangerously and $identify it(`merge_dangerously can merge people when alias id user is identified`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) const [person, kafkaAcks] = await personState({ event: '$merge_dangerously', @@ -1659,7 +1569,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'anonymous_id'), - [{ distinctId: 'anonymous_id' }] + ['anonymous_id'] ) const identifiedPerson = await hub.db.createPerson( timestamp, @@ -1670,7 +1580,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'new_distinct_id'), - [{ distinctId: 'new_distinct_id' }] + ['new_distinct_id'] ) // existing overrides @@ -1736,7 +1646,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'anonymous_id'), - [{ distinctId: 'anonymous_id' }] + ['anonymous_id'] ) const identifiedPerson = await hub.db.createPerson( timestamp, @@ -1747,7 +1657,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'new_distinct_id'), - [{ distinctId: 'new_distinct_id' }] + ['new_distinct_id'] ) // existing overrides for both anonPerson and identifiedPerson @@ -1821,7 +1731,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'anonymous_id'), - [{ distinctId: 'anonymous_id' }] + ['anonymous_id'] ) const identifiedPerson = await hub.db.createPerson( timestamp, @@ -1832,7 +1742,7 @@ describe('PersonState.update()', () => { null, false, uuidFromDistinctId(teamId, 'new_distinct_id'), - [{ distinctId: 'new_distinct_id' }] + ['new_distinct_id'] ) await insertRow(hub.db.postgres, 'posthog_featureflaghashkeyoverride', { @@ -1904,8 +1814,8 @@ describe('PersonState.update()', () => { describe(`overrides: ${useOverridesMode}`, () => { it(`no-op if persons already merged`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, firstUserUuid, [ - { distinctId: firstUserDistinctId }, - { distinctId: secondUserDistinctId }, + firstUserDistinctId, + secondUserDistinctId, ]) const state: PersonState = personState({}, hub) jest.spyOn(hub.db.kafkaProducer, 'queueMessages') @@ -1942,7 +1852,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [{ distinctId: firstUserDistinctId }] + [firstUserDistinctId] ) const second: InternalPerson = await hub.db.createPerson( timestamp, @@ -1953,7 +1863,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [{ distinctId: secondUserDistinctId }] + [secondUserDistinctId] ) const state: PersonState = personState({}, hub) @@ -2035,7 +1945,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [{ distinctId: firstUserDistinctId }] + [firstUserDistinctId] ) const second: InternalPerson = await hub.db.createPerson( timestamp, @@ -2046,7 +1956,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [{ distinctId: secondUserDistinctId }] + [secondUserDistinctId] ) const state: PersonState = personState({}, hub) @@ -2095,10 +2005,10 @@ describe('PersonState.update()', () => { it(`retries merges up to retry limit if postgres down`, async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ - { distinctId: firstUserDistinctId }, + firstUserDistinctId, ]) await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ - { distinctId: secondUserDistinctId }, + secondUserDistinctId, ]) const state: PersonState = personState({}, hub) @@ -2144,10 +2054,10 @@ describe('PersonState.update()', () => { it(`handleIdentifyOrAlias does not throw on merge failure`, async () => { // TODO: This the current state, we should probably change it await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ - { distinctId: firstUserDistinctId }, + firstUserDistinctId, ]) await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ - { distinctId: secondUserDistinctId }, + secondUserDistinctId, ]) const state: PersonState = personState( @@ -2207,7 +2117,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [{ distinctId: firstUserDistinctId }] + [firstUserDistinctId] ) const second: InternalPerson = await hub.db.createPerson( timestamp, @@ -2218,7 +2128,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [{ distinctId: secondUserDistinctId }] + [secondUserDistinctId] ) const state: PersonState = personState({}, hub) @@ -2335,7 +2245,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [{ distinctId: firstUserDistinctId }] + [firstUserDistinctId] ) const second: InternalPerson = await hub.db.createPerson( timestamp.plus({ minutes: 2 }), @@ -2346,7 +2256,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [{ distinctId: secondUserDistinctId }] + [secondUserDistinctId] ) const third: InternalPerson = await hub.db.createPerson( timestamp.plus({ minutes: 5 }), @@ -2357,7 +2267,7 @@ describe('PersonState.update()', () => { null, false, new UUIDT().toString(), - [{ distinctId: 'third' }] + ['third'] ) // We want to simulate a concurrent update to person_overrides. We do @@ -2483,7 +2393,7 @@ describe('PersonState.update()', () => { null, false, firstUserUuid, - [{ distinctId: firstUserDistinctId }] + [firstUserDistinctId] ) const second: InternalPerson = await hub.db.createPerson( timestamp.plus({ minutes: 2 }), @@ -2494,7 +2404,7 @@ describe('PersonState.update()', () => { null, false, secondUserUuid, - [{ distinctId: secondUserDistinctId }] + [secondUserDistinctId] ) const third: InternalPerson = await hub.db.createPerson( timestamp.plus({ minutes: 5 }), @@ -2505,7 +2415,7 @@ describe('PersonState.update()', () => { null, false, new UUIDT().toString(), - [{ distinctId: 'third' }] + ['third'] ) await personState( diff --git a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts index 632241251331d3..28793282754783 100644 --- a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts +++ b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts @@ -78,7 +78,7 @@ describe('postgres parity', () => { null, true, uuid, - [{ distinctId: 'distinct1' }, { distinctId: 'distinct2' }] + ['distinct1', 'distinct2'] ) await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse)) await delayUntilEventIngested(() => hub.db.fetchDistinctIdValues(person, Database.ClickHouse), 2) @@ -170,7 +170,7 @@ describe('postgres parity', () => { null, false, uuid, - [{ distinctId: 'distinct1' }, { distinctId: 'distinct2' }] + ['distinct1', 'distinct2'] ) await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse)) await delayUntilEventIngested(() => hub.db.fetchDistinctIdValues(person, Database.ClickHouse), 2) @@ -251,7 +251,7 @@ describe('postgres parity', () => { null, true, uuid, - [{ distinctId: 'distinct1' }] + ['distinct1'] ) const anotherPerson = await hub.db.createPerson( DateTime.utc(), @@ -262,7 +262,7 @@ describe('postgres parity', () => { null, true, uuid2, - [{ distinctId: 'another_distinct_id' }] + ['another_distinct_id'] ) await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse)) const [postgresPerson] = await hub.db.fetchPersons(Database.Postgres) @@ -334,7 +334,7 @@ describe('postgres parity', () => { null, false, uuid, - [{ distinctId: 'distinct1' }] + ['distinct1'] ) const anotherPerson = await hub.db.createPerson( DateTime.utc(), @@ -345,7 +345,7 @@ describe('postgres parity', () => { null, true, uuid2, - [{ distinctId: 'another_distinct_id' }] + ['another_distinct_id'] ) await delayUntilEventIngested(() => hub.db.fetchPersons(Database.ClickHouse)) const [postgresPerson] = await hub.db.fetchPersons(Database.Postgres) diff --git a/plugin-server/tests/worker/ingestion/process-event.test.ts b/plugin-server/tests/worker/ingestion/process-event.test.ts index e4353bdfc36f02..b9947bb7eec746 100644 --- a/plugin-server/tests/worker/ingestion/process-event.test.ts +++ b/plugin-server/tests/worker/ingestion/process-event.test.ts @@ -63,7 +63,7 @@ describe('EventsProcessor#createEvent()', () => { null, false, personUuid, - [{ distinctId: 'my_id' }] + ['my_id'] ) }) diff --git a/plugin-server/tests/worker/ingestion/properties-updater.test.ts b/plugin-server/tests/worker/ingestion/properties-updater.test.ts index 16cde1c7e84abf..b5bc38d64d2d87 100644 --- a/plugin-server/tests/worker/ingestion/properties-updater.test.ts +++ b/plugin-server/tests/worker/ingestion/properties-updater.test.ts @@ -29,7 +29,7 @@ describe('properties-updater', () => { db = hub.db team = await getFirstTeam(hub) - await db.createPerson(PAST_TIMESTAMP, {}, {}, {}, team.id, null, false, uuid, [{ distinctId }]) + await db.createPerson(PAST_TIMESTAMP, {}, {}, {}, team.id, null, false, uuid, [distinctId]) jest.spyOn(hub.db, 'updateGroup') jest.spyOn(hub.db, 'insertGroup')