From fff6720947c503f7b79ae6d611301f8f40c5de45 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Wed, 17 Apr 2024 13:06:53 -0600 Subject: [PATCH] chore(plugin-server): use uuidv5 for person uuids, based on distinct_id (#21547) * chore(plugin-server): bump uuid package * chore(plugin-server): use uuidv5 for person uuids, based on distinct_id --- plugin-server/package.json | 4 +- plugin-server/pnpm-lock.yaml | 19 +- .../src/worker/ingestion/person-state.ts | 28 +- .../worker/ingestion/person-state.test.ts | 518 ++++++++++-------- 4 files changed, 322 insertions(+), 247 deletions(-) diff --git a/plugin-server/package.json b/plugin-server/package.json index 115cf7a77d565..e5766fa3d44f5 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -82,7 +82,7 @@ "re2": "^1.20.3", "safe-stable-stringify": "^2.4.0", "tail": "^2.2.6", - "uuid": "^8.3.2", + "uuid": "^9.0.1", "v8-profiler-next": "^1.9.0", "vm2": "3.9.18" }, @@ -111,7 +111,7 @@ "@types/redlock": "^4.0.1", "@types/snowflake-sdk": "^1.5.1", "@types/tar-stream": "^2.2.0", - "@types/uuid": "^8.3.0", + "@types/uuid": "^9.0.1", "@typescript-eslint/eslint-plugin": "^7.1.1", "@typescript-eslint/parser": "^7.1.1", "babel-eslint": "^10.1.0", diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index d00ff283b2cc8..05fb5a15d84bd 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -1,4 +1,4 @@ -lockfileVersion: '6.1' +lockfileVersion: '6.0' settings: autoInstallPeers: true @@ -140,8 +140,8 @@ dependencies: specifier: ^2.2.6 version: 2.2.6 uuid: - specifier: ^8.3.2 - version: 8.3.2 + specifier: ^9.0.1 + version: 9.0.1 v8-profiler-next: specifier: ^1.9.0 version: 1.9.0 @@ -223,8 +223,8 @@ devDependencies: specifier: ^2.2.0 version: 2.2.2 '@types/uuid': - specifier: ^8.3.0 - version: 8.3.4 + specifier: ^9.0.1 + version: 9.0.8 '@typescript-eslint/eslint-plugin': specifier: ^7.1.1 version: 7.1.1(@typescript-eslint/parser@7.1.1)(eslint@8.53.0)(typescript@4.9.5) @@ -3764,8 +3764,8 @@ packages: '@types/node': 16.18.25 dev: true - /@types/uuid@8.3.4: - resolution: {integrity: sha512-c/I8ZRb51j+pYGAu5CrFMRxqZ2ke4y2grEBO5AUjgSkSk+qT2Ea+OdWElz/OiMf5MNpn2b17kuVBwZLQJXzihw==} + /@types/uuid@9.0.8: + resolution: {integrity: sha512-jg+97EGIcY9AGHJJRaaPVgetKDsrTgbRjQ5Msgjh/DQKEFl0DtyRr/VCOyD1T2R1MNeWPK/u7JoGhlDZnKBAfA==} dev: true /@types/yargs-parser@21.0.0: @@ -10396,6 +10396,11 @@ packages: hasBin: true dev: false + /uuid@9.0.1: + resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==} + hasBin: true + dev: false + /v8-compile-cache-lib@3.0.1: resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} dev: true diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 00468129cfefc..b7cd3b8b6afcc 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -4,6 +4,7 @@ import { ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' import { Counter } from 'prom-client' import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' +import { parse as parseUuid, v5 as uuidv5 } from 'uuid' import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics' import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types' @@ -13,7 +14,7 @@ import { timeoutGuard } from '../../utils/db/utils' import { PeriodicTask } from '../../utils/periodic-task' import { promiseRetry } from '../../utils/retries' import { status } from '../../utils/status' -import { castTimestampOrNow, UUIDT } from '../../utils/utils' +import { castTimestampOrNow } from '../../utils/utils' import { captureIngestionWarning } from './utils' export const mergeFinalFailuresCounter = new Counter({ @@ -33,6 +34,15 @@ export const mergeTxnSuccessCounter = new Counter({ labelNames: ['call', 'oldPersonIdentified', 'newPersonIdentified', 'poEEmbraceJoin'], }) +// UUIDv5 requires a namespace, which is itself a UUID. This was a randomly generated UUIDv4 +// that must be used to deterministrically generate UUIDv5s for Person rows. +const PERSON_UUIDV5_NAMESPACE = parseUuid('932979b4-65c3-4424-8467-0b66ec27bc22') + +function uuidFromDistinctId(teamId: number, distinctId: string): string { + // Deterministcally create a UUIDv5 based on the (team_id, distinct_id) pair. + return uuidv5(`${teamId}:${distinctId}`, PERSON_UUIDV5_NAMESPACE) +} + // used to prevent identify from being used with generic IDs // that we can safely assume stem from a bug or mistake // used to prevent identify from being used with generic IDs @@ -81,7 +91,6 @@ const isDistinctIdIllegal = (id: string): boolean => { // This class is responsible for creating/updating a single person through the process-event pipeline export class PersonState { private eventProperties: Properties - private newUuid: string public updateIsIdentified: boolean // TODO: remove this from the class and being hidden @@ -92,11 +101,9 @@ export class PersonState { private timestamp: DateTime, private processPerson: boolean, // $process_person_profile flag from the event private db: DB, - private personOverrideWriter?: DeferredPersonOverrideWriter, - uuid: UUIDT | undefined = undefined + private personOverrideWriter?: DeferredPersonOverrideWriter ) { this.eventProperties = event.properties! - this.newUuid = (uuid || new UUIDT()).toString() // If set to true, we'll update `is_identified` at the end of `updateProperties` // :KLUDGE: This is an indirect communication channel between `handleIdentifyOrAlias` and `updateProperties` @@ -172,7 +179,6 @@ export class PersonState { null, // :NOTE: This should never be set in this branch, but adding this for logical consistency this.updateIsIdentified, - this.newUuid, this.event.uuid, [this.distinctId] ) @@ -186,10 +192,14 @@ export class PersonState { teamId: number, isUserId: number | null, isIdentified: boolean, - uuid: string, creatorEventUuid: string, - distinctIds?: string[] + distinctIds: string[] ): Promise { + if (distinctIds.length < 1) { + throw new Error('at least 1 distinctId is required in `createPerson`') + } + const uuid = uuidFromDistinctId(teamId, distinctIds[0]) + const props = { ...propertiesOnce, ...properties, ...{ $creator_event_uuid: creatorEventUuid } } const propertiesLastOperation: Record = {} const propertiesLastUpdatedAt: Record = {} @@ -389,6 +399,7 @@ export class PersonState { otherPersonDistinctId: otherPersonDistinctId, }) } + // The last case: (!oldPerson && !newPerson) return await this.createPerson( // TODO: in this case we could skip the properties updates later @@ -398,7 +409,6 @@ export class PersonState { teamId, null, true, - this.newUuid, this.event.uuid, [mergeIntoDistinctId, otherPersonDistinctId] ) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 09d5e7e58784b..9b634cd59fa1e 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -1,5 +1,6 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' +import { parse as parseUuid, v5 as uuidv5 } from 'uuid' import { waitForExpect } from '../../../functional_tests/expectations' import { Database, Hub, Person } from '../../../src/types' @@ -33,6 +34,17 @@ interface PersonOverridesMode { ): Promise> } +function uuidFromDistinctId(teamId: number, distinctId: string): string { + // The UUID generation code here is deliberately copied from `person-state` rather than imported, + // so that someone can't accidentally change how `person-state` UUID generation works and still + // have the tests pass. + // + // It is very important that Person UUIDs are deterministically generated and that this format + // doesn't change without a lot of thought and planning about side effects! + const namespace = parseUuid('932979b4-65c3-4424-8467-0b66ec27bc22') + return uuidv5(`${teamId}:${distinctId}`, namespace) +} + const PersonOverridesModes: Record = { disabled: undefined, 'deferred, without mappings (flat)': { @@ -59,12 +71,20 @@ describe('PersonState.update()', () => { let hub: Hub let closeHub: () => Promise - let uuid: UUIDT - let uuid2: UUIDT let teamId: number let overridesMode: PersonOverridesMode | undefined let organizationId: string + // Common Distinct IDs (and their deterministic UUIDs) used in tests below. + const newUserDistinctId = 'new-user' + let newUserUuid: string + const oldUserDistinctId = 'old-user' + let oldUserUuid: string + const firstUserDistinctId = 'first' + let firstUserUuid: string + const secondUserDistinctId = 'second' + let secondUserUuid: string + beforeAll(async () => { ;[hub, closeHub] = await createHub({}) await hub.db.clickhouseQuery('SYSTEM STOP MERGES') @@ -74,11 +94,14 @@ describe('PersonState.update()', () => { beforeEach(async () => { overridesMode = undefined - uuid = new UUIDT() - uuid2 = new UUIDT() teamId = await createTeam(hub.db.postgres, organizationId) + newUserUuid = uuidFromDistinctId(teamId, newUserDistinctId) + oldUserUuid = uuidFromDistinctId(teamId, oldUserDistinctId) + firstUserUuid = uuidFromDistinctId(teamId, firstUserDistinctId) + secondUserUuid = uuidFromDistinctId(teamId, secondUserDistinctId) + jest.spyOn(hub.db, 'fetchPerson') jest.spyOn(hub.db, 'updatePersonDeprecated') @@ -109,8 +132,7 @@ describe('PersonState.update()', () => { timestamp, processPerson, customHub ? customHub.db : hub.db, - overridesMode?.getWriter(customHub ?? hub), - uuid + overridesMode?.getWriter(customHub ?? hub) ) } @@ -138,11 +160,35 @@ describe('PersonState.update()', () => { } describe('on person creation', () => { + it('creates deterministic person uuids that are different between teams', async () => { + const event_uuid = new UUIDT().toString() + const primaryTeamId = teamId + const personPrimaryTeam = await personState({ + event: '$pageview', + distinct_id: newUserDistinctId, + uuid: event_uuid, + }).updateProperties() + + const otherTeamId = await createTeam(hub.db.postgres, organizationId) + teamId = otherTeamId + const personOtherTeam = await personState({ + event: '$pageview', + distinct_id: newUserDistinctId, + uuid: event_uuid, + }).updateProperties() + + await hub.db.kafkaProducer.flush() + + expect(personPrimaryTeam.uuid).toEqual(uuidFromDistinctId(primaryTeamId, newUserDistinctId)) + expect(personOtherTeam.uuid).toEqual(uuidFromDistinctId(otherTeamId, newUserDistinctId)) + expect(personPrimaryTeam.uuid).not.toEqual(personOtherTeam.uuid) + }) + it('creates person if they are new', async () => { const event_uuid = new UUIDT().toString() const person = await personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, uuid: event_uuid, // `null_byte` validates that `sanitizeJsonbValue` is working as expected properties: { $set: { null_byte: '\u0000' } }, @@ -152,7 +198,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: { $creator_event_uuid: event_uuid, null_byte: '\uFFFD' }, created_at: timestamp, version: 0, @@ -170,7 +216,7 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) }) it('creates person if they are new and $process_person_profile=false', async () => { @@ -181,7 +227,7 @@ describe('PersonState.update()', () => { const person = await personState( { event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, uuid: event_uuid, properties: { $process_person_profile: false, $set: { a: 1 }, $set_once: { b: 2 } }, }, @@ -193,7 +239,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -213,14 +259,14 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) }) it('does not attach existing person properties to $process_person_profile=false events', async () => { const originalEventUuid = new UUIDT().toString() const person = await personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, uuid: originalEventUuid, properties: { $set: { c: 420 } }, }).update() @@ -229,7 +275,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: { $creator_event_uuid: originalEventUuid, c: 420 }, created_at: timestamp, version: 0, @@ -244,13 +290,13 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) // OK, a person now exists with { c: 420 }, let's prove the properties come back out // of the DB. const personVerifyProps = await personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, uuid: new UUIDT().toString(), properties: {}, }).update() @@ -260,7 +306,7 @@ describe('PersonState.update()', () => { const processPersonFalseResult = await personState( { event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, uuid: new UUIDT().toString(), properties: {}, }, @@ -271,20 +317,20 @@ describe('PersonState.update()', () => { }) it('handles person being created in a race condition', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['new-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) jest.spyOn(hub.db, 'fetchPerson').mockImplementationOnce(() => { return Promise.resolve(undefined) }) - const person = await personState({ event: '$pageview', distinct_id: 'new-user' }).handleUpdate() + const person = await personState({ event: '$pageview', distinct_id: newUserDistinctId }).handleUpdate() await hub.db.kafkaProducer.flush() // if creation fails we should return the person that another thread already created expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -299,12 +345,12 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(person) - expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) }) it('handles person being created in a race condition updates properties if needed', async () => { - await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, teamId, null, false, uuid.toString(), [ - 'new-user', + await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, teamId, null, false, newUserUuid, [ + newUserDistinctId, ]) jest.spyOn(hub.db, 'fetchPerson').mockImplementationOnce(() => { @@ -313,7 +359,7 @@ describe('PersonState.update()', () => { const person = await personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set_once: { c: 3, e: 4 }, $set: { b: 4 }, @@ -325,7 +371,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: { b: 4, c: 4, e: 4 }, created_at: timestamp, version: 1, @@ -340,13 +386,13 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(person) - expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) }) it('creates person with properties', async () => { const person = await personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set_once: { a: 1, b: 2 }, $set: { b: 3, c: 4 }, @@ -357,7 +403,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: { a: 1, b: 3, c: 4 }, created_at: timestamp, version: 0, @@ -375,7 +421,7 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) }) }) @@ -389,13 +435,13 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), - ['new-user'] + newUserUuid, + [newUserDistinctId] ) const person = await personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set_once: { c: 3, e: 4 }, $set: { b: 4, toString: 1, null_byte: '\u0000' }, @@ -406,7 +452,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, // `null_byte` validates that `sanitizeJsonbValue` is working as expected properties: { b: 4, c: 4, e: 4, toString: 1, null_byte: '\uFFFD' }, created_at: timestamp, @@ -432,13 +478,13 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), - ['new-user'] + newUserUuid, + [newUserDistinctId] ) const personS = personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set_once: { c: 3, e: 4 }, $set: { b: 4 }, @@ -451,7 +497,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: { b: 4, c: 4, e: 4 }, created_at: timestamp, version: 1, @@ -468,13 +514,13 @@ describe('PersonState.update()', () => { }) it('does not update person if not needed', async () => { - await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, teamId, null, false, uuid.toString(), [ - 'new-user', + await hub.db.createPerson(timestamp, { b: 3, c: 4 }, {}, {}, teamId, null, false, newUserUuid, [ + newUserDistinctId, ]) const person = await personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set_once: { c: 3 }, $set: { b: 3 }, @@ -485,7 +531,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: { b: 3, c: 4 }, created_at: timestamp, version: 0, @@ -503,10 +549,10 @@ describe('PersonState.update()', () => { }) it('marks user as is_identified', async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['new-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const personS = personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: {}, }) personS.updateIsIdentified = true @@ -516,7 +562,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: {}, created_at: timestamp, version: 1, @@ -549,18 +595,18 @@ describe('PersonState.update()', () => { properties: { a: 5, b: 7 }, is_user_id: 0, is_identified: false, - uuid: uuid2.toString(), + uuid: uuidFromDistinctId(teamId, 'deleted-user'), properties_last_updated_at: {}, properties_last_operation: {}, } - await hub.db.createPerson(timestamp, { a: 6, c: 8 }, {}, {}, teamId, null, true, uuid.toString(), [ - 'new-user', - 'old-user', + await hub.db.createPerson(timestamp, { a: 6, c: 8 }, {}, {}, teamId, null, true, newUserUuid, [ + newUserDistinctId, + oldUserDistinctId, ]) // the merged Person const personS = personState({ event: '$pageview', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set: { a: 7, d: 9 } }, }) jest.spyOn(personS, 'handleIdentifyOrAlias').mockReturnValue(Promise.resolve(mergeDeletedPerson)) @@ -571,7 +617,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: { a: 7, c: 8, d: 9 }, created_at: timestamp, version: 1, @@ -598,7 +644,7 @@ describe('PersonState.update()', () => { it(`no-op when $anon_distinct_id not passed`, async () => { const person = await personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set: { foo: 'bar' }, }, @@ -613,10 +659,10 @@ describe('PersonState.update()', () => { it(`creates person with both distinct_ids and marks user as is_identified when $anon_distinct_id passed`, async () => { const person = await personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set: { foo: 'bar' }, - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }).handleIdentifyOrAlias() await hub.db.kafkaProducer.flush() @@ -624,7 +670,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: { foo: 'bar' }, created_at: timestamp, version: 0, @@ -641,20 +687,20 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) }) it(`marks is_identified to be updated when no changes to distinct_ids but $anon_distinct_id passe`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), [ - 'new-user', - 'old-user', + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ + newUserDistinctId, + oldUserDistinctId, ]) const personS = personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }) const person = await personS.handleIdentifyOrAlias() @@ -663,7 +709,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -679,13 +725,13 @@ describe('PersonState.update()', () => { }) it(`add distinct id and marks user is_identified when passed $anon_distinct_id person does not exists and distinct_id does`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['new-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const personS = personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }) const person = await personS.handleIdentifyOrAlias() @@ -695,7 +741,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: newUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -710,17 +756,17 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) }) it(`add distinct id and marks user as is_identified when passed $anon_distinct_id person exists and distinct_id does not`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['old-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) const personS = personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }) const person = await personS.handleIdentifyOrAlias() @@ -731,7 +777,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: oldUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -746,18 +792,18 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) }) it(`merge into distinct_id person and marks user as is_identified when both persons have is_identified false`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['old-user']) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, uuid2.toString(), ['new-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const person = await personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }).handleIdentifyOrAlias() await hub.db.kafkaProducer.flush() @@ -777,11 +823,11 @@ describe('PersonState.update()', () => { const persons = await fetchPostgresPersonsH() expect(persons.length).toEqual(1) expect(persons[0]).toEqual(person) - expect([uuid.toString(), uuid2.toString()]).toContain(persons[0].uuid) + expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) // verify ClickHouse persons await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed @@ -803,25 +849,23 @@ describe('PersonState.update()', () => { }), ]) ) - expect(new Set(clickhousePersons.map((p) => p.id))).toEqual( - new Set([uuid.toString(), uuid2.toString()]) - ) + expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) // verify ClickHouse distinct_ids await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) }) it(`merge into distinct_id person and marks user as is_identified when distinct_id user is identified and $anon_distinct_id user is not`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['old-user']) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, uuid2.toString(), ['new-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) const person = await personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }).handleIdentifyOrAlias() await hub.db.kafkaProducer.flush() @@ -841,11 +885,11 @@ describe('PersonState.update()', () => { const persons = await fetchPostgresPersonsH() expect(persons.length).toEqual(1) expect(persons[0]).toEqual(person) - expect([uuid.toString(), uuid2.toString()]).toContain(persons[0].uuid) + expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) // verify ClickHouse persons await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed @@ -867,25 +911,23 @@ describe('PersonState.update()', () => { }), ]) ) - expect(new Set(clickhousePersons.map((p) => p.id))).toEqual( - new Set([uuid.toString(), uuid2.toString()]) - ) + expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) // verify ClickHouse distinct_ids await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) }) it(`does not merge people when distinct_id user is not identified and $anon_distinct_id user is`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, uuid.toString(), ['old-user']) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, uuid2.toString(), ['new-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [newUserDistinctId]) const personS = personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }) const person = await personS.handleIdentifyOrAlias() @@ -895,7 +937,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid2.toString(), + uuid: newUserUuid, properties: {}, created_at: timestamp2, version: 0, @@ -909,7 +951,7 @@ describe('PersonState.update()', () => { expect(persons[0]).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: oldUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -920,20 +962,20 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId])) const distinctIds2 = await hub.db.fetchDistinctIdValues(persons[1]) - expect(distinctIds2).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds2).toEqual(expect.arrayContaining([newUserDistinctId])) }) it(`does not merge people when both users are identified`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, uuid.toString(), ['old-user']) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, uuid2.toString(), ['new-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) const person = await personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }).handleIdentifyOrAlias() await hub.db.kafkaProducer.flush() @@ -941,7 +983,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid2.toString(), + uuid: newUserUuid, properties: {}, created_at: timestamp2, version: 0, @@ -955,7 +997,7 @@ describe('PersonState.update()', () => { expect(persons[0]).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: oldUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -966,34 +1008,26 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId])) const distinctIds2 = await hub.db.fetchDistinctIdValues(persons[1]) - expect(distinctIds2).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds2).toEqual(expect.arrayContaining([newUserDistinctId])) }) it(`merge into distinct_id person and updates properties with $set/$set_once`, async () => { - await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, teamId, null, false, uuid.toString(), [ - 'old-user', + await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, teamId, null, false, oldUserUuid, [ + oldUserDistinctId, + ]) + await hub.db.createPerson(timestamp2, { b: 3, c: 4, d: 5 }, {}, {}, teamId, null, false, newUserUuid, [ + newUserDistinctId, ]) - await hub.db.createPerson( - timestamp2, - { b: 3, c: 4, d: 5 }, - {}, - {}, - teamId, - null, - false, - uuid2.toString(), - ['new-user'] - ) const person = await personState({ event: '$identify', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { $set: { d: 6, e: 7 }, $set_once: { a: 8, f: 9 }, - $anon_distinct_id: 'old-user', + $anon_distinct_id: oldUserDistinctId, }, }).handleIdentifyOrAlias() await hub.db.kafkaProducer.flush() @@ -1013,11 +1047,11 @@ describe('PersonState.update()', () => { const persons = await fetchPostgresPersonsH() expect(persons.length).toEqual(1) expect(persons[0]).toEqual(person) - expect([uuid.toString(), uuid2.toString()]).toContain(persons[0].uuid) + expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) // verify ClickHouse persons await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed @@ -1039,32 +1073,38 @@ describe('PersonState.update()', () => { }), ]) ) - expect(new Set(clickhousePersons.map((p) => p.id))).toEqual( - new Set([uuid.toString(), uuid2.toString()]) - ) + expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) // verify ClickHouse distinct_ids await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) }) it(`handles race condition when other thread creates the user`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['old-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) // Fake the race by assuming createPerson was called before the addDistinctId creation above jest.spyOn(hub.db, 'addDistinctId').mockImplementation(async (person, distinctId) => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid2.toString(), [ - distinctId, - ]) + await hub.db.createPerson( + timestamp, + {}, + {}, + {}, + teamId, + null, + false, + uuidFromDistinctId(teamId, distinctId), + [distinctId] + ) await hub.db.addDistinctId(person, distinctId) // this throws }) const person = await personState({ event: '$identify', - distinct_id: 'old-user', + distinct_id: oldUserDistinctId, properties: { - $anon_distinct_id: 'new-user', + $anon_distinct_id: newUserDistinctId, }, }).handleIdentifyOrAlias() await hub.db.kafkaProducer.flush() @@ -1074,7 +1114,7 @@ describe('PersonState.update()', () => { expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: oldUserUuid, properties: {}, created_at: timestamp, version: 1, @@ -1089,7 +1129,7 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) }) }) }) @@ -1100,8 +1140,8 @@ describe('PersonState.update()', () => { const state: PersonState = personState( { event: '$identify', - distinct_id: 'new-user', - properties: { $anon_distinct_id: 'old-user' }, + distinct_id: newUserDistinctId, + properties: { $anon_distinct_id: oldUserDistinctId }, }, hub ) @@ -1109,7 +1149,7 @@ describe('PersonState.update()', () => { return Promise.resolve(undefined) }) await state.handleIdentifyOrAlias() - expect(state.merge).toHaveBeenCalledWith('old-user', 'new-user', teamId, timestamp) + expect(state.merge).toHaveBeenCalledWith(oldUserDistinctId, newUserDistinctId, teamId, timestamp) jest.spyOn(state, 'merge').mockRestore() }) @@ -1117,8 +1157,8 @@ describe('PersonState.update()', () => { const state: PersonState = personState( { event: '$create_alias', - distinct_id: 'new-user', - properties: { alias: 'old-user' }, + distinct_id: newUserDistinctId, + properties: { alias: oldUserDistinctId }, }, hub ) @@ -1127,7 +1167,7 @@ describe('PersonState.update()', () => { }) await state.handleIdentifyOrAlias() - expect(state.merge).toHaveBeenCalledWith('old-user', 'new-user', teamId, timestamp) + expect(state.merge).toHaveBeenCalledWith(oldUserDistinctId, newUserDistinctId, teamId, timestamp) jest.spyOn(state, 'merge').mockRestore() }) @@ -1135,8 +1175,8 @@ describe('PersonState.update()', () => { const state: PersonState = personState( { event: '$merge_dangerously', - distinct_id: 'new-user', - properties: { alias: 'old-user' }, + distinct_id: newUserDistinctId, + properties: { alias: oldUserDistinctId }, }, hub ) @@ -1145,7 +1185,7 @@ describe('PersonState.update()', () => { }) await state.handleIdentifyOrAlias() - expect(state.merge).toHaveBeenCalledWith('old-user', 'new-user', teamId, timestamp) + expect(state.merge).toHaveBeenCalledWith(oldUserDistinctId, newUserDistinctId, teamId, timestamp) jest.spyOn(state, 'merge').mockRestore() }) }) @@ -1158,14 +1198,14 @@ describe('PersonState.update()', () => { describe(`overrides: ${useOverridesMode}`, () => { // only difference between $merge_dangerously and $identify it(`merge_dangerously can merge people when alias id user is identified`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, uuid.toString(), ['old-user']) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, uuid2.toString(), ['new-user']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [oldUserDistinctId]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [newUserDistinctId]) const person = await personState({ event: '$merge_dangerously', - distinct_id: 'new-user', + distinct_id: newUserDistinctId, properties: { - alias: 'old-user', + alias: oldUserDistinctId, }, }).handleIdentifyOrAlias() await hub.db.kafkaProducer.flush() @@ -1185,11 +1225,11 @@ describe('PersonState.update()', () => { const persons = await fetchPostgresPersonsH() expect(persons.length).toEqual(1) expect(persons[0]).toEqual(person) - expect([uuid.toString(), uuid2.toString()]).toContain(persons[0].uuid) + expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) // verify ClickHouse persons await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed @@ -1211,14 +1251,12 @@ describe('PersonState.update()', () => { }), ]) ) - expect(new Set(clickhousePersons.map((p) => p.id))).toEqual( - new Set([uuid.toString(), uuid2.toString()]) - ) + expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) // verify ClickHouse distinct_ids await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['old-user', 'new-user'])) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) }) }) }) @@ -1292,7 +1330,7 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), + uuidFromDistinctId(teamId, 'anonymous_id'), ['anonymous_id'] ) const identifiedPerson = await hub.db.createPerson( @@ -1303,7 +1341,7 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid2.toString(), + uuidFromDistinctId(teamId, 'new_distinct_id'), ['new_distinct_id'] ) @@ -1369,7 +1407,7 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), + uuidFromDistinctId(teamId, 'anonymous_id'), ['anonymous_id'] ) const identifiedPerson = await hub.db.createPerson( @@ -1380,7 +1418,7 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid2.toString(), + uuidFromDistinctId(teamId, 'new_distinct_id'), ['new_distinct_id'] ) @@ -1454,7 +1492,7 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), + uuidFromDistinctId(teamId, 'anonymous_id'), ['anonymous_id'] ) const identifiedPerson = await hub.db.createPerson( @@ -1465,7 +1503,7 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid2.toString(), + uuidFromDistinctId(teamId, 'new_distinct_id'), ['new_distinct_id'] ) @@ -1537,19 +1575,19 @@ describe('PersonState.update()', () => { }) describe(`overrides: ${useOverridesMode}`, () => { it(`no-op if persons already merged`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, uuid.toString(), [ - 'first', - 'second', + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, firstUserUuid, [ + firstUserDistinctId, + secondUserDistinctId, ]) const state: PersonState = personState({}, hub) jest.spyOn(hub.db.kafkaProducer, 'queueMessages') - const person = await state.merge('second', 'first', teamId, timestamp) + const person = await state.merge(secondUserDistinctId, firstUserDistinctId, teamId, timestamp) await hub.db.kafkaProducer.flush() expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: firstUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1569,8 +1607,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), - ['first'] + firstUserUuid, + [firstUserDistinctId] ) const second: Person = await hub.db.createPerson( timestamp, @@ -1580,24 +1618,24 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid2.toString(), - ['second'] + secondUserUuid, + [secondUserDistinctId] ) const state: PersonState = personState({}, hub) jest.spyOn(hub.db.kafkaProducer, 'queueMessages') const person = await state.mergePeople({ mergeInto: first, - mergeIntoDistinctId: 'first', + mergeIntoDistinctId: firstUserDistinctId, otherPerson: second, - otherPersonDistinctId: 'second', + otherPersonDistinctId: secondUserDistinctId, }) await hub.db.kafkaProducer.flush() expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: firstUserUuid, properties: {}, created_at: timestamp, version: 1, @@ -1614,7 +1652,7 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(person) - expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second'])) + expect(distinctIds).toEqual(expect.arrayContaining([firstUserDistinctId, secondUserDistinctId])) // verify ClickHouse persons await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed @@ -1622,14 +1660,14 @@ describe('PersonState.update()', () => { expect(clickhousePersons).toEqual( expect.arrayContaining([ expect.objectContaining({ - id: uuid.toString(), + id: firstUserUuid, properties: '{}', created_at: timestampch, version: 1, is_identified: 1, }), expect.objectContaining({ - id: uuid2.toString(), + id: secondUserUuid, is_deleted: 1, version: 100, }), @@ -1639,7 +1677,9 @@ describe('PersonState.update()', () => { // verify ClickHouse distinct_ids await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(person) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining(['first', 'second'])) + expect(clickHouseDistinctIds).toEqual( + expect.arrayContaining([firstUserDistinctId, secondUserDistinctId]) + ) // verify Postgres person_id overrides, if applicable if (overridesMode) { @@ -1659,8 +1699,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), - ['first'] + firstUserUuid, + [firstUserDistinctId] ) const second: Person = await hub.db.createPerson( timestamp, @@ -1670,8 +1710,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid2.toString(), - ['second'] + secondUserUuid, + [secondUserDistinctId] ) const state: PersonState = personState({}, hub) @@ -1684,9 +1724,9 @@ describe('PersonState.update()', () => { await expect( state.mergePeople({ mergeInto: first, - mergeIntoDistinctId: 'first', + mergeIntoDistinctId: firstUserDistinctId, otherPerson: second, - otherPersonDistinctId: 'second', + otherPersonDistinctId: secondUserDistinctId, }) ).rejects.toThrow(error) await hub.db.kafkaProducer.flush() @@ -1700,7 +1740,7 @@ describe('PersonState.update()', () => { expect.arrayContaining([ expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: firstUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1708,7 +1748,7 @@ describe('PersonState.update()', () => { }), expect.objectContaining({ id: expect.any(Number), - uuid: uuid2.toString(), + uuid: secondUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1719,8 +1759,12 @@ describe('PersonState.update()', () => { }) it(`retries merges up to retry limit if postgres down`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['first']) - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid2.toString(), ['second']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ + firstUserDistinctId, + ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ + secondUserDistinctId, + ]) const state: PersonState = personState({}, hub) // break postgres @@ -1729,7 +1773,9 @@ describe('PersonState.update()', () => { throw error }) jest.spyOn(hub.db.kafkaProducer, 'queueMessages') - await expect(state.merge('second', 'first', teamId, timestamp)).rejects.toThrow(error) + await expect(state.merge(secondUserDistinctId, firstUserDistinctId, teamId, timestamp)).rejects.toThrow( + error + ) await hub.db.kafkaProducer.flush() @@ -1742,7 +1788,7 @@ describe('PersonState.update()', () => { expect.arrayContaining([ expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: firstUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1750,7 +1796,7 @@ describe('PersonState.update()', () => { }), expect.objectContaining({ id: expect.any(Number), - uuid: uuid2.toString(), + uuid: secondUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1762,11 +1808,19 @@ describe('PersonState.update()', () => { it(`handleIdentifyOrAlias does not throw on merge failure`, async () => { // TODO: This the current state, we should probably change it - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['first']) - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid2.toString(), ['second']) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ + firstUserDistinctId, + ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ + secondUserDistinctId, + ]) const state: PersonState = personState( - { event: '$merge_dangerously', distinct_id: 'first', properties: { alias: 'second' } }, + { + event: '$merge_dangerously', + distinct_id: firstUserDistinctId, + properties: { alias: secondUserDistinctId }, + }, hub ) // break postgres @@ -1787,7 +1841,7 @@ describe('PersonState.update()', () => { expect.arrayContaining([ expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: firstUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1795,7 +1849,7 @@ describe('PersonState.update()', () => { }), expect.objectContaining({ id: expect.any(Number), - uuid: uuid2.toString(), + uuid: secondUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1817,8 +1871,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), - ['first'] + firstUserUuid, + [firstUserDistinctId] ) const second: Person = await hub.db.createPerson( timestamp, @@ -1828,8 +1882,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid2.toString(), - ['second'] + secondUserUuid, + [secondUserDistinctId] ) const state: PersonState = personState({}, hub) @@ -1856,9 +1910,9 @@ describe('PersonState.update()', () => { await expect( state.mergePeople({ mergeInto: first, - mergeIntoDistinctId: 'first', + mergeIntoDistinctId: firstUserDistinctId, otherPerson: second, - otherPersonDistinctId: 'second', + otherPersonDistinctId: secondUserDistinctId, }) ).rejects.toThrow(error) await hub.db.kafkaProducer.flush() @@ -1869,7 +1923,7 @@ describe('PersonState.update()', () => { expect.arrayContaining([ expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: firstUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1877,7 +1931,7 @@ describe('PersonState.update()', () => { }), expect.objectContaining({ id: expect.any(Number), - uuid: uuid2.toString(), + uuid: secondUserUuid, properties: {}, created_at: timestamp, version: 0, @@ -1891,7 +1945,9 @@ describe('PersonState.update()', () => { await hub.db.fetchDistinctIdValues(personsAfterFailure[0]), await hub.db.fetchDistinctIdValues(personsAfterFailure[1]), ] - expect(distinctIdsAfterFailure).toEqual(expect.arrayContaining([['first'], ['second']])) + expect(distinctIdsAfterFailure).toEqual( + expect.arrayContaining([[firstUserDistinctId], [secondUserDistinctId]]) + ) // verify Postgres person_id overrides const overridesAfterFailure = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) @@ -1902,16 +1958,16 @@ describe('PersonState.update()', () => { mockPostgresQuery.mockRestore() const person = await state.mergePeople({ mergeInto: first, - mergeIntoDistinctId: 'first', + mergeIntoDistinctId: firstUserDistinctId, otherPerson: second, - otherPersonDistinctId: 'second', + otherPersonDistinctId: secondUserDistinctId, }) await hub.db.kafkaProducer.flush() expect(person).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), + uuid: firstUserUuid, properties: {}, created_at: timestamp, version: 1, @@ -1926,7 +1982,7 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(person) - expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second'])) + expect(distinctIds).toEqual(expect.arrayContaining([firstUserDistinctId, secondUserDistinctId])) // verify Postgres person_id overrides const overrides = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) @@ -1942,8 +1998,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), - ['first'] + firstUserUuid, + [firstUserDistinctId] ) const second: Person = await hub.db.createPerson( timestamp.plus({ minutes: 2 }), @@ -1953,8 +2009,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid2.toString(), - ['second'] + secondUserUuid, + [secondUserDistinctId] ) const third: Person = await hub.db.createPerson( timestamp.plus({ minutes: 5 }), @@ -1999,9 +2055,9 @@ describe('PersonState.update()', () => { personState( { event: '$merge_dangerously', - distinct_id: 'first', + distinct_id: firstUserDistinctId, properties: { - alias: 'second', + alias: secondUserDistinctId, }, }, hub @@ -2009,7 +2065,7 @@ describe('PersonState.update()', () => { personState( { event: '$merge_dangerously', - distinct_id: 'second', + distinct_id: secondUserDistinctId, properties: { alias: 'third', }, @@ -2024,9 +2080,9 @@ describe('PersonState.update()', () => { personState( { event: '$merge_dangerously', - distinct_id: 'first', + distinct_id: firstUserDistinctId, properties: { - alias: 'second', + alias: secondUserDistinctId, }, }, hub @@ -2034,7 +2090,7 @@ describe('PersonState.update()', () => { personState( { event: '$merge_dangerously', - distinct_id: 'second', + distinct_id: secondUserDistinctId, properties: { alias: 'third', }, @@ -2049,7 +2105,7 @@ describe('PersonState.update()', () => { expect(persons[0]).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), // guaranteed to be merged into this based on timestamps + uuid: firstUserUuid, // guaranteed to be merged into this based on timestamps // There's a race condition in our code where // if different distinctIDs are used same time, // then pros can be dropped, see https://docs.google.com/presentation/d/1Osz7r8bKkDD5yFzw0cCtsGVf1LTEifXS-dzuwaS8JGY @@ -2062,7 +2118,9 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second', 'third'])) + expect(distinctIds).toEqual( + expect.arrayContaining([firstUserDistinctId, secondUserDistinctId, 'third']) + ) // verify Postgres person_id overrides, if applicable if (overridesMode) { @@ -2085,8 +2143,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid.toString(), - ['first'] + firstUserUuid, + [firstUserDistinctId] ) const second: Person = await hub.db.createPerson( timestamp.plus({ minutes: 2 }), @@ -2096,8 +2154,8 @@ describe('PersonState.update()', () => { teamId, null, false, - uuid2.toString(), - ['second'] + secondUserUuid, + [secondUserDistinctId] ) const third: Person = await hub.db.createPerson( timestamp.plus({ minutes: 5 }), @@ -2114,7 +2172,7 @@ describe('PersonState.update()', () => { await personState( { event: '$merge_dangerously', - distinct_id: 'second', + distinct_id: secondUserDistinctId, properties: { alias: 'third', }, @@ -2125,9 +2183,9 @@ describe('PersonState.update()', () => { await personState( { event: '$merge_dangerously', - distinct_id: 'first', + distinct_id: firstUserDistinctId, properties: { - alias: 'second', + alias: secondUserDistinctId, }, }, hub @@ -2139,7 +2197,7 @@ describe('PersonState.update()', () => { expect(persons[0]).toEqual( expect.objectContaining({ id: expect.any(Number), - uuid: uuid.toString(), // guaranteed to be merged into this based on timestamps + uuid: firstUserUuid, // guaranteed to be merged into this based on timestamps properties: { first: true, second: true, third: true }, created_at: timestamp, version: 1, // the test intends for it to be a chain, so must get v1, we get v2 if second->first and third->first, but we want it to be third->second->first @@ -2149,7 +2207,9 @@ describe('PersonState.update()', () => { // verify Postgres distinct_ids const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining(['first', 'second', 'third'])) + expect(distinctIds).toEqual( + expect.arrayContaining([firstUserDistinctId, secondUserDistinctId, 'third']) + ) // verify Postgres person_id overrides, if applicable if (overridesMode) {