diff --git a/.github/workflows/ci-plugin-server.yml b/.github/workflows/ci-plugin-server.yml
index 8dc118c854ba1..a62bd4a66851a 100644
--- a/.github/workflows/ci-plugin-server.yml
+++ b/.github/workflows/ci-plugin-server.yml
@@ -175,16 +175,11 @@ jobs:
         run: cd plugin-server && pnpm test -- --runInBand --forceExit tests/ --shard=${{matrix.shard}}
 
     functional-tests:
-        name: Functional tests (POE=${{matrix.POE_EMBRACE_JOIN_FOR_TEAMS}},RDK=${{matrix.KAFKA_CONSUMPTION_USE_RDKAFKA}})
+        name: Functional tests
         needs: changes
         if: needs.changes.outputs.plugin-server == 'true'
         runs-on: ubuntu-latest
-        strategy:
-            fail-fast: false
-            matrix:
-                POE_EMBRACE_JOIN_FOR_TEAMS: ['', '*']
-
         env:
             REDIS_URL: 'redis://localhost'
             CLICKHOUSE_HOST: 'localhost'
@@ -192,7 +187,6 @@ jobs:
             KAFKA_HOSTS: 'kafka:9092'
             DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/posthog'
             RELOAD_PLUGIN_JITTER_MAX_MS: 0
-            POE_EMBRACE_JOIN_FOR_TEAMS: ${{matrix.POE_EMBRACE_JOIN_FOR_TEAMS}}
 
         steps:
            - name: Code check out
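
With the POE_EMBRACE_JOIN_FOR_TEAMS matrix axis deleted, the functional-test job runs once instead of once per flag value. The counterpart in the test file below is the removal of the env-gated Jest helper. For reference, the deleted gating followed this shape (a sketch of the old pattern, not new code in this patch):

    // Resolve to a real test or a skipped one at module load time, based on an env flag.
    const testIfPoEEmbraceJoinEnabled =
        process.env.POE_EMBRACE_JOIN_FOR_TEAMS === '*' ? test.concurrent : test.concurrent.skip

With the flag gone, the merge tests below run unconditionally as plain test.concurrent.
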
diff --git a/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts b/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts
index 98806e6aebafd..c1e45d476eb09 100644
--- a/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts
+++ b/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts
@@ -618,9 +618,7 @@ test.concurrent(`properties still $set even if merge fails`, async () => {
     })
 })
 
-const testIfPoEEmbraceJoinEnabled =
-    process.env.POE_EMBRACE_JOIN_FOR_TEAMS === '*' ? test.concurrent : test.concurrent.skip
-testIfPoEEmbraceJoinEnabled(`single merge results in all events resolving to the same person id`, async () => {
+test.concurrent(`single merge results in all events resolving to the same person id`, async () => {
     const teamId = await createTeam(organizationId)
     const initialDistinctId = new UUIDT().toString()
     const secondDistinctId = new UUIDT().toString()
@@ -680,7 +678,7 @@ testIfPoEEmbraceJoinEnabled(`single merge results in all events resolving to the
     }, 10000)
 })
 
-testIfPoEEmbraceJoinEnabled(`chained merge results in all events resolving to the same person id`, async () => {
+test.concurrent(`chained merge results in all events resolving to the same person id`, async () => {
     const teamId = await createTeam(organizationId)
     const initialDistinctId = new UUIDT().toString()
     const secondDistinctId = new UUIDT().toString()
@@ -735,76 +733,73 @@ testIfPoEEmbraceJoinEnabled(`chained merge results in all events resolving to th
     }, 20000)
 })
 
-testIfPoEEmbraceJoinEnabled(
-    `complex chained merge adds results in all events resolving to the same person id`,
-    async () => {
-        // let's assume we have 4 persons 1234, we'll first merge 1-2 & 3-4, then we'll merge 2-3
-        // this should still result in all events having the same person_id or override[person_id]
+test.concurrent(`complex chained merge adds results in all events resolving to the same person id`, async () => {
+    // let's assume we have 4 persons 1234, we'll first merge 1-2 & 3-4, then we'll merge 2-3
+    // this should still result in all events having the same person_id or override[person_id]
 
-        const teamId = await createTeam(organizationId)
-        const initialDistinctId = new UUIDT().toString()
-        const secondDistinctId = new UUIDT().toString()
-        const thirdDistinctId = new UUIDT().toString()
-        const forthDistinctId = new UUIDT().toString()
-
-        // First we emit anoymous events and wait for the persons to be created.
-        await capture({ teamId, distinctId: initialDistinctId, uuid: new UUIDT().toString(), event: 'custom event' })
-        await capture({ teamId, distinctId: secondDistinctId, uuid: new UUIDT().toString(), event: 'custom event 2' })
-        await capture({ teamId, distinctId: thirdDistinctId, uuid: new UUIDT().toString(), event: 'custom event 3' })
-        await capture({ teamId, distinctId: forthDistinctId, uuid: new UUIDT().toString(), event: 'custom event 3' })
-        await waitForExpect(async () => {
-            const persons = await fetchPersons(teamId)
-            expect(persons.length).toBe(4)
-        }, 10000)
+    const teamId = await createTeam(organizationId)
+    const initialDistinctId = new UUIDT().toString()
+    const secondDistinctId = new UUIDT().toString()
+    const thirdDistinctId = new UUIDT().toString()
+    const forthDistinctId = new UUIDT().toString()
 
-        // Then we identify 1-2 and 3-4
-        await capture({
-            teamId,
-            distinctId: initialDistinctId,
-            uuid: new UUIDT().toString(),
-            event: '$identify',
-            properties: {
-                distinct_id: initialDistinctId,
-                $anon_distinct_id: secondDistinctId,
-            },
-        })
-        await capture({
-            teamId,
-            distinctId: thirdDistinctId,
-            uuid: new UUIDT().toString(),
-            event: '$identify',
-            properties: {
-                distinct_id: thirdDistinctId,
-                $anon_distinct_id: forthDistinctId,
-            },
-        })
+    // First we emit anonymous events and wait for the persons to be created.
+    await capture({ teamId, distinctId: initialDistinctId, uuid: new UUIDT().toString(), event: 'custom event' })
+    await capture({ teamId, distinctId: secondDistinctId, uuid: new UUIDT().toString(), event: 'custom event 2' })
+    await capture({ teamId, distinctId: thirdDistinctId, uuid: new UUIDT().toString(), event: 'custom event 3' })
+    await capture({ teamId, distinctId: forthDistinctId, uuid: new UUIDT().toString(), event: 'custom event 3' })
+    await waitForExpect(async () => {
+        const persons = await fetchPersons(teamId)
+        expect(persons.length).toBe(4)
+    }, 10000)
 
-        await waitForExpect(async () => {
-            const events = await fetchEvents(teamId)
-            expect(events.length).toBe(6)
-        }, 10000)
+    // Then we identify 1-2 and 3-4
+    await capture({
+        teamId,
+        distinctId: initialDistinctId,
+        uuid: new UUIDT().toString(),
+        event: '$identify',
+        properties: {
+            distinct_id: initialDistinctId,
+            $anon_distinct_id: secondDistinctId,
+        },
+    })
+    await capture({
+        teamId,
+        distinctId: thirdDistinctId,
+        uuid: new UUIDT().toString(),
+        event: '$identify',
+        properties: {
+            distinct_id: thirdDistinctId,
+            $anon_distinct_id: forthDistinctId,
+        },
+    })
 
-        // Then we merge 2-3
-        await capture({
-            teamId,
-            distinctId: initialDistinctId,
-            uuid: new UUIDT().toString(),
-            event: '$merge_dangerously',
-            properties: {
-                distinct_id: secondDistinctId,
-                alias: thirdDistinctId,
-            },
-        })
+    await waitForExpect(async () => {
+        const events = await fetchEvents(teamId)
+        expect(events.length).toBe(6)
+    }, 10000)
 
-        await waitForExpect(async () => {
-            const events = await fetchEvents(teamId)
-            expect(events.length).toBe(7)
-            expect(events[0].person_id).toBeDefined()
-            expect(events[0].person_id).not.toBe('00000000-0000-0000-0000-000000000000')
-            expect(new Set(events.map((event) => event.person_id)).size).toBe(1)
-        }, 20000)
-    }
-)
+    // Then we merge 2-3
+    await capture({
+        teamId,
+        distinctId: initialDistinctId,
+        uuid: new UUIDT().toString(),
+        event: '$merge_dangerously',
+        properties: {
+            distinct_id: secondDistinctId,
+            alias: thirdDistinctId,
+        },
+    })
+
+    await waitForExpect(async () => {
+        const events = await fetchEvents(teamId)
+        expect(events.length).toBe(7)
+        expect(events[0].person_id).toBeDefined()
+        expect(events[0].person_id).not.toBe('00000000-0000-0000-0000-000000000000')
+        expect(new Set(events.map((event) => event.person_id)).size).toBe(1)
+    }, 20000)
+})
 
 // TODO: adjust this test to poEEmbraceJoin
 test.skip(`person properties don't see properties from descendents`, async () => {
diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts
index 4e6c01c37385f..35e728843dbb2 100644
--- a/plugin-server/functional_tests/api.ts
+++ b/plugin-server/functional_tests/api.ts
@@ -271,12 +271,14 @@ export const fetchEvents = async (teamId: number, uuid?: string) => {
         SELECT *,
                if(notEmpty(overrides.person_id), overrides.person_id, e.person_id) as person_id
         FROM events e
-            LEFT OUTER JOIN
-                (SELECT argMax(override_person_id, version) as person_id,
-                        old_person_id
-                 FROM person_overrides
+        LEFT OUTER JOIN (
+            SELECT
+                distinct_id,
+                argMax(person_id, version) as person_id
+            FROM person_distinct_id_overrides
             WHERE team_id = ${teamId}
-                 GROUP BY old_person_id) AS overrides ON e.person_id = overrides.old_person_id
+            GROUP BY distinct_id
+        ) AS overrides USING distinct_id
         WHERE team_id = ${teamId} ${uuid ? `AND uuid = '${uuid}'` : ``}
         ORDER BY timestamp ASC
     `)) as unknown as ClickHouse.ObjectQueryResult
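
fetchEvents now resolves an event's person id the way the newer override model stores it: keyed by distinct_id in person_distinct_id_overrides, rather than by the superseded person id in the old person_overrides table. In isolation, the override lookup reduces to roughly the following (a sketch; the literal 2 stands in for the interpolated ${teamId}):

    SELECT
        distinct_id,
        argMax(person_id, version) AS person_id
    FROM person_distinct_id_overrides
    WHERE team_id = 2
    GROUP BY distinct_id

argMax(person_id, version) keeps the highest-version override per distinct id; joined back with USING distinct_id, an event keeps its own e.person_id unless such an override row exists (hence the notEmpty(...) guard in the query above).
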
diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts
index 4d971e4709cf7..8f45d91d9cc75 100644
--- a/plugin-server/src/capabilities.ts
+++ b/plugin-server/src/capabilities.ts
@@ -20,7 +20,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
             processAsyncWebhooksHandlers: true,
             sessionRecordingBlobIngestion: true,
             sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
-            personOverrides: true,
             appManagementSingleton: true,
             preflightSchedules: true,
             cdpProcessedEvents: true,
@@ -85,12 +84,6 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
                 appManagementSingleton: true,
                 ...sharedCapabilities,
             }
-        case PluginServerMode.person_overrides:
-            return {
-                personOverrides: true,
-                ...sharedCapabilities,
-            }
-
         case PluginServerMode.cdp_processed_events:
             return {
                 cdpProcessedEvents: true,
diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index f3d1a47eaac29..db1833552a1b8 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -128,9 +128,6 @@ export function getDefaultConfig(): PluginsServerConfig {
         EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
         DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
         DROP_EVENTS_BY_TOKEN: '',
-        POE_EMBRACE_JOIN_FOR_TEAMS: '',
-        POE_WRITES_ENABLED_MAX_TEAM_ID: 0,
-        POE_WRITES_EXCLUDE_TEAMS: '',
         PIPELINE_STEP_STALLED_LOG_TIMEOUT: 30,
         RELOAD_PLUGIN_JITTER_MAX_MS: 60000,
         RUSTY_HOOK_FOR_TEAMS: '',
diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts
index 4865693e81112..83ca0f07ea424 100644
--- a/plugin-server/src/config/kafka-topics.ts
+++ b/plugin-server/src/config/kafka-topics.ts
@@ -25,7 +25,6 @@ export const KAFKA_JOBS_DLQ = `${prefix}jobs_dlq${suffix}`
 export const KAFKA_SCHEDULED_TASKS = `${prefix}scheduled_tasks${suffix}`
 export const KAFKA_SCHEDULED_TASKS_DLQ = `${prefix}scheduled_tasks_dlq${suffix}`
 export const KAFKA_METRICS_TIME_TO_SEE_DATA = `${prefix}clickhouse_metrics_time_to_see_data${suffix}`
-export const KAFKA_PERSON_OVERRIDE = `${prefix}clickhouse_person_override${suffix}`
 // read session recording snapshot items
 export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = `${prefix}session_recording_snapshot_item_events${suffix}`
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index 8921aa4b2e094..54ddfaff521ba 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -17,7 +17,6 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
 import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
 import { PostgresRouter } from '../utils/db/postgres'
 import { cancelAllScheduledJobs } from '../utils/node-schedule'
-import { PeriodicTask } from '../utils/periodic-task'
 import { PubSub } from '../utils/pubsub'
 import { status } from '../utils/status'
 import { createRedisClient, delay } from '../utils/utils'
@@ -26,7 +25,6 @@ import { ActionMatcher } from '../worker/ingestion/action-matcher'
 import { AppMetrics } from '../worker/ingestion/app-metrics'
 import { GroupTypeManager } from '../worker/ingestion/group-type-manager'
 import { OrganizationManager } from '../worker/ingestion/organization-manager'
-import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state'
 import { TeamManager } from '../worker/ingestion/team-manager'
 import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
 import { RustyHook } from '../worker/rusty-hook'
@@ -119,8 +117,6 @@ export async function startPluginsServer(
     let jobsConsumer: Consumer | undefined
     let schedulerTasksConsumer: Consumer | undefined
 
-    let personOverridesPeriodicTask: PeriodicTask | undefined
-
     let httpServer: Server | undefined // server
 
     let graphileWorker: GraphileWorker | undefined
@@ -160,7 +156,6 @@ export async function startPluginsServer(
             stopSessionRecordingBlobConsumer?.(),
             stopSessionRecordingBlobOverflowConsumer?.(),
             schedulerTasksConsumer?.disconnect(),
-            personOverridesPeriodicTask?.stop(),
             ...shutdownCallbacks.map((cb) => cb()),
         ])
 
@@ -529,22 +524,6 @@ export async function startPluginsServer(
             healthChecks['cdp-overflow'] = () => consumer.isHealthy() ?? false
         }
 
-        if (capabilities.personOverrides) {
-            const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
-            const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
-
-            personOverridesPeriodicTask = new DeferredPersonOverrideWorker(
-                postgres,
-                kafkaProducer,
-                new FlatPersonOverrideWriter(postgres)
-            ).runTask(5000)
-            personOverridesPeriodicTask.promise.catch(async () => {
-                status.error('⚠️', 'Person override worker task crashed! 
Requesting shutdown...') - await closeJobs() - process.exit(1) - }) - } - if (capabilities.http) { const app = setupCommonRoutes(healthChecks, analyticsEventsIngestionConsumer) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 6eb7b4bb102e5..0de06ba25262b 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -81,7 +81,6 @@ export enum PluginServerMode { analytics_ingestion = 'analytics-ingestion', recordings_blob_ingestion = 'recordings-blob-ingestion', recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow', - person_overrides = 'person-overrides', cdp_processed_events = 'cdp-processed-events', cdp_function_callbacks = 'cdp-function-callbacks', cdp_function_overflow = 'cdp-function-overflow', @@ -216,9 +215,6 @@ export interface PluginsServerConfig extends CdpConfig { EXTERNAL_REQUEST_TIMEOUT_MS: number DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string DROP_EVENTS_BY_TOKEN: string - POE_EMBRACE_JOIN_FOR_TEAMS: string - POE_WRITES_ENABLED_MAX_TEAM_ID: number - POE_WRITES_EXCLUDE_TEAMS: string RELOAD_PLUGIN_JITTER_MAX_MS: number RUSTY_HOOK_FOR_TEAMS: string RUSTY_HOOK_ROLLOUT_PERCENTAGE: number @@ -316,8 +312,6 @@ export interface Hub extends PluginsServerConfig { enqueuePluginJob: (job: EnqueuedPluginJob) => Promise // ValueMatchers used for various opt-in/out features pluginConfigsToSkipElementsParsing: ValueMatcher - poeEmbraceJoinForTeams: ValueMatcher - poeWritesExcludeTeams: ValueMatcher // lookups eventsToDropByToken: Map } @@ -337,7 +331,6 @@ export interface PluginServerCapabilities { cdpProcessedEvents?: boolean cdpFunctionCallbacks?: boolean cdpFunctionOverflow?: boolean - personOverrides?: boolean appManagementSingleton?: boolean preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud http?: boolean diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index d0084c7087cd7..69ad05296abf0 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -205,8 +205,6 @@ export async function createHub( actionManager, conversionBufferEnabledTeams, pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true), - poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true), - poeWritesExcludeTeams: buildIntegerMatcher(process.env.POE_WRITES_EXCLUDE_TEAMS, false), eventsToDropByToken: createEventsToDropByToken(process.env.DROP_EVENTS_BY_TOKEN_DISTINCT_ID), } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts index 377981fe64b09..28f6961e36940 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts @@ -2,7 +2,7 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' import { Person } from 'types' -import { DeferredPersonOverrideWriter, PersonState } from '../person-state' +import { PersonState } from '../person-state' import { EventPipelineRunner } from './runner' export async function processPersonsStep( @@ -11,19 +11,13 @@ export async function processPersonsStep( timestamp: DateTime, processPerson: boolean ): Promise<[PluginEvent, Person, Promise]> { - let overridesWriter: DeferredPersonOverrideWriter | undefined = undefined - if (runner.poEEmbraceJoin) { - overridesWriter = new 
DeferredPersonOverrideWriter(runner.hub.db.postgres) - } - const [person, kafkaAck] = await new PersonState( event, event.team_id, String(event.distinct_id), timestamp, processPerson, - runner.hub.db, - overridesWriter + runner.hub.db ).update() return [event, person, kafkaAck] diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 5861aff1e2fb3..0518902516410 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -51,12 +51,8 @@ export class EventPipelineRunner { hub: Hub originalEvent: PipelineEvent - // See https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0 - poEEmbraceJoin: boolean - - constructor(hub: Hub, event: PipelineEvent, poEEmbraceJoin = false) { + constructor(hub: Hub, event: PipelineEvent) { this.hub = hub - this.poEEmbraceJoin = poEEmbraceJoin this.originalEvent = event } @@ -140,19 +136,6 @@ export class EventPipelineRunner { } async runEventPipelineSteps(event: PluginEvent): Promise { - if ( - this.hub.poeEmbraceJoinForTeams?.(event.team_id) || - (event.team_id <= this.hub.POE_WRITES_ENABLED_MAX_TEAM_ID && !this.hub.poeWritesExcludeTeams(event.team_id)) - ) { - // https://docs.google.com/document/d/12Q1KcJ41TicIwySCfNJV5ZPKXWVtxT7pzpB3r9ivz_0 - // We're not using the buffer anymore - // instead we'll (if within timeframe) merge into the newer personId - - // TODO: remove this step and runner env once we're confident that the new - // ingestion pipeline is working well for all teams. - this.poEEmbraceJoin = true - } - const kafkaAcks: Promise[] = [] let processPerson = true // The default. diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 59f7c9699378c..2a4aa85592547 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -4,18 +4,14 @@ import { ProducerRecord } from 'kafkajs' import LRU from 'lru-cache' import { DateTime } from 'luxon' import { Counter } from 'prom-client' -import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' import { ONE_HOUR } from '../../config/constants' -import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics' -import { InternalPerson, Person, PropertyUpdateOperation, TimestampFormat } from '../../types' +import { InternalPerson, Person, PropertyUpdateOperation } from '../../types' import { DB } from '../../utils/db/db' -import { PostgresRouter, PostgresUse, TransactionClient } from '../../utils/db/postgres' +import { PostgresUse, TransactionClient } from '../../utils/db/postgres' import { eventToPersonProperties, initialEventToPersonProperties, timeoutGuard } from '../../utils/db/utils' -import { PeriodicTask } from '../../utils/periodic-task' import { promiseRetry } from '../../utils/retries' import { status } from '../../utils/status' -import { castTimestampOrNow } from '../../utils/utils' import { uuidFromDistinctId } from './person-uuid' import { captureIngestionWarning } from './utils' @@ -27,13 +23,13 @@ export const mergeFinalFailuresCounter = new Counter({ export const mergeTxnAttemptCounter = new Counter({ name: 'person_merge_txn_attempt_total', help: 'Number of person merge attempts.', - labelNames: ['call', 'oldPersonIdentified', 'newPersonIdentified', 'poEEmbraceJoin'], + labelNames: ['call', 'oldPersonIdentified', 'newPersonIdentified'], }) export const 
mergeTxnSuccessCounter = new Counter({ name: 'person_merge_txn_success_total', help: 'Number of person merges that succeeded.', - labelNames: ['call', 'oldPersonIdentified', 'newPersonIdentified', 'poEEmbraceJoin'], + labelNames: ['call', 'oldPersonIdentified', 'newPersonIdentified'], }) export const personPropertyKeyUpdateCounter = new Counter({ @@ -113,8 +109,7 @@ export class PersonState { private distinctId: string, private timestamp: DateTime, private processPerson: boolean, // $process_person_profile flag from the event - private db: DB, - private personOverrideWriter?: DeferredPersonOverrideWriter + private db: DB ) { this.eventProperties = event.properties! @@ -714,7 +709,6 @@ export class PersonState { call: this.event.event, // $identify, $create_alias or $merge_dangerously oldPersonIdentified: String(otherPerson.is_identified), newPersonIdentified: String(mergeInto.is_identified), - poEEmbraceJoin: String(!!this.personOverrideWriter), }) .inc() @@ -762,13 +756,6 @@ export class PersonState { const deletePersonMessages = await this.db.deletePerson(otherPerson, tx) - if (this.personOverrideWriter) { - await this.personOverrideWriter.addPersonOverride( - tx, - getPersonOverrideDetails(this.teamId, otherPerson, mergeInto) - ) - } - return [person, [...updatePersonMessages, ...distinctIdMessages, ...deletePersonMessages]] } ) @@ -778,7 +765,6 @@ export class PersonState { call: this.event.event, // $identify, $create_alias or $merge_dangerously oldPersonIdentified: String(otherPerson.is_identified), newPersonIdentified: String(mergeInto.is_identified), - poEEmbraceJoin: String(!!this.personOverrideWriter), }) .inc() @@ -787,283 +773,3 @@ export class PersonState { return [mergedPerson, kafkaAck] } } - -/** - * A record of a merge operation occurring. - * - * These property names need to be kept in sync with the ``PersonOverride`` - * Django model (and ``posthog_personoverride`` table schema) as defined in - * ``posthog/models/person/person.py``. 
- */ -type PersonOverrideDetails = { - team_id: number - old_person_id: string - override_person_id: string - oldest_event: DateTime -} - -function getPersonOverrideDetails( - teamId: number, - oldPerson: InternalPerson, - overridePerson: InternalPerson -): PersonOverrideDetails { - if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { - throw new Error('cannot merge persons across different teams') - } - return { - team_id: teamId, - old_person_id: oldPerson.uuid, - override_person_id: overridePerson.uuid, - oldest_event: overridePerson.created_at, - } -} - -export class FlatPersonOverrideWriter { - constructor(private postgres: PostgresRouter) {} - - public async addPersonOverride( - tx: TransactionClient, - overrideDetails: PersonOverrideDetails - ): Promise { - const mergedAt = DateTime.now() - - await this.postgres.query( - tx, - SQL` - INSERT INTO posthog_flatpersonoverride ( - team_id, - old_person_id, - override_person_id, - oldest_event, - version - ) VALUES ( - ${overrideDetails.team_id}, - ${overrideDetails.old_person_id}, - ${overrideDetails.override_person_id}, - ${overrideDetails.oldest_event}, - 0 - ) - `, - undefined, - 'personOverride' - ) - - const { rows: transitiveUpdates } = await this.postgres.query( - tx, - SQL` - UPDATE - posthog_flatpersonoverride - SET - override_person_id = ${overrideDetails.override_person_id}, - version = COALESCE(version, 0)::numeric + 1 - WHERE - team_id = ${overrideDetails.team_id} AND override_person_id = ${overrideDetails.old_person_id} - RETURNING - old_person_id, - version, - oldest_event - `, - undefined, - 'transitivePersonOverrides' - ) - - status.debug('🔁', 'person_overrides_updated', { transitiveUpdates }) - - const personOverrideMessages: ProducerRecord[] = [ - { - topic: KAFKA_PERSON_OVERRIDE, - messages: [ - { - value: JSON.stringify({ - team_id: overrideDetails.team_id, - old_person_id: overrideDetails.old_person_id, - override_person_id: overrideDetails.override_person_id, - oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse), - merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - version: 0, - }), - }, - ...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ - value: JSON.stringify({ - team_id: overrideDetails.team_id, - old_person_id: old_person_id, - override_person_id: overrideDetails.override_person_id, - oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), - merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), - version: version, - }), - })), - ], - }, - ] - - return personOverrideMessages - } - - public async getPersonOverrides(teamId: number): Promise { - const { rows } = await this.postgres.query( - PostgresUse.COMMON_WRITE, - SQL` - SELECT - team_id, - old_person_id, - override_person_id, - oldest_event - FROM posthog_flatpersonoverride - WHERE team_id = ${teamId} - `, - undefined, - 'getPersonOverrides' - ) - return rows.map((row) => ({ - ...row, - team_id: parseInt(row.team_id), // XXX: pg returns bigint as str (reasonably so) - oldest_event: DateTime.fromISO(row.oldest_event), - })) - } -} - -const deferredPersonOverridesWrittenCounter = new Counter({ - name: 'deferred_person_overrides_written', - help: 'Number of person overrides that have been written as pending', -}) -export class DeferredPersonOverrideWriter { - constructor(private postgres: PostgresRouter) {} - - /** - * Enqueue an override for deferred processing. 
- */ - public async addPersonOverride(tx: TransactionClient, overrideDetails: PersonOverrideDetails): Promise { - await this.postgres.query( - tx, - SQL` - INSERT INTO posthog_pendingpersonoverride ( - team_id, - old_person_id, - override_person_id, - oldest_event - ) VALUES ( - ${overrideDetails.team_id}, - ${overrideDetails.old_person_id}, - ${overrideDetails.override_person_id}, - ${overrideDetails.oldest_event} - )`, - undefined, - 'pendingPersonOverride' - ) - deferredPersonOverridesWrittenCounter.inc() - } -} - -const deferredPersonOverridesProcessedCounter = new Counter({ - name: 'deferred_person_overrides_processed', - help: 'Number of pending person overrides that have been successfully processed', -}) - -export class DeferredPersonOverrideWorker { - // This lock ID is used as an advisory lock identifier/key for a lock that - // ensures only one worker is able to update the overrides table at a time. - // (We do this to make it simpler to ensure that we maintain the consistency - // of transitive updates.) There isn't any special significance to this - // particular value (other than Postgres requires it to be a numeric one), - // it just needs to be consistent across all processes. - public readonly lockId = 567 - - constructor( - private postgres: PostgresRouter, - private kafkaProducer: KafkaProducerWrapper, - private writer: FlatPersonOverrideWriter - ) {} - - /** - * Process all (or up to the given limit) pending overrides. - * - * An advisory lock is acquired prior to processing to ensure that this - * function has exclusive access to the pending overrides during the update - * process. - * - * @returns the number of overrides processed - */ - public async processPendingOverrides(limit?: number): Promise { - const overridesCount = await this.postgres.transaction( - PostgresUse.COMMON_WRITE, - 'processPendingOverrides', - async (tx) => { - const { - rows: [{ acquired }], - } = await this.postgres.query( - tx, - SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`, - undefined, - 'processPendingOverrides' - ) - if (!acquired) { - throw new Error('could not acquire lock') - } - - // n.b.: Ordering by id ensures we are processing in (roughly) FIFO order - const { rows } = await this.postgres.query( - tx, - `SELECT * FROM posthog_pendingpersonoverride ORDER BY id` + - (limit !== undefined ? ` LIMIT ${limit}` : ''), - undefined, - 'processPendingOverrides' - ) - - const messages: ProducerRecord[] = [] - for (const { id, ...mergeOperation } of rows) { - messages.push(...(await this.writer.addPersonOverride(tx, mergeOperation))) - await this.postgres.query( - tx, - SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`, - undefined, - 'processPendingOverrides' - ) - } - - // n.b.: We publish the messages here (and wait for acks) to ensure - // that all of our override updates are sent to Kafka prior to - // committing the transaction. If we're unable to publish, we should - // discard updates and try again later when it's available -- not - // doing so would cause the copy of this data in ClickHouse to - // slowly drift out of sync with the copy in Postgres. This write is - // safe to retry if we write to Kafka but then fail to commit to - // Postgres for some reason -- the same row state should be - // generated each call, and the receiving ReplacingMergeTree will - // ensure we keep only the latest version after all writes settle.) 
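//
// In plainer terms: the acks below are awaited while the surrounding Postgres transaction
// is still open, so a Kafka failure throws, rolls back the pending-override deletes above,
// and the whole batch is retried later. A minimal sketch of that ordering (hypothetical
// standalone helper; same `queueMessages` signature as the call below):
//
//     async function publishBeforeCommit(producer: KafkaProducerWrapper, messages: ProducerRecord[]) {
//         // Throwing here aborts the enclosing postgres.transaction(...) callback.
//         await producer.queueMessages({ kafkaMessages: messages, waitForAck: true })
//         // Returning lets the caller commit; a retry regenerates the same row state and
//         // versions, and ClickHouse's ReplacingMergeTree keeps only the highest version.
//     }
//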
- await this.kafkaProducer.queueMessages({ kafkaMessages: messages, waitForAck: true }) - - return rows.length - } - ) - - deferredPersonOverridesProcessedCounter.inc(overridesCount) - - return overridesCount - } - - public runTask(intervalMs: number): PeriodicTask { - return new PeriodicTask( - 'processPendingOverrides', - async () => { - status.debug('👥', 'Processing pending overrides...') - const overridesCount = await this.processPendingOverrides(5000) - ;(overridesCount > 0 ? status.info : status.debug)( - '👥', - `Processed ${overridesCount} pending overrides.` - ) - }, - intervalMs - ) - } -} - -function SQL(sqlParts: TemplateStringsArray, ...args: any[]): { text: string; values: any[] } { - // Generates a node-pq compatible query object given a tagged - // template literal. The intention is to remove the need to match up - // the positional arguments with the $1, $2, etc. placeholders in - // the query string. - const text = sqlParts.reduce((acc, part, i) => acc + '$' + i + part) - const values = args - return { text, values } -} diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts index 771a6900edc05..ea3a592f79ed4 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts @@ -8,7 +8,7 @@ import { normalizeEventStep } from '../../../../src/worker/ingestion/event-pipel import { processPersonsStep } from '../../../../src/worker/ingestion/event-pipeline/processPersonsStep' import { createOrganization, createTeam, fetchPostgresPersons, resetTestDatabase } from '../../../helpers/sql' -describe.each([[true], [false]])('processPersonsStep()', (poEEmbraceJoin) => { +describe('processPersonsStep()', () => { let runner: any let hub: Hub let closeHub: () => Promise @@ -24,7 +24,6 @@ describe.each([[true], [false]])('processPersonsStep()', (poEEmbraceJoin) => { runner = { nextStep: (...args: any[]) => args, hub: hub, - poEEmbraceJoin: poEEmbraceJoin, } const organizationId = await createOrganization(runner.hub.db.postgres) teamId = await createTeam(runner.hub.db.postgres, organizationId) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index ee46483405c50..7405f6799dab4 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -1,22 +1,15 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' -import { waitForExpect } from '../../../functional_tests/expectations' import { Database, Hub, InternalPerson } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { defaultRetryConfig } from '../../../src/utils/retries' import { UUIDT } from '../../../src/utils/utils' -import { - DeferredPersonOverrideWorker, - DeferredPersonOverrideWriter, - FlatPersonOverrideWriter, - PersonState, -} from '../../../src/worker/ingestion/person-state' +import { PersonState } from '../../../src/worker/ingestion/person-state' import { uuidFromDistinctId } from '../../../src/worker/ingestion/person-uuid' import { delayUntilEventIngested } from '../../helpers/clickhouse' -import { WaitEvent } from '../../helpers/promises' import { 
createOrganization, createTeam, fetchPostgresPersons, insertRow } from '../../helpers/sql' jest.setTimeout(5000) // 5 sec timeout @@ -25,43 +18,11 @@ const timestamp = DateTime.fromISO('2020-01-01T12:00:05.200Z').toUTC() const timestamp2 = DateTime.fromISO('2020-02-02T12:00:05.200Z').toUTC() const timestampch = '2020-01-01 12:00:05.000' -interface PersonOverridesMode { - supportsSyncTransaction: boolean - getWriter(hub: Hub): DeferredPersonOverrideWriter - fetchPostgresPersonIdOverrides( - hub: Hub, - teamId: number - ): Promise> -} - -const PersonOverridesModes: Record = { - disabled: undefined, - 'deferred, without mappings (flat)': { - supportsSyncTransaction: false, - getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), - fetchPostgresPersonIdOverrides: async (hub, teamId) => { - const syncWriter = new FlatPersonOverrideWriter(hub.db.postgres) - await new DeferredPersonOverrideWorker( - hub.db.postgres, - hub.db.kafkaProducer, - syncWriter - ).processPendingOverrides() - return new Set( - (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => ({ - old_person_id, - override_person_id, - })) - ) - }, - }, -} - describe('PersonState.update()', () => { let hub: Hub let closeHub: () => Promise let teamId: number - let overridesMode: PersonOverridesMode | undefined let organizationId: string // Common Distinct IDs (and their deterministic UUIDs) used in tests below. @@ -82,8 +43,6 @@ describe('PersonState.update()', () => { }) beforeEach(async () => { - overridesMode = undefined - teamId = await createTeam(hub.db.postgres, organizationId) newUserUuid = uuidFromDistinctId(teamId, newUserDistinctId) @@ -125,8 +84,7 @@ describe('PersonState.update()', () => { event.distinct_id!, timestampParam, processPerson, - customHub ? customHub.db : hub.db, - overridesMode?.getWriter(customHub ?? hub) + customHub ? customHub.db : hub.db ) } @@ -952,535 +910,529 @@ describe('PersonState.update()', () => { }) }) - describe.each(Object.keys(PersonOverridesModes))('on $identify event', (useOverridesMode) => { - beforeEach(() => { - overridesMode = PersonOverridesModes[useOverridesMode] // n.b. 
mutating outer scope here -- be careful + describe('on $identify event', () => { + it(`no-op when $anon_distinct_id not passed`, async () => { + const [person, kafkaAcks] = await personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $set: { foo: 'bar' }, + }, + }).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks + + expect(person).toEqual(undefined) + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(0) }) - describe(`overrides: ${useOverridesMode}`, () => { - it(`no-op when $anon_distinct_id not passed`, async () => { - const [person, kafkaAcks] = await personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $set: { foo: 'bar' }, - }, - }).handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + it(`creates person with both distinct_ids and marks user as is_identified when $anon_distinct_id passed`, async () => { + const [person, kafkaAcks] = await personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $set: { foo: 'bar' }, + $anon_distinct_id: oldUserDistinctId, + }, + }).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks - expect(person).toEqual(undefined) - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(0) - }) + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: newUserUuid, + properties: { foo: 'bar' }, + created_at: timestamp, + version: 0, + is_identified: true, + }) + ) - it(`creates person with both distinct_ids and marks user as is_identified when $anon_distinct_id passed`, async () => { - const [person, kafkaAcks] = await personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $set: { foo: 'bar' }, - $anon_distinct_id: oldUserDistinctId, - }, - }).handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + expect(hub.db.updatePersonDeprecated).not.toHaveBeenCalled() - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: newUserUuid, - properties: { foo: 'bar' }, - created_at: timestamp, - version: 0, - is_identified: true, - }) - ) + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) - expect(hub.db.updatePersonDeprecated).not.toHaveBeenCalled() + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + }) - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) + it(`marks is_identified to be updated when no changes to distinct_ids but $anon_distinct_id passe`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + { distinctId: oldUserDistinctId }, + ]) - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + const personS = personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $anon_distinct_id: oldUserDistinctId, + }, }) + const [person, kafkaAcks] = await personS.handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await 
kafkaAcks - it(`marks is_identified to be updated when no changes to distinct_ids but $anon_distinct_id passe`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - { distinctId: oldUserDistinctId }, - ]) - - const personS = personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $anon_distinct_id: oldUserDistinctId, - }, + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: newUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: false, }) - const [person, kafkaAcks] = await personS.handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + ) + expect(personS.updateIsIdentified).toBeTruthy() - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: newUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }) - ) - expect(personS.updateIsIdentified).toBeTruthy() + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + }) - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) - }) + it(`add distinct id and marks user is_identified when passed $anon_distinct_id person does not exists and distinct_id does`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) - it(`add distinct id and marks user is_identified when passed $anon_distinct_id person does not exists and distinct_id does`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + const personS = personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $anon_distinct_id: oldUserDistinctId, + }, + }) + const [person, kafkaAcks] = await personS.handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks - const personS = personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $anon_distinct_id: oldUserDistinctId, - }, + const persons = await fetchPostgresPersonsH() + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: newUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: false, }) - const [person, kafkaAcks] = await personS.handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + ) + expect(personS.updateIsIdentified).toBeTruthy() - const persons = await fetchPostgresPersonsH() - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: newUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }) - ) - expect(personS.updateIsIdentified).toBeTruthy() + // verify Postgres persons + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + }) - // verify Postgres persons - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) + it(`add distinct id and marks user as is_identified when passed $anon_distinct_id person exists and distinct_id does not`, 
async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + const personS = personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $anon_distinct_id: oldUserDistinctId, + }, }) + const [person, kafkaAcks] = await personS.handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks - it(`add distinct id and marks user as is_identified when passed $anon_distinct_id person exists and distinct_id does not`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) + const persons = await fetchPostgresPersonsH() - const personS = personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $anon_distinct_id: oldUserDistinctId, - }, + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: oldUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: false, }) - const [person, kafkaAcks] = await personS.handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks - - const persons = await fetchPostgresPersonsH() - - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: oldUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }) - ) - expect(personS.updateIsIdentified).toBeTruthy() - - // verify Postgres persons - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - }) + ) + expect(personS.updateIsIdentified).toBeTruthy() - it(`merge into distinct_id person and marks user as is_identified when both persons have is_identified false`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + // verify Postgres persons + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) - const [person, kafkaAcks] = await personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $anon_distinct_id: oldUserDistinctId, - }, - }).handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + }) - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: expect.any(String), - properties: {}, - created_at: timestamp, - version: 1, - is_identified: true, - }) - ) + it(`merge into distinct_id person and marks user as is_identified when both persons have is_identified false`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: 
newUserDistinctId }, + ]) - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) - expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - - // verify ClickHouse persons - await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed - const clickhousePersons = await fetchPersonsRows() // but verify full state - expect(clickhousePersons.length).toEqual(2) - expect(clickhousePersons).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: expect.any(String), - properties: '{}', - created_at: timestampch, - version: 1, - is_identified: 1, - }), - expect.objectContaining({ - id: expect.any(String), - is_deleted: 1, - version: 100, - }), - ]) - ) - expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) + const [person, kafkaAcks] = await personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $anon_distinct_id: oldUserDistinctId, + }, + }).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks - // verify ClickHouse distinct_ids - await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) - const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - }) + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: expect.any(String), + properties: {}, + created_at: timestamp, + version: 1, + is_identified: true, + }) + ) - it(`merge into distinct_id person and marks user as is_identified when distinct_id user is identified and $anon_distinct_id user is not`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) - const [person, kafkaAcks] = await personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $anon_distinct_id: oldUserDistinctId, - }, - }).handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - expect(person).toEqual( + // verify ClickHouse persons + await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed + const clickhousePersons = await fetchPersonsRows() // but verify full state + expect(clickhousePersons.length).toEqual(2) + expect(clickhousePersons).toEqual( + expect.arrayContaining([ expect.objectContaining({ - id: expect.any(Number), - uuid: expect.any(String), - properties: {}, - created_at: timestamp, + id: expect.any(String), + properties: '{}', + created_at: timestampch, version: 1, - is_identified: true, - 
}) - ) + is_identified: 1, + }), + expect.objectContaining({ + id: expect.any(String), + is_deleted: 1, + version: 100, + }), + ]) + ) + expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) - expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - - // verify ClickHouse persons - await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed - const clickhousePersons = await fetchPersonsRows() // but verify full state - expect(clickhousePersons.length).toEqual(2) - expect(clickhousePersons).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: expect.any(String), - properties: '{}', - created_at: timestampch, - version: 1, - is_identified: 1, - }), - expect.objectContaining({ - id: expect.any(String), - is_deleted: 1, - version: 100, - }), - ]) - ) - expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) + // verify ClickHouse distinct_ids + await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) + const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + }) - // verify ClickHouse distinct_ids - await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) - const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - }) + it(`merge into distinct_id person and marks user as is_identified when distinct_id user is identified and $anon_distinct_id user is not`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) - it(`does not merge people when distinct_id user is not identified and $anon_distinct_id user is`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + const [person, kafkaAcks] = await personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $anon_distinct_id: oldUserDistinctId, + }, + }).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks - const personS = personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $anon_distinct_id: oldUserDistinctId, - }, + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: expect.any(String), + properties: {}, + created_at: timestamp, + version: 1, + is_identified: true, }) - const [person, kafkaAcks] = await personS.handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + ) - expect(personS.updateIsIdentified).toBeTruthy() - expect(person).toEqual( - expect.objectContaining({ - id: 
expect.any(Number), - uuid: newUserUuid, - properties: {}, - created_at: timestamp2, - version: 0, - is_identified: false, - }) - ) + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) + + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - // verify Postgres persons - const persons = (await fetchPostgresPersonsH()).sort((a, b) => a.id - b.id) - expect(persons.length).toEqual(2) - expect(persons[0]).toEqual( + // verify ClickHouse persons + await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed + const clickhousePersons = await fetchPersonsRows() // but verify full state + expect(clickhousePersons.length).toEqual(2) + expect(clickhousePersons).toEqual( + expect.arrayContaining([ expect.objectContaining({ - id: expect.any(Number), - uuid: oldUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: true, - }) - ) - expect(persons[1]).toEqual(person) + id: expect.any(String), + properties: '{}', + created_at: timestampch, + version: 1, + is_identified: 1, + }), + expect.objectContaining({ + id: expect.any(String), + is_deleted: 1, + version: 100, + }), + ]) + ) + expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId])) - const distinctIds2 = await hub.db.fetchDistinctIdValues(persons[1]) - expect(distinctIds2).toEqual(expect.arrayContaining([newUserDistinctId])) - }) + // verify ClickHouse distinct_ids + await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) + const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + }) - it(`does not merge people when both users are identified`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + it(`does not merge people when distinct_id user is not identified and $anon_distinct_id user is`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) - const [person, kafkaAcks] = await personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $anon_distinct_id: oldUserDistinctId, - }, - }).handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + const personS = personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $anon_distinct_id: oldUserDistinctId, + }, + }) + const [person, kafkaAcks] = await personS.handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: newUserUuid, - properties: {}, 
- created_at: timestamp2, - version: 0, - is_identified: true, - }) - ) + expect(personS.updateIsIdentified).toBeTruthy() + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: newUserUuid, + properties: {}, + created_at: timestamp2, + version: 0, + is_identified: false, + }) + ) - // verify Postgres persons - const persons = (await fetchPostgresPersonsH()).sort((a, b) => a.id - b.id) - expect(persons.length).toEqual(2) - expect(persons[0]).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: oldUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: true, - }) - ) - expect(persons[1]).toEqual(person) + // verify Postgres persons + const persons = (await fetchPostgresPersonsH()).sort((a, b) => a.id - b.id) + expect(persons.length).toEqual(2) + expect(persons[0]).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: oldUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: true, + }) + ) + expect(persons[1]).toEqual(person) - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId])) - const distinctIds2 = await hub.db.fetchDistinctIdValues(persons[1]) - expect(distinctIds2).toEqual(expect.arrayContaining([newUserDistinctId])) - }) + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId])) + const distinctIds2 = await hub.db.fetchDistinctIdValues(persons[1]) + expect(distinctIds2).toEqual(expect.arrayContaining([newUserDistinctId])) + }) - it(`merge into distinct_id person and updates properties with $set/$set_once`, async () => { - await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, { b: 3, c: 4, d: 5 }, {}, {}, teamId, null, false, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) + it(`does not merge people when both users are identified`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) - const [person, kafkaAcks] = await personState({ - event: '$identify', - distinct_id: newUserDistinctId, - properties: { - $set: { d: 6, e: 7 }, - $set_once: { a: 8, f: 9 }, - $anon_distinct_id: oldUserDistinctId, - }, - }).handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks + const [person, kafkaAcks] = await personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $anon_distinct_id: oldUserDistinctId, + }, + }).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: expect.any(String), - properties: { a: 1, b: 3, c: 4, d: 6, e: 7, f: 9 }, - created_at: timestamp, - version: 1, - is_identified: true, - }) - ) + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: newUserUuid, + properties: {}, + created_at: timestamp2, + version: 0, + is_identified: true, + }) + ) - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - 
expect(persons[0]).toEqual(person) - expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - - // verify ClickHouse persons - await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed - const clickhousePersons = await fetchPersonsRows() // but verify full state - expect(clickhousePersons.length).toEqual(2) - expect(clickhousePersons).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: expect.any(String), - properties: JSON.stringify({ a: 1, b: 3, c: 4, d: 6, e: 7, f: 9 }), - created_at: timestampch, - version: 1, - is_identified: 1, - }), - expect.objectContaining({ - id: expect.any(String), - is_deleted: 1, - version: 100, - }), - ]) - ) - expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) + // verify Postgres persons + const persons = (await fetchPostgresPersonsH()).sort((a, b) => a.id - b.id) + expect(persons.length).toEqual(2) + expect(persons[0]).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: oldUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: true, + }) + ) + expect(persons[1]).toEqual(person) - // verify ClickHouse distinct_ids - await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) - const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - }) + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId])) + const distinctIds2 = await hub.db.fetchDistinctIdValues(persons[1]) + expect(distinctIds2).toEqual(expect.arrayContaining([newUserDistinctId])) + }) - it(`handles race condition when other thread creates the user`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) + it(`merge into distinct_id person and updates properties with $set/$set_once`, async () => { + await hub.db.createPerson(timestamp, { a: 1, b: 2 }, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, { b: 3, c: 4, d: 5 }, {}, {}, teamId, null, false, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) - // Fake the race by assuming createPerson was called before the addDistinctId creation above - jest.spyOn(hub.db, 'addDistinctId').mockImplementation(async (person, distinctId) => { - await hub.db.createPerson( - timestamp, - {}, - {}, - {}, - teamId, - null, - false, - uuidFromDistinctId(teamId, distinctId), - [{ distinctId }] - ) - await hub.db.addDistinctId(person, distinctId, 0) // this throws + const [person, kafkaAcks] = await personState({ + event: '$identify', + distinct_id: newUserDistinctId, + properties: { + $set: { d: 6, e: 7 }, + $set_once: { a: 8, f: 9 }, + $anon_distinct_id: oldUserDistinctId, + }, + }).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks + + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: expect.any(String), + properties: { a: 1, b: 3, c: 4, d: 6, e: 7, f: 9 }, + created_at: timestamp, + version: 1, + 
is_identified: true, }) + ) - const [person, kafkaAcks] = await personState({ - event: '$identify', - distinct_id: oldUserDistinctId, - properties: { - $anon_distinct_id: newUserDistinctId, - }, - }).handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks - jest.spyOn(hub.db, 'addDistinctId').mockRestore() // Necessary for other tests not to fail + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) - // if creation fails we should return the person that another thread already created - expect(person).toEqual( + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + + // verify ClickHouse persons + await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed + const clickhousePersons = await fetchPersonsRows() // but verify full state + expect(clickhousePersons.length).toEqual(2) + expect(clickhousePersons).toEqual( + expect.arrayContaining([ expect.objectContaining({ - id: expect.any(Number), - uuid: oldUserUuid, - properties: {}, - created_at: timestamp, + id: expect.any(String), + properties: JSON.stringify({ a: 1, b: 3, c: 4, d: 6, e: 7, f: 9 }), + created_at: timestampch, version: 1, - is_identified: true, - }) + is_identified: 1, + }), + expect.objectContaining({ + id: expect.any(String), + is_deleted: 1, + version: 100, + }), + ]) + ) + expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) + + // verify ClickHouse distinct_ids + await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) + const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + }) + + it(`handles race condition when other thread creates the user`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + + // Fake the race by assuming createPerson was called before the addDistinctId creation above + jest.spyOn(hub.db, 'addDistinctId').mockImplementation(async (person, distinctId) => { + await hub.db.createPerson( + timestamp, + {}, + {}, + {}, + teamId, + null, + false, + uuidFromDistinctId(teamId, distinctId), + [{ distinctId }] ) - // expect(hub.db.updatePersonDeprecated).not.toHaveBeenCalled() - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) + await hub.db.addDistinctId(person, distinctId, 0) // this throws }) + + const [person, kafkaAcks] = await personState({ + event: '$identify', + distinct_id: oldUserDistinctId, + properties: { + $anon_distinct_id: newUserDistinctId, + }, + }).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks + jest.spyOn(hub.db, 'addDistinctId').mockRestore() // Necessary for other tests not to fail + + // if creation fails we should return the person that another thread already created + expect(person).toEqual( + expect.objectContaining({ + 
id: expect.any(Number), + uuid: oldUserUuid, + properties: {}, + created_at: timestamp, + version: 1, + is_identified: true, + }) + ) + // expect(hub.db.updatePersonDeprecated).not.toHaveBeenCalled() + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([newUserDistinctId])) }) }) @@ -1540,90 +1492,84 @@ describe('PersonState.update()', () => { }) }) - describe.each(Object.keys(PersonOverridesModes))('on $merge_dangerously events', (useOverridesMode) => { - beforeEach(() => { - overridesMode = PersonOverridesModes[useOverridesMode] // n.b. mutating outer scope here -- be careful - }) - - describe(`overrides: ${useOverridesMode}`, () => { - // only difference between $merge_dangerously and $identify - it(`merge_dangerously can merge people when alias id user is identified`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ - { distinctId: oldUserDistinctId }, - ]) - await hub.db.createPerson(timestamp2, {}, {}, {}, teamId, null, true, newUserUuid, [ - { distinctId: newUserDistinctId }, - ]) - - const [person, kafkaAcks] = await personState({ - event: '$merge_dangerously', - distinct_id: newUserDistinctId, - properties: { - alias: oldUserDistinctId, - }, - }).handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - await kafkaAcks - - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: expect.any(String), - properties: {}, - created_at: timestamp, - version: 1, - is_identified: true, - }) - ) - - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) - expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - - // verify ClickHouse persons - await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed - const clickhousePersons = await fetchPersonsRows() // but verify full state - expect(clickhousePersons.length).toEqual(2) - expect(clickhousePersons).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: expect.any(String), - properties: '{}', - created_at: timestampch, - version: 1, - is_identified: 1, - }), - expect.objectContaining({ - id: expect.any(String), - is_deleted: 1, - version: 100, - }), - ]) - ) - expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) - - // verify ClickHouse distinct_ids - await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) - const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) - expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) - }) - }) - }) + describe('on $merge_dangerously events', () => { + // only difference between $merge_dangerously and $identify + it(`merge_dangerously can merge people when alias id user is identified`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, oldUserUuid, [ + { distinctId: oldUserDistinctId }, + ]) + await hub.db.createPerson(timestamp2, {}, {}, 
{}, teamId, null, true, newUserUuid, [ + { distinctId: newUserDistinctId }, + ]) - describe('illegal aliasing', () => { - const illegalIds = ['', ' ', 'null', 'undefined', '"undefined"', '[object Object]', '"[object Object]"'] - it.each(illegalIds)('stops $identify if current distinct_id is illegal: `%s`', async (illegalId: string) => { - const [person] = await personState({ - event: '$identify', - distinct_id: illegalId, + const [person, kafkaAcks] = await personState({ + event: '$merge_dangerously', + distinct_id: newUserDistinctId, properties: { - $anon_distinct_id: 'anonymous_id', + alias: oldUserDistinctId, + }, + }).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() + await kafkaAcks + + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: expect.any(String), + properties: {}, + created_at: timestamp, + version: 1, + is_identified: true, + }) + ) + + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + expect([newUserUuid, oldUserUuid]).toContain(persons[0].uuid) + + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + + // verify ClickHouse persons + await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed + const clickhousePersons = await fetchPersonsRows() // but verify full state + expect(clickhousePersons.length).toEqual(2) + expect(clickhousePersons).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + id: expect.any(String), + properties: '{}', + created_at: timestampch, + version: 1, + is_identified: 1, + }), + expect.objectContaining({ + id: expect.any(String), + is_deleted: 1, + version: 100, + }), + ]) + ) + expect(new Set(clickhousePersons.map((p) => p.id))).toEqual(new Set([newUserUuid, oldUserUuid])) + + // verify ClickHouse distinct_ids + await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) + const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(persons[0]) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([oldUserDistinctId, newUserDistinctId])) + }) + }) + + describe('illegal aliasing', () => { + const illegalIds = ['', ' ', 'null', 'undefined', '"undefined"', '[object Object]', '"[object Object]"'] + it.each(illegalIds)('stops $identify if current distinct_id is illegal: `%s`', async (illegalId: string) => { + const [person] = await personState({ + event: '$identify', + distinct_id: illegalId, + properties: { + $anon_distinct_id: 'anonymous_id', }, }).handleIdentifyOrAlias() @@ -1911,7 +1857,7 @@ describe('PersonState.update()', () => { ) }) }) - describe.each(Object.keys(PersonOverridesModes))('on persons merges', (useOverridesMode) => { + describe('on persons merges', () => { // For some reason these tests failed if I ran them with a hub shared // with other tests, so I'm creating a new hub for each test. let hub: Hub @@ -1919,7 +1865,6 @@ describe('PersonState.update()', () => { beforeEach(async () => { ;[hub, closeHub] = await createHub({}) - overridesMode = PersonOverridesModes[useOverridesMode] // n.b. 
mutating outer scope here -- be careful jest.spyOn(hub.db, 'fetchPerson') jest.spyOn(hub.db, 'updatePersonDeprecated') @@ -1928,929 +1873,282 @@ describe('PersonState.update()', () => { afterEach(async () => { await closeHub() }) - describe(`overrides: ${useOverridesMode}`, () => { - it(`no-op if persons already merged`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, firstUserUuid, [ - { distinctId: firstUserDistinctId }, - { distinctId: secondUserDistinctId }, - ]) - const state: PersonState = personState({}, hub) - jest.spyOn(hub.db.kafkaProducer, 'queueMessages') - const [person, kafkaAcks] = await state.merge( - secondUserDistinctId, - firstUserDistinctId, - teamId, - timestamp - ) - await hub.db.kafkaProducer.flush() - await kafkaAcks - - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: firstUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: true, - }) - ) - expect(hub.db.updatePersonDeprecated).not.toHaveBeenCalled() - expect(hub.db.kafkaProducer.queueMessages).not.toHaveBeenCalled() - }) - it(`postgres and clickhouse get updated`, async () => { - const first: InternalPerson = await hub.db.createPerson( - timestamp, - {}, - {}, - {}, - teamId, - null, - false, - firstUserUuid, - [{ distinctId: firstUserDistinctId }] - ) - const second: InternalPerson = await hub.db.createPerson( - timestamp, - {}, - {}, - {}, - teamId, - null, - false, - secondUserUuid, - [{ distinctId: secondUserDistinctId }] - ) + it(`no-op if persons already merged`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, true, firstUserUuid, [ + { distinctId: firstUserDistinctId }, + { distinctId: secondUserDistinctId }, + ]) + const state: PersonState = personState({}, hub) + jest.spyOn(hub.db.kafkaProducer, 'queueMessages') + const [person, kafkaAcks] = await state.merge(secondUserDistinctId, firstUserDistinctId, teamId, timestamp) + await hub.db.kafkaProducer.flush() + await kafkaAcks - const state: PersonState = personState({}, hub) - jest.spyOn(hub.db.kafkaProducer, 'queueMessages') - const [person, kafkaAcks] = await state.mergePeople({ - mergeInto: first, - mergeIntoDistinctId: firstUserDistinctId, - otherPerson: second, - otherPersonDistinctId: secondUserDistinctId, + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: firstUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: true, }) - await hub.db.kafkaProducer.flush() - await kafkaAcks - - expect(person).toEqual( - expect.objectContaining({ - id: expect.any(Number), - uuid: firstUserUuid, - properties: {}, - created_at: timestamp, - version: 1, - is_identified: true, - }) - ) - - expect(hub.db.updatePersonDeprecated).toHaveBeenCalledTimes(1) - expect(hub.db.kafkaProducer.queueMessages).toHaveBeenCalledTimes(1) - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(person) - expect(distinctIds).toEqual(expect.arrayContaining([firstUserDistinctId, secondUserDistinctId])) - - // verify ClickHouse persons - await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed - const clickhousePersons = await fetchPersonsRows() // but verify full state - expect(clickhousePersons).toEqual( - expect.arrayContaining([ - 
expect.objectContaining({ - id: firstUserUuid, - properties: '{}', - created_at: timestampch, - version: 1, - is_identified: 1, - }), - expect.objectContaining({ - id: secondUserUuid, - is_deleted: 1, - version: 100, - }), - ]) - ) + ) + expect(hub.db.updatePersonDeprecated).not.toHaveBeenCalled() + expect(hub.db.kafkaProducer.queueMessages).not.toHaveBeenCalled() + }) - // verify ClickHouse distinct_ids - await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) - const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(person) - expect(clickHouseDistinctIds).toEqual( - expect.arrayContaining([firstUserDistinctId, secondUserDistinctId]) - ) + it(`postgres and clickhouse get updated`, async () => { + const first: InternalPerson = await hub.db.createPerson( + timestamp, + {}, + {}, + {}, + teamId, + null, + false, + firstUserUuid, + [{ distinctId: firstUserDistinctId }] + ) + const second: InternalPerson = await hub.db.createPerson( + timestamp, + {}, + {}, + {}, + teamId, + null, + false, + secondUserUuid, + [{ distinctId: secondUserDistinctId }] + ) - // verify Postgres person_id overrides, if applicable - if (overridesMode) { - const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual(new Set([{ old_person_id: second.uuid, override_person_id: first.uuid }])) - // & CH person overrides - // TODO - } + const state: PersonState = personState({}, hub) + jest.spyOn(hub.db.kafkaProducer, 'queueMessages') + const [person, kafkaAcks] = await state.mergePeople({ + mergeInto: first, + mergeIntoDistinctId: firstUserDistinctId, + otherPerson: second, + otherPersonDistinctId: secondUserDistinctId, }) + await hub.db.kafkaProducer.flush() + await kafkaAcks - it(`throws if postgres unavailable`, async () => { - const first: InternalPerson = await hub.db.createPerson( - timestamp, - {}, - {}, - {}, - teamId, - null, - false, - firstUserUuid, - [{ distinctId: firstUserDistinctId }] - ) - const second: InternalPerson = await hub.db.createPerson( - timestamp, - {}, - {}, - {}, - teamId, - null, - false, - secondUserUuid, - [{ distinctId: secondUserDistinctId }] - ) - - const state: PersonState = personState({}, hub) - // break postgres - const error = new DependencyUnavailableError('testing', 'Postgres', new Error('test')) - jest.spyOn(hub.db.postgres, 'transaction').mockImplementation(() => { - throw error + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: firstUserUuid, + properties: {}, + created_at: timestamp, + version: 1, + is_identified: true, }) - jest.spyOn(hub.db.kafkaProducer, 'queueMessages') - await expect( - state.mergePeople({ - mergeInto: first, - mergeIntoDistinctId: firstUserDistinctId, - otherPerson: second, - otherPersonDistinctId: secondUserDistinctId, - }) - ).rejects.toThrow(error) - await hub.db.kafkaProducer.flush() - - expect(hub.db.postgres.transaction).toHaveBeenCalledTimes(1) - jest.spyOn(hub.db.postgres, 'transaction').mockRestore() - expect(hub.db.kafkaProducer.queueMessages).not.toBeCalled() - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: expect.any(Number), - uuid: firstUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }), - expect.objectContaining({ - id: expect.any(Number), - uuid: secondUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }), - ]) - ) - }) - - 
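
A minimal sketch of the Postgres-outage pattern that the surrounding tests (both the removed and the re-indented copies) rely on, assuming the jest setup, hub, state, and the person fixtures defined earlier in this file; variable names here are illustrative:

    // Simulate Postgres being down: every transaction attempt throws a
    // retriable dependency error before reaching the database.
    const pgDown = new DependencyUnavailableError('testing', 'Postgres', new Error('test'))
    const txSpy = jest.spyOn(hub.db.postgres, 'transaction').mockImplementation(() => {
        throw pgDown
    })
    const producerSpy = jest.spyOn(hub.db.kafkaProducer, 'queueMessages')

    // The merge must surface the error rather than half-apply it...
    await expect(
        state.mergePeople({
            mergeInto: first,
            mergeIntoDistinctId: firstUserDistinctId,
            otherPerson: second,
            otherPersonDistinctId: secondUserDistinctId,
        })
    ).rejects.toThrow(pgDown)

    // ...and nothing may have been produced to Kafka for the failed merge.
    expect(producerSpy).not.toHaveBeenCalled()
    txSpy.mockRestore() // restore so later tests reach the real database
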
it(`retries merges up to retry limit if postgres down`, async () => { - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ - { distinctId: firstUserDistinctId }, - ]) - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ - { distinctId: secondUserDistinctId }, - ]) + ) - const state: PersonState = personState({}, hub) - // break postgres - const error = new DependencyUnavailableError('testing', 'Postgres', new Error('test')) - jest.spyOn(state, 'mergePeople').mockImplementation(() => { - throw error - }) - jest.spyOn(hub.db.kafkaProducer, 'queueMessages') - await expect(state.merge(secondUserDistinctId, firstUserDistinctId, teamId, timestamp)).rejects.toThrow( - error - ) + expect(hub.db.updatePersonDeprecated).toHaveBeenCalledTimes(1) + expect(hub.db.kafkaProducer.queueMessages).toHaveBeenCalledTimes(1) + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) - await hub.db.kafkaProducer.flush() - - expect(state.mergePeople).toHaveBeenCalledTimes(3) - jest.spyOn(state, 'mergePeople').mockRestore() - expect(hub.db.kafkaProducer.queueMessages).not.toBeCalled() - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: expect.any(Number), - uuid: firstUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }), - expect.objectContaining({ - id: expect.any(Number), - uuid: secondUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }), - ]) - ) - }) + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(person) + expect(distinctIds).toEqual(expect.arrayContaining([firstUserDistinctId, secondUserDistinctId])) - it(`handleIdentifyOrAlias does not throw on merge failure`, async () => { - // TODO: This the current state, we should probably change it - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ - { distinctId: firstUserDistinctId }, - ]) - await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ - { distinctId: secondUserDistinctId }, + // verify ClickHouse persons + await delayUntilEventIngested(() => fetchPersonsRowsWithVersionHigerEqualThan(), 2) // wait until merge and delete processed + const clickhousePersons = await fetchPersonsRows() // but verify full state + expect(clickhousePersons).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + id: firstUserUuid, + properties: '{}', + created_at: timestampch, + version: 1, + is_identified: 1, + }), + expect.objectContaining({ + id: secondUserUuid, + is_deleted: 1, + version: 100, + }), ]) + ) - const state: PersonState = personState( - { - event: '$merge_dangerously', - distinct_id: firstUserDistinctId, - properties: { alias: secondUserDistinctId }, - }, - hub - ) - // break postgres - const error = new DependencyUnavailableError('testing', 'Postgres', new Error('test')) - jest.spyOn(state, 'mergePeople').mockImplementation(() => { - throw error - }) - jest.spyOn(hub.db.kafkaProducer, 'queueMessages') - await state.handleIdentifyOrAlias() - await hub.db.kafkaProducer.flush() - - expect(state.mergePeople).toHaveBeenCalledTimes(3) - jest.spyOn(state, 'mergePeople').mockRestore() - expect(hub.db.kafkaProducer.queueMessages).not.toBeCalled() - // verify Postgres persons - const persons = 
await fetchPostgresPersonsH() - expect(persons).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: expect.any(Number), - uuid: firstUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }), - expect.objectContaining({ - id: expect.any(Number), - uuid: secondUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }), - ]) - ) - }) - - it(`does not commit partial transactions on override conflicts`, async () => { - if (!overridesMode?.supportsSyncTransaction) { - return - } - const first: InternalPerson = await hub.db.createPerson( - timestamp, - {}, - {}, - {}, - teamId, - null, - false, - firstUserUuid, - [{ distinctId: firstUserDistinctId }] - ) - const second: InternalPerson = await hub.db.createPerson( - timestamp, - {}, - {}, - {}, - teamId, - null, - false, - secondUserUuid, - [{ distinctId: secondUserDistinctId }] - ) - - const state: PersonState = personState({}, hub) - const originalPostgresQuery = hub.db.postgres.query.bind(hub.db.postgres) - const error = new Error('Conflict') - const mockPostgresQuery = jest - .spyOn(hub.db.postgres, 'query') - .mockImplementation( - async ( - use: PostgresUse, - query: any, - values: any[] | undefined, - tag: string, - ...args: any[] - ) => { - if (tag === 'transitivePersonOverrides') { - throw error - } - return await originalPostgresQuery(use, query, values, tag, ...args) - } - ) - - jest.spyOn(hub.db.kafkaProducer, 'queueMessages') - await expect( - state.mergePeople({ - mergeInto: first, - mergeIntoDistinctId: firstUserDistinctId, - otherPerson: second, - otherPersonDistinctId: secondUserDistinctId, - }) - ).rejects.toThrow(error) - await hub.db.kafkaProducer.flush() - - // verify Postgres persons - const personsAfterFailure = await fetchPostgresPersonsH() - expect(personsAfterFailure).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - id: expect.any(Number), - uuid: firstUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }), - expect.objectContaining({ - id: expect.any(Number), - uuid: secondUserUuid, - properties: {}, - created_at: timestamp, - version: 0, - is_identified: false, - }), - ]) - ) - - // verify Postgres distinct_ids - const distinctIdsAfterFailure = [ - await hub.db.fetchDistinctIdValues(personsAfterFailure[0]), - await hub.db.fetchDistinctIdValues(personsAfterFailure[1]), - ] - expect(distinctIdsAfterFailure).toEqual( - expect.arrayContaining([[firstUserDistinctId], [secondUserDistinctId]]) - ) + // verify ClickHouse distinct_ids + await delayUntilEventIngested(() => fetchDistinctIdsClickhouseVersion1()) + const clickHouseDistinctIds = await fetchDistinctIdsClickhouse(person) + expect(clickHouseDistinctIds).toEqual(expect.arrayContaining([firstUserDistinctId, secondUserDistinctId])) + }) - // verify Postgres person_id overrides - const overridesAfterFailure = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overridesAfterFailure).toEqual(new Set()) + it(`throws if postgres unavailable`, async () => { + const first: InternalPerson = await hub.db.createPerson( + timestamp, + {}, + {}, + {}, + teamId, + null, + false, + firstUserUuid, + [{ distinctId: firstUserDistinctId }] + ) + const second: InternalPerson = await hub.db.createPerson( + timestamp, + {}, + {}, + {}, + teamId, + null, + false, + secondUserUuid, + [{ distinctId: secondUserDistinctId }] + ) - // Now verify we successfully get to our target state if we do not have - // any db errors. 
- mockPostgresQuery.mockRestore() - const [person, kafkaAcks] = await state.mergePeople({ + const state: PersonState = personState({}, hub) + // break postgres + const error = new DependencyUnavailableError('testing', 'Postgres', new Error('test')) + jest.spyOn(hub.db.postgres, 'transaction').mockImplementation(() => { + throw error + }) + jest.spyOn(hub.db.kafkaProducer, 'queueMessages') + await expect( + state.mergePeople({ mergeInto: first, mergeIntoDistinctId: firstUserDistinctId, otherPerson: second, otherPersonDistinctId: secondUserDistinctId, }) - await hub.db.kafkaProducer.flush() - await kafkaAcks + ).rejects.toThrow(error) + await hub.db.kafkaProducer.flush() - expect(person).toEqual( + expect(hub.db.postgres.transaction).toHaveBeenCalledTimes(1) + jest.spyOn(hub.db.postgres, 'transaction').mockRestore() + expect(hub.db.kafkaProducer.queueMessages).not.toBeCalled() + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons).toEqual( + expect.arrayContaining([ expect.objectContaining({ id: expect.any(Number), uuid: firstUserUuid, properties: {}, created_at: timestamp, - version: 1, - is_identified: true, - }) - ) - - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual(person) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(person) - expect(distinctIds).toEqual(expect.arrayContaining([firstUserDistinctId, secondUserDistinctId])) - - // verify Postgres person_id overrides - const overrides = await overridesMode!.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual(new Set([{ old_person_id: second.uuid, override_person_id: first.uuid }])) - }) - - it(`handles a chain of overrides being applied concurrently`, async () => { - const first: InternalPerson = await hub.db.createPerson( - timestamp, - { first: true }, - {}, - {}, - teamId, - null, - false, - firstUserUuid, - [{ distinctId: firstUserDistinctId }] - ) - const second: InternalPerson = await hub.db.createPerson( - timestamp.plus({ minutes: 2 }), - { second: true }, - {}, - {}, - teamId, - null, - false, - secondUserUuid, - [{ distinctId: secondUserDistinctId }] - ) - const third: InternalPerson = await hub.db.createPerson( - timestamp.plus({ minutes: 5 }), - { third: true }, - {}, - {}, - teamId, - null, - false, - new UUIDT().toString(), - [{ distinctId: 'third' }] - ) - - // We want to simulate a concurrent update to person_overrides. We do - // this by first mocking the implementation to block at a certain point - // in the transaction, then running the update function twice. - // We then wait for them to block before letting them resume. 
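
Stripped of the database plumbing, the blocking trick described in the comment above is a two-party rendezvous built from a single promise. A minimal standalone sketch (rendezvous is an illustrative name; this only works for exactly two callers):

    // The first caller parks on a promise; the second caller resolves it.
    // Neither side runs past this point until both have arrived.
    let release: ((value?: unknown) => void) | undefined
    async function rendezvous(): Promise<void> {
        if (release) {
            release() // second arrival: wake the parked first caller
        } else {
            await new Promise((resolve) => {
                release = resolve // first arrival: wait to be released
            })
        }
    }

The mocked transaction in the removed test below inlines exactly this logic for the two concurrent merges, keyed on the mergePeople transaction tag so that unrelated transactions pass through untouched.
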
- let resumeExecution: (value: unknown) => void - - const postgresTransaction = hub.db.postgres.transaction.bind(hub.db.postgres) - jest.spyOn(hub.db.postgres, 'transaction').mockImplementation( - async (use: PostgresUse, tag: string, transaction: any) => { - if (tag === 'mergePeople') { - return await postgresTransaction(use, tag, async (client) => { - if (resumeExecution) { - resumeExecution(undefined) - } else { - await new Promise((resolve) => { - resumeExecution = resolve - }) - } - - return await transaction(client) - }) - } else { - return await postgresTransaction(use, tag, transaction) - } - } - ) - - await Promise.all([ - personState( - { - event: '$merge_dangerously', - distinct_id: firstUserDistinctId, - properties: { - alias: secondUserDistinctId, - }, - }, - hub - ).handleIdentifyOrAlias(), - personState( - { - event: '$merge_dangerously', - distinct_id: secondUserDistinctId, - properties: { - alias: 'third', - }, - }, - hub - ).handleIdentifyOrAlias(), - ]) - - // Note: we can't verify anything here because the concurrency might have enabled both merges to already happen. - - await Promise.all([ - personState( - { - event: '$merge_dangerously', - distinct_id: firstUserDistinctId, - properties: { - alias: secondUserDistinctId, - }, - }, - hub - ).handleIdentifyOrAlias(), - personState( - { - event: '$merge_dangerously', - distinct_id: secondUserDistinctId, - properties: { - alias: 'third', - }, - }, - hub - ).handleIdentifyOrAlias(), - ]) - - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual( + version: 0, + is_identified: false, + }), expect.objectContaining({ id: expect.any(Number), - uuid: firstUserUuid, // guaranteed to be merged into this based on timestamps - // There's a race condition in our code where - // if different distinctIDs are used same time, - // then pros can be dropped, see https://docs.google.com/presentation/d/1Osz7r8bKkDD5yFzw0cCtsGVf1LTEifXS-dzuwaS8JGY - // properties: { first: true, second: true, third: true }, + uuid: secondUserUuid, + properties: {}, created_at: timestamp, - // This is 2 because they all start with version 0, and then: x - // third -> second = max(third(0), second(0)) + 1 == version 1 - // second -> first = max(second(1), first(0)) + 1 == version 2 - version: 2, - is_identified: true, - }) - ) + version: 0, + is_identified: false, + }), + ]) + ) + }) - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual( - expect.arrayContaining([firstUserDistinctId, secondUserDistinctId, 'third']) - ) + it(`retries merges up to retry limit if postgres down`, async () => { + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ + { distinctId: firstUserDistinctId }, + ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ + { distinctId: secondUserDistinctId }, + ]) - // verify Postgres person_id overrides, if applicable - if (overridesMode) { - const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual( - new Set([ - { old_person_id: second.uuid, override_person_id: first.uuid }, - { old_person_id: third.uuid, override_person_id: first.uuid }, - ]) - ) - } + const state: PersonState = personState({}, hub) + // break postgres + const error = new DependencyUnavailableError('testing', 'Postgres', new Error('test')) + jest.spyOn(state, 
'mergePeople').mockImplementation(() => { + throw error }) + jest.spyOn(hub.db.kafkaProducer, 'queueMessages') + await expect(state.merge(secondUserDistinctId, firstUserDistinctId, teamId, timestamp)).rejects.toThrow( + error + ) - it(`handles a chain of overrides being applied out of order`, async () => { - const first: InternalPerson = await hub.db.createPerson( - timestamp, - { first: true }, - {}, - {}, - teamId, - null, - false, - firstUserUuid, - [{ distinctId: firstUserDistinctId }] - ) - const second: InternalPerson = await hub.db.createPerson( - timestamp.plus({ minutes: 2 }), - { second: true }, - {}, - {}, - teamId, - null, - false, - secondUserUuid, - [{ distinctId: secondUserDistinctId }] - ) - const third: InternalPerson = await hub.db.createPerson( - timestamp.plus({ minutes: 5 }), - { third: true }, - {}, - {}, - teamId, - null, - false, - new UUIDT().toString(), - [{ distinctId: 'third' }] - ) - - await personState( - { - event: '$merge_dangerously', - distinct_id: secondUserDistinctId, - properties: { - alias: 'third', - }, - }, - hub - ).handleIdentifyOrAlias() - - await personState( - { - event: '$merge_dangerously', - distinct_id: firstUserDistinctId, - properties: { - alias: secondUserDistinctId, - }, - }, - hub - ).handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() - // verify Postgres persons - const persons = await fetchPostgresPersonsH() - expect(persons.length).toEqual(1) - expect(persons[0]).toEqual( + expect(state.mergePeople).toHaveBeenCalledTimes(3) + jest.spyOn(state, 'mergePeople').mockRestore() + expect(hub.db.kafkaProducer.queueMessages).not.toBeCalled() + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons).toEqual( + expect.arrayContaining([ expect.objectContaining({ id: expect.any(Number), - uuid: firstUserUuid, // guaranteed to be merged into this based on timestamps - properties: { first: true, second: true, third: true }, + uuid: firstUserUuid, + properties: {}, created_at: timestamp, - // This is 2 because they all start with version 0, and then: - // third -> second = max(third(0), second(0)) + 1 == version 1 - // second -> first = max(second(1), first(0)) + 1 == version 2 - version: 2, - is_identified: true, - }) - ) - - // verify Postgres distinct_ids - const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) - expect(distinctIds).toEqual( - expect.arrayContaining([firstUserDistinctId, secondUserDistinctId, 'third']) - ) - - // verify Postgres person_id overrides, if applicable - if (overridesMode) { - const overrides = await overridesMode.fetchPostgresPersonIdOverrides(hub, teamId) - expect(overrides).toEqual( - new Set([ - { old_person_id: second.uuid, override_person_id: first.uuid }, - { old_person_id: third.uuid, override_person_id: first.uuid }, - ]) - ) - } - }) - }) - }) -}) - -describe('flat person overrides writer', () => { - let hub: Hub - let closeHub: () => Promise - - let organizationId: string - let teamId: number - let writer: FlatPersonOverrideWriter - - beforeAll(async () => { - ;[hub, closeHub] = await createHub({}) - organizationId = await createOrganization(hub.db.postgres) - writer = new FlatPersonOverrideWriter(hub.db.postgres) - }) - - beforeEach(async () => { - teamId = await createTeam(hub.db.postgres, organizationId) - }) - - afterAll(async () => { - await closeHub() - }) - - it('handles direct overrides', async () => { - const { postgres } = hub.db - - const defaults = { - team_id: teamId, - oldest_event: DateTime.fromMillis(0), - } - - const override = { - 
old_person_id: new UUIDT().toString(), - override_person_id: new UUIDT().toString(), - } - - await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { - await writer.addPersonOverride(tx, { ...defaults, ...override }) - }) - - expect(await writer.getPersonOverrides(teamId)).toEqual([{ ...defaults, ...override }]) - }) - - it('handles transitive overrides', async () => { - const { postgres } = hub.db - - const defaults = { - team_id: teamId, - oldest_event: DateTime.fromMillis(0), - } - - const overrides = [ - { - old_person_id: new UUIDT().toString(), - override_person_id: new UUIDT().toString(), - }, - ] - - overrides.push({ - old_person_id: overrides[0].override_person_id, - override_person_id: new UUIDT().toString(), - }) - - await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { - for (const override of overrides) { - await writer.addPersonOverride(tx, { ...defaults, ...override }) - } - }) - - expect(new Set(await writer.getPersonOverrides(teamId))).toEqual( - new Set( - overrides.map(({ old_person_id }) => ({ - old_person_id, - override_person_id: overrides.at(-1)!.override_person_id, - ...defaults, - })) + version: 0, + is_identified: false, + }), + expect.objectContaining({ + id: expect.any(Number), + uuid: secondUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: false, + }), + ]) ) - ) - }) -}) - -describe('deferred person overrides', () => { - let hub: Hub - let closeHub: () => Promise - - // not always used, but used more often then not - let organizationId: string - let teamId: number - - let writer: DeferredPersonOverrideWriter - let syncWriter: FlatPersonOverrideWriter - let worker: DeferredPersonOverrideWorker - - beforeAll(async () => { - ;[hub, closeHub] = await createHub({}) - organizationId = await createOrganization(hub.db.postgres) - writer = new DeferredPersonOverrideWriter(hub.db.postgres) - syncWriter = new FlatPersonOverrideWriter(hub.db.postgres) - worker = new DeferredPersonOverrideWorker(hub.db.postgres, hub.db.kafkaProducer, syncWriter) - }) - - beforeEach(async () => { - teamId = await createTeam(hub.db.postgres, organizationId) - await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - 'TRUNCATE TABLE posthog_pendingpersonoverride', - undefined, - '' - ) - }) - - afterEach(() => { - jest.restoreAllMocks() - }) - - afterAll(async () => { - await closeHub() - }) - - const getPendingPersonOverrides = async () => { - const { rows } = await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - `SELECT old_person_id, override_person_id - FROM posthog_pendingpersonoverride - WHERE team_id = ${teamId}`, - undefined, - '' - ) - return rows - } - - it('moves overrides from the pending table to the overrides table', async () => { - const { postgres } = hub.db - - const override = { - old_person_id: new UUIDT().toString(), - override_person_id: new UUIDT().toString(), - } - - await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { - await writer.addPersonOverride(tx, { team_id: teamId, ...override, oldest_event: DateTime.fromMillis(0) }) }) - expect(await getPendingPersonOverrides()).toEqual([override]) - - expect(await worker.processPendingOverrides()).toEqual(1) - - expect(await getPendingPersonOverrides()).toMatchObject([]) - - expect( - (await syncWriter.getPersonOverrides(teamId)).map(({ old_person_id, override_person_id }) => [ - old_person_id, - override_person_id, + it(`handleIdentifyOrAlias does not throw on merge failure`, async () => { + // TODO: This the current state, we 
should probably change it + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, firstUserUuid, [ + { distinctId: firstUserDistinctId }, + ]) + await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, secondUserUuid, [ + { distinctId: secondUserDistinctId }, ]) - ).toEqual([[override.old_person_id, override.override_person_id]]) - - const clickhouseOverrides = await waitForExpect(async () => { - const { data } = await hub.db.clickhouse.querying( - ` - SELECT old_person_id, override_person_id - FROM person_overrides - WHERE team_id = ${teamId} - `, - { dataObjects: true } - ) - expect(data).toHaveLength(1) - return data - }) - expect(clickhouseOverrides).toEqual([override]) - }) - - it('rolls back on kafka producer error', async () => { - const { postgres } = hub.db - - const override = { - old_person_id: new UUIDT().toString(), - override_person_id: new UUIDT().toString(), - } - - await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { - await writer.addPersonOverride(tx, { team_id: teamId, ...override, oldest_event: DateTime.fromMillis(0) }) - }) - - expect(await getPendingPersonOverrides()).toEqual([override]) - - jest.spyOn(hub.db.kafkaProducer, 'queueMessages').mockImplementation(() => { - throw new Error('something bad happened') - }) - - await expect(worker.processPendingOverrides()).rejects.toThrow() - - expect(await getPendingPersonOverrides()).toEqual([override]) - }) - - it('ensures advisory lock is held before processing', async () => { - const { postgres } = hub.db - - let acquiredLock: boolean - const tryLockComplete = new WaitEvent() - const readyToReleaseLock = new WaitEvent() - const transactionHolder = postgres - .transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { - const { rows } = await postgres.query( - tx, - `SELECT pg_try_advisory_lock(${worker.lockId}) as acquired, pg_backend_pid()`, - undefined, - '' - ) - ;[{ acquired: acquiredLock }] = rows - tryLockComplete.set() - await readyToReleaseLock.wait() - }) - .then(() => { - acquiredLock = false + const state: PersonState = personState( + { + event: '$merge_dangerously', + distinct_id: firstUserDistinctId, + properties: { alias: secondUserDistinctId }, + }, + hub + ) + // break postgres + const error = new DependencyUnavailableError('testing', 'Postgres', new Error('test')) + jest.spyOn(state, 'mergePeople').mockImplementation(() => { + throw error }) + jest.spyOn(hub.db.kafkaProducer, 'queueMessages') + await state.handleIdentifyOrAlias() + await hub.db.kafkaProducer.flush() - try { - await tryLockComplete.wait() - expect(acquiredLock!).toBe(true) - await expect(worker.processPendingOverrides()).rejects.toThrow(Error('could not acquire lock')) - } finally { - readyToReleaseLock.set() - await transactionHolder - } - - expect(acquiredLock!).toBe(false) - await expect(worker.processPendingOverrides()).resolves.toEqual(0) - }) - - it('respects limit if provided', async () => { - const { postgres } = hub.db - - const overrides = [...Array(3)].map(() => ({ - old_person_id: new UUIDT().toString(), - override_person_id: new UUIDT().toString(), - })) - - await postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { - await Promise.all( - overrides.map( - async (override) => - await writer.addPersonOverride(tx, { - team_id: teamId, - ...override, - oldest_event: DateTime.fromMillis(0), - }) - ) + expect(state.mergePeople).toHaveBeenCalledTimes(3) + jest.spyOn(state, 'mergePeople').mockRestore() + expect(hub.db.kafkaProducer.queueMessages).not.toBeCalled() + // 
verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + id: expect.any(Number), + uuid: firstUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: false, + }), + expect.objectContaining({ + id: expect.any(Number), + uuid: secondUserUuid, + properties: {}, + created_at: timestamp, + version: 0, + is_identified: false, + }), + ]) ) }) - - expect(await getPendingPersonOverrides()).toEqual(overrides) - - expect(await worker.processPendingOverrides(2)).toEqual(2) - expect(await getPendingPersonOverrides()).toMatchObject(overrides.slice(-1)) - - expect(await worker.processPendingOverrides(2)).toEqual(1) - expect(await getPendingPersonOverrides()).toEqual([]) }) })
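
Taken together, the failure-handling tests above pin down a retry contract for merges; the sketch below is a hypothetical restatement of that contract (mergeWithRetries is an illustrative name, not the actual PersonState internals):

    // The merge path retries the underlying mergePeople() up to three
    // attempts, which is why the spies above record exactly three calls.
    async function mergeWithRetries(attempt: () => Promise<void>, maxAttempts = 3): Promise<void> {
        let lastError: unknown
        for (let i = 0; i < maxAttempts; i++) {
            try {
                return await attempt()
            } catch (error) {
                lastError = error
            }
        }
        throw lastError // state.merge() rejects with the final error...
    }

...while handleIdentifyOrAlias() swallows that rejection, so ingestion of the triggering event continues even though the two persons were never merged.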