From 35753a7b0ce910d1f8ddfdf48842ef293ee5aef3 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Mon, 1 Apr 2024 15:24:54 -0600 Subject: [PATCH] feat(plugin-server): handle process_person=false --- .../analytics-ingestion/happy-path.test.ts | 70 ++++++++++++ plugin-server/src/backfill.ts | 10 +- plugin-server/src/types.ts | 2 + plugin-server/src/utils/event.ts | 28 +++++ .../event-pipeline/createEventStep.ts | 5 +- .../event-pipeline/normalizeEventStep.ts | 5 +- .../event-pipeline/prepareEventStep.ts | 9 +- .../event-pipeline/processPersonsStep.ts | 4 +- .../worker/ingestion/event-pipeline/runner.ts | 41 +++++-- .../src/worker/ingestion/person-state.ts | 25 ++++- .../src/worker/ingestion/process-event.ts | 60 ++++++++--- .../__snapshots__/runner.test.ts.snap | 17 ++- .../event-pipeline/normalizeEventStep.test.ts | 53 +++++++++- .../event-pipeline/processPersonsStep.test.ts | 8 +- .../ingestion/event-pipeline/runner.test.ts | 40 ++++++- .../worker/ingestion/person-state.test.ts | 100 +++++++++++++++++- .../worker/ingestion/process-event.test.ts | 48 ++++++++- 17 files changed, 477 insertions(+), 48 deletions(-) diff --git a/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts b/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts index c8b3020c72d04c..4dbe769dd92b23 100644 --- a/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts +++ b/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts @@ -202,6 +202,76 @@ test.concurrent(`event ingestion: can $set and update person properties`, async }) }) +test.concurrent( + `event ingestion: $process_person=false drops expected fields, doesn't include person properties`, + async () => { + const teamId = await createTeam(organizationId) + const distinctId = new UUIDT().toString() + + // Normal ("full") event creates person with a property. + await capture({ + teamId, + distinctId, + uuid: new UUIDT().toString(), + event: '$identify', + properties: { + distinct_id: distinctId, + $set: { prop: 'value' }, + }, + }) + + // Propertyless event tries to $set, $set_once, $unset and use groups, but none of these + // should work. + const properylessUuid = new UUIDT().toString() + await capture({ + teamId, + distinctId, + uuid: properylessUuid, + event: 'custom event', + properties: { + $process_person: false, + $group_0: 'group_key', + $set: { + c: 3, + }, + $set_once: { + d: 4, + }, + $unset: ['prop'], + }, + $set: { + a: 1, + }, + $set_once: { + b: 2, + }, + }) + await waitForExpect(async () => { + const [event] = await fetchEvents(teamId, properylessUuid) + expect(event).toEqual( + expect.objectContaining({ + person_properties: {}, + properties: { $process_person: false, uuid: properylessUuid, $sent_at: expect.any(String) }, + }) + ) + }) + + // Another normal ("full") event sees the existing person property (it wasn't $unset) + const secondUuid = new UUIDT().toString() + await capture({ teamId, distinctId, uuid: secondUuid, event: 'custom event', properties: {} }) + await waitForExpect(async () => { + const [event] = await fetchEvents(teamId, secondUuid) + expect(event).toEqual( + expect.objectContaining({ + person_properties: expect.objectContaining({ + prop: 'value', + }), + }) + ) + }) + } +) + test.concurrent(`event ingestion: can $set and update person properties with top level $set`, async () => { // We support $set at the top level. This is as the time of writing how the // posthog-js library works. diff --git a/plugin-server/src/backfill.ts b/plugin-server/src/backfill.ts index e04a93e6f3b279..dd2bd1b118a3ee 100644 --- a/plugin-server/src/backfill.ts +++ b/plugin-server/src/backfill.ts @@ -150,6 +150,14 @@ async function handleEvent(db: DB, event: RawClickHouseEvent): Promise { // single CH event handlin const pluginEvent = formPluginEvent(event) const ts: DateTime = DateTime.fromISO(pluginEvent.timestamp as string) - const personState = new PersonState(pluginEvent, pluginEvent.team_id, pluginEvent.distinct_id, ts, db) + const processPerson = true + const personState = new PersonState( + pluginEvent, + pluginEvent.team_id, + pluginEvent.distinct_id, + ts, + processPerson, + db + ) await personState.handleIdentifyOrAlias() } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 98b656e37a18d9..1ff7de0d9902fd 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -635,6 +635,7 @@ export interface RawClickHouseEvent extends BaseEvent { group2_created_at?: ClickHouseTimestamp group3_created_at?: ClickHouseTimestamp group4_created_at?: ClickHouseTimestamp + person_mode: string } /** Parsed event row from ClickHouse. */ @@ -655,6 +656,7 @@ export interface ClickHouseEvent extends BaseEvent { group2_created_at?: DateTime | null group3_created_at?: DateTime | null group4_created_at?: DateTime | null + person_mode: string } /** Event in a database-agnostic shape, AKA an ingestion event. diff --git a/plugin-server/src/utils/event.ts b/plugin-server/src/utils/event.ts index 1b97c1baa7bf42..0fe917e0aebfee 100644 --- a/plugin-server/src/utils/event.ts +++ b/plugin-server/src/utils/event.ts @@ -111,6 +111,34 @@ export function convertToIngestionEvent(event: RawClickHouseEvent, skipElementsC } } +/// Does normalization steps involving the $process_person property. This is currently a separate +/// function because `normalizeEvent` is called from multiple places, some early in the pipeline, +/// and we want to have one trusted place where `$process_person` is handled and passed through +/// all of the processing steps. +/// +/// If `formPipelineEvent` is removed this can easily be combined with `normalizeEvent`. +export function normalizeProcessPerson(event: PluginEvent, processPerson: boolean): PluginEvent { + const properties = event.properties ?? {} + + // $process_person steps: + // 1. If person processing is disabled, $set, $set_once and $unset are dropped + // 2. Normalize the $process_person property on the event, if true, drop it since true is + // the default. If it was false before plugins ran, ensure it's still set to false. + if (!processPerson) { + delete event.$set + delete event.$set_once + delete properties.$set + delete properties.$set_once + delete properties.$unset + properties.$process_person = false + } else { + delete properties.$process_person + } + + event.properties = properties + return event +} + export function normalizeEvent(event: PluginEvent): PluginEvent { event.distinct_id = event.distinct_id?.toString() diff --git a/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts index f1056e175484f6..89dabade2d30b1 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts @@ -4,7 +4,8 @@ import { EventPipelineRunner } from './runner' export async function createEventStep( runner: EventPipelineRunner, event: PreIngestionEvent, - person: Person + person: Person, + processPerson: boolean ): Promise<[RawClickHouseEvent, Promise]> { - return await runner.hub.eventsProcessor.createEvent(event, person) + return await runner.hub.eventsProcessor.createEvent(event, person, processPerson) } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/normalizeEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/normalizeEventStep.ts index cdd0ea5705e958..d88c2bce25ab50 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/normalizeEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/normalizeEventStep.ts @@ -1,14 +1,15 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' -import { normalizeEvent } from '../../../utils/event' +import { normalizeEvent, normalizeProcessPerson } from '../../../utils/event' import { status } from '../../../utils/status' import { parseEventTimestamp } from '../timestamps' -export function normalizeEventStep(event: PluginEvent): [PluginEvent, DateTime] { +export function normalizeEventStep(event: PluginEvent, processPerson: boolean): [PluginEvent, DateTime] { let timestamp: DateTime try { event = normalizeEvent(event) + event = normalizeProcessPerson(event, processPerson) timestamp = parseEventTimestamp(event) } catch (error) { status.warn('⚠️', 'Failed normalizing event', { diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index 879941e4d18380..b097643e0ca5fe 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -6,7 +6,11 @@ import { captureIngestionWarning } from '../utils' import { invalidTimestampCounter } from './metrics' import { EventPipelineRunner } from './runner' -export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { +export async function prepareEventStep( + runner: EventPipelineRunner, + event: PluginEvent, + processPerson: boolean +): Promise { const { team_id, uuid } = event const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { @@ -20,7 +24,8 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi event, team_id, parseEventTimestamp(event, invalidTimestampCallback), - uuid! // it will throw if it's undefined, + uuid!, // it will throw if it's undefined, + processPerson ) await Promise.all(tsParsingIngestionWarnings) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts index 92f36c7195a0a2..3cc38242736d6c 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts @@ -8,7 +8,8 @@ import { EventPipelineRunner } from './runner' export async function processPersonsStep( runner: EventPipelineRunner, event: PluginEvent, - timestamp: DateTime + timestamp: DateTime, + processPerson: boolean ): Promise<[PluginEvent, Person]> { let overridesWriter: DeferredPersonOverrideWriter | undefined = undefined if (runner.poEEmbraceJoin) { @@ -20,6 +21,7 @@ export async function processPersonsStep( event.team_id, String(event.distinct_id), timestamp, + processPerson, runner.hub.db, overridesWriter ).update() diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 189cdd5d9ad1dd..1a36cd1988431a 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -7,7 +7,7 @@ import { Hub, PipelineEvent } from '../../../types' import { DependencyUnavailableError } from '../../../utils/db/error' import { timeoutGuard } from '../../../utils/db/utils' import { status } from '../../../utils/status' -import { generateEventDeadLetterQueueMessage } from '../utils' +import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils' import { createEventStep } from './createEventStep' import { eventProcessedAndIngestedCounter, @@ -118,6 +118,31 @@ export class EventPipelineRunner { // ingestion pipeline is working well for all teams. this.poEEmbraceJoin = true } + + let processPerson = true + if (event.properties && event.properties.$process_person === false) { + // We are purposefully being very explicit here. The `$process_person` property *must* + // exist and be set to `false` (not missing, or null, or any other value) to disable + // person processing. + processPerson = false + + if (['$identify', '$create_alias', '$merge_dangerously', '$groupidentify'].includes(event.event)) { + const warningAck = captureIngestionWarning( + this.hub.db.kafkaProducer, + event.team_id, + 'invalid_event_when_process_person_is_false', + { + eventUuid: event.uuid, + event: event.event, + distinctId: event.distinct_id, + }, + { alwaysSend: true } + ) + + return this.registerLastStep('invalidEventForProvidedFlags', [event], [warningAck]) + } + } + const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id) if (processedEvent == null) { // A plugin dropped the event. @@ -125,23 +150,27 @@ export class EventPipelineRunner { } // Normalizing is sync and doesn't need to run in a full `runStep` span for tracking. - const [normalizedEvent, timestamp] = normalizeEventStep(processedEvent) + const [normalizedEvent, timestamp] = normalizeEventStep(processedEvent, processPerson) const [postPersonEvent, person] = await this.runStep( processPersonsStep, - [this, normalizedEvent, timestamp], + [this, normalizedEvent, timestamp, processPerson], event.team_id ) - const preparedEvent = await this.runStep(prepareEventStep, [this, postPersonEvent], event.team_id) + const preparedEvent = await this.runStep( + prepareEventStep, + [this, postPersonEvent, processPerson], + event.team_id + ) const [rawClickhouseEvent, eventAck] = await this.runStep( createEventStep, - [this, preparedEvent, person], + [this, preparedEvent, person, processPerson], event.team_id ) - return this.registerLastStep('createEventStep', [rawClickhouseEvent, person], [eventAck]) + return this.registerLastStep('createEventStep', [rawClickhouseEvent], [eventAck]) } registerLastStep(stepName: string, args: any[], ackPromises?: Array>): EventPipelineResult { diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index dde359c2a08b59..8fb942e65c64ff 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -90,6 +90,7 @@ export class PersonState { private teamId: number, private distinctId: string, private timestamp: DateTime, + private processPerson: boolean, // $process_person flag from the event private db: DB, private personOverrideWriter?: DeferredPersonOverrideWriter, uuid: UUIDT | undefined = undefined @@ -103,6 +104,21 @@ export class PersonState { } async update(): Promise { + if (!this.processPerson) { + // We don't need to handle any properties for `processPerson=false` events, so we can + // short circuit by just finding or creating a person and returning early. + // + // In the future, we won't even get or create a real Person for these events, and so + // the `processPerson` boolean can be removed from this class altogether, as this class + // shouldn't even need to be invoked. + const [person, _] = await this.createOrGetPerson() + + // Ensure person properties don't propagate elsewhere, such as onto the event itself. + person.properties = {} + + return person + } + const person: Person | undefined = await this.handleIdentifyOrAlias() // TODO: make it also return a boolean for if we can exit early here if (person) { // try to shortcut if we have the person from identify or alias @@ -141,8 +157,13 @@ export class PersonState { return [person, false] } - const properties = this.eventProperties['$set'] || {} - const propertiesOnce = this.eventProperties['$set_once'] || {} + let properties = {} + let propertiesOnce = {} + if (this.processPerson) { + properties = this.eventProperties['$set'] + propertiesOnce = this.eventProperties['$set_once'] + } + person = await this.createPerson( this.timestamp, properties || {}, diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 6698cc80e31f81..a3e9f384848b2a 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -5,6 +5,7 @@ import { DateTime } from 'luxon' import { Counter, Summary } from 'prom-client' import { + ClickHouseTimestamp, Element, GroupTypeIndex, Hub, @@ -70,7 +71,8 @@ export class EventsProcessor { data: PluginEvent, teamId: number, timestamp: DateTime, - eventUuid: string + eventUuid: string, + processPerson: boolean ): Promise { const singleSaveTimer = new Date() const timeout = timeoutGuard( @@ -92,7 +94,15 @@ export class EventsProcessor { eventUuid, }) try { - result = await this.capture(eventUuid, team, data['event'], distinctId, properties, timestamp) + result = await this.capture( + eventUuid, + team, + data['event'], + distinctId, + properties, + timestamp, + processPerson + ) processEventMsSummary.observe(Date.now() - singleSaveTimer.valueOf()) } finally { clearTimeout(captureTimeout) @@ -135,7 +145,8 @@ export class EventsProcessor { event: string, distinctId: string, properties: Properties, - timestamp: DateTime + timestamp: DateTime, + processPerson: boolean ): Promise { event = sanitizeEventName(event) @@ -156,11 +167,13 @@ export class EventsProcessor { } } - // Adds group_0 etc values to properties - properties = await addGroupProperties(team.id, properties, this.groupTypeManager) + if (processPerson) { + // Adds group_0 etc values to properties + properties = await addGroupProperties(team.id, properties, this.groupTypeManager) - if (event === '$groupidentify') { - await this.upsertGroup(team.id, properties, timestamp) + if (event === '$groupidentify') { + await this.upsertGroup(team.id, properties, timestamp) + } } return { @@ -186,7 +199,8 @@ export class EventsProcessor { async createEvent( preIngestionEvent: PreIngestionEvent, - person: Person + person: Person, + processPerson: boolean ): Promise<[RawClickHouseEvent, Promise]> { const { eventUuid: uuid, event, teamId, distinctId, properties, timestamp } = preIngestionEvent @@ -203,15 +217,26 @@ export class EventsProcessor { }) } - const groupIdentifiers = this.getGroupIdentifiers(properties) - const groupsColumns = await this.db.getGroupsColumns(teamId, groupIdentifiers) + let groupsColumns: Record = {} + let eventPersonProperties: string = '{}' + if (processPerson) { + const groupIdentifiers = this.getGroupIdentifiers(properties) + groupsColumns = await this.db.getGroupsColumns(teamId, groupIdentifiers) + eventPersonProperties = JSON.stringify({ + ...person.properties, + // For consistency, we'd like events to contain the properties that they set, even if those were changed + // before the event is ingested. + ...(properties.$set || {}), + }) + } else { + // TODO: Move this into `normalizeEventStep` where it belongs, but the code structure + // and tests demand this for now. + for (let groupTypeIndex = 0; groupTypeIndex < this.db.MAX_GROUP_TYPES_PER_TEAM; ++groupTypeIndex) { + const key = `$group_${groupTypeIndex}` + delete properties[key] + } + } - const eventPersonProperties: string = JSON.stringify({ - ...person.properties, - // For consistency, we'd like events to contain the properties that they set, even if those were changed - // before the event is ingested. - ...(properties.$set || {}), - }) // TODO: Remove Redis caching for person that's not used anymore const rawEvent: RawClickHouseEvent = { @@ -224,8 +249,9 @@ export class EventsProcessor { elements_chain: safeClickhouseString(elementsChain), created_at: castTimestampOrNow(null, TimestampFormat.ClickHouse), person_id: person.uuid, - person_properties: eventPersonProperties ?? undefined, + person_properties: eventPersonProperties, person_created_at: castTimestampOrNow(person.created_at, TimestampFormat.ClickHouseSecondPrecision), + person_mode: processPerson ? 'full' : 'propertyless', ...groupsColumns, } diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap index 7e7b123b20d9b6..44bb55337e7620 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap +++ b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap @@ -41,14 +41,18 @@ Array [ Object { "distinct_id": "my_id", "event": "default event", - "ip": "127.0.0.1", + "ip": null, "now": "2020-02-23T02:15:00.000Z", - "properties": Object {}, + "properties": Object { + "$ip": "127.0.0.1", + }, "site_url": "http://localhost", "team_id": 2, "timestamp": "2020-02-23T02:15:00.000Z", "uuid": "uuid1", }, + "2020-02-23T02:15:00.000Z", + true, ], ], Array [ @@ -57,14 +61,17 @@ Array [ Object { "distinct_id": "my_id", "event": "default event", - "ip": "127.0.0.1", + "ip": null, "now": "2020-02-23T02:15:00.000Z", - "properties": Object {}, + "properties": Object { + "$ip": "127.0.0.1", + }, "site_url": "http://localhost", "team_id": 2, "timestamp": "2020-02-23T02:15:00.000Z", "uuid": "uuid1", }, + true, ], ], Array [ @@ -81,7 +88,6 @@ Array [ "timestamp": "2020-02-23T02:15:00.000Z", }, Object { - "get": [Function], "person": Object { "created_at": "2020-02-23T02:15:00.000Z", "id": 123, @@ -96,6 +102,7 @@ Array [ }, "personUpdateProperties": Object {}, }, + true, ], ], ] diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts index 81c5de215f0df9..cc59c42f4905b0 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts @@ -5,7 +5,7 @@ import { UUIDT } from '../../../../src/utils/utils' import { normalizeEventStep } from '../../../../src/worker/ingestion/event-pipeline/normalizeEventStep' import { createOrganization, createTeam, resetTestDatabase } from '../../../helpers/sql' -describe.each([[true], [false]])('normalizeEventStep()', () => { +describe('normalizeEventStep()', () => { it('normalizes the event with properties set by plugins', async () => { await resetTestDatabase() const [hub, _] = await createHub() @@ -25,6 +25,7 @@ describe.each([[true], [false]])('normalizeEventStep()', () => { a: 5, }, $browser: 'Chrome', + $process_person: true, // This is dropped, as it is implied }, $set: { someProp: 'value', @@ -32,7 +33,8 @@ describe.each([[true], [false]])('normalizeEventStep()', () => { uuid: uuid, } - const [resEvent, timestamp] = normalizeEventStep(event) + const processPerson = true + const [resEvent, timestamp] = normalizeEventStep(event, processPerson) expect(resEvent).toEqual({ ...event, @@ -51,4 +53,51 @@ describe.each([[true], [false]])('normalizeEventStep()', () => { expect(timestamp).toEqual(DateTime.fromISO(event.timestamp!, { zone: 'utc' })) }) + + it('normalizes $process_person=false events by dropping $set and related', async () => { + await resetTestDatabase() + const [hub, _] = await createHub() + const organizationId = await createOrganization(hub.db.postgres) + const teamId = await createTeam(hub.db.postgres, organizationId) + const uuid = new UUIDT().toString() + const event = { + distinct_id: 'my_id', + ip: null, + site_url: 'http://localhost', + team_id: teamId, + now: '2020-02-23T02:15:00Z', + timestamp: '2020-02-23T02:15:00Z', + event: 'default event', + properties: { + $set: { + a: 5, + }, + $set_once: { + b: 10, + }, + $unset: ['c'], + $browser: 'Chrome', + }, + $set: { + someProp: 'value', + }, + $set_once: { + foo: 'bar', + }, + uuid: uuid, + } + + const processPerson = false + const [resEvent, timestamp] = normalizeEventStep(event, processPerson) + + expect(resEvent).toEqual({ + ...event, + properties: { + $browser: 'Chrome', + $process_person: false, + }, + }) + + expect(timestamp).toEqual(DateTime.fromISO(event.timestamp!, { zone: 'utc' })) + }) }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts index d5962f030eee95..82ede43d996a4f 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts @@ -52,7 +52,8 @@ describe.each([[true], [false]])('processPersonsStep()', (poEEmbraceJoin) => { }) it('creates person', async () => { - const [resEvent, resPerson] = await processPersonsStep(runner, pluginEvent, timestamp) + const processPerson = true + const [resEvent, resPerson] = await processPersonsStep(runner, pluginEvent, timestamp, processPerson) expect(resEvent).toEqual(pluginEvent) expect(resPerson).toEqual( @@ -82,8 +83,9 @@ describe.each([[true], [false]])('processPersonsStep()', (poEEmbraceJoin) => { }, } - const [normalizedEvent, timestamp] = normalizeEventStep(event) - const [resEvent, resPerson] = await processPersonsStep(runner, normalizedEvent, timestamp) + const processPerson = true + const [normalizedEvent, timestamp] = normalizeEventStep(event, processPerson) + const [resEvent, resPerson] = await processPersonsStep(runner, normalizedEvent, timestamp, processPerson) expect(resEvent).toEqual({ ...event, diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index 364483f7c09a6f..105e4c5fa17108 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -25,7 +25,13 @@ class TestEventPipelineRunner extends EventPipelineRunner { protected runStep(step: any, [runner, ...args]: any[], teamId: number, sendtoDLQ: boolean) { this.steps.push(step.name) - this.stepsWithArgs.push([step.name, args]) + + // We stringify+parse to clone the `args` object, since we do a lot of event mutation + // and pass the same object around by reference. We want to see a "snapshot" of the args + // sent to each step, rather than the final mutated object (which many steps actually share + // in practice, for better or worse). + this.stepsWithArgs.push([step.name, JSON.parse(JSON.stringify(args))]) + return super.runStep(step, [runner, ...args], teamId, sendtoDLQ) } } @@ -242,3 +248,35 @@ describe('EventPipelineRunner', () => { }) }) }) + +describe('EventPipelineRunner $process_person=false', () => { + it('drops events that are not allowed when $process_person=false', async () => { + for (const eventName of ['$identify', '$create_alias', '$merge_dangerously', '$groupidentify']) { + const event = { + ...pipelineEvent, + properties: { $process_person: false }, + event: eventName, + team_id: 9, + } + + const hub: any = { + db: { + kafkaProducer: { queueMessage: jest.fn() }, + }, + } + const runner = new TestEventPipelineRunner(hub, event) + jest.mocked(populateTeamDataStep).mockResolvedValue(event) + + await runner.runEventPipeline(event) + expect(runner.steps).toEqual(['populateTeamDataStep']) + expect(hub.db.kafkaProducer.queueMessage).toHaveBeenCalledTimes(1) + expect( + JSON.parse(hub.db.kafkaProducer.queueMessage.mock.calls[0][0].kafkaMessage.messages[0].value) + ).toMatchObject({ + team_id: 9, + type: 'invalid_event_when_process_person_is_false', + details: JSON.stringify({ eventUuid: 'uuid1', event: eventName, distinctId: 'my_id' }), + }) + } + }) +}) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index db64d97c3ec19e..3423ad01f6ab6f 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -95,7 +95,7 @@ describe('PersonState.update()', () => { await hub.db.clickhouseQuery('SYSTEM START MERGES') }) - function personState(event: Partial, customHub?: Hub) { + function personState(event: Partial, customHub?: Hub, processPerson = true) { const fullEvent = { team_id: teamId, properties: {}, @@ -107,6 +107,7 @@ describe('PersonState.update()', () => { teamId, event.distinct_id!, timestamp, + processPerson, customHub ? customHub.db : hub.db, overridesMode?.getWriter(customHub ?? hub), uuid @@ -172,6 +173,103 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) }) + it('creates person if they are new and $process_person=false', async () => { + // Note that eventually $process_person=false will be optimized so that the person is + // *not* created here. + const event_uuid = new UUIDT().toString() + const processPerson = false + const person = await personState( + { + event: '$pageview', + distinct_id: 'new-user', + uuid: event_uuid, + properties: { $process_person: false, $set: { a: 1 }, $set_once: { b: 2 } }, + }, + hub, + processPerson + ).update() + await hub.db.kafkaProducer.flush() + + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: uuid.toString(), + properties: {}, + created_at: timestamp, + version: 0, + is_identified: false, + }) + ) + + expect(hub.db.fetchPerson).toHaveBeenCalledTimes(1) + expect(hub.db.updatePersonDeprecated).not.toHaveBeenCalled() + + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + // For parity with existing functionality, the Person created in the DB actually gets + // the $creator_event_uuid property. When we stop creating person rows this won't matter. + expect(persons[0]).toEqual({ ...person, properties: { $creator_event_uuid: event_uuid } }) + + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + }) + + it('does not attach existing person properties to $process_person=false events', async () => { + const originalEventUuid = new UUIDT().toString() + const person = await personState({ + event: '$pageview', + distinct_id: 'new-user', + uuid: originalEventUuid, + properties: { $set: { c: 420 } }, + }).update() + await hub.db.kafkaProducer.flush() + + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: uuid.toString(), + properties: { $creator_event_uuid: originalEventUuid, c: 420 }, + created_at: timestamp, + version: 0, + is_identified: false, + }) + ) + + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + + // OK, a person now exists with { c: 420 }, let's prove the properties come back out + // of the DB. + const personVerifyProps = await personState({ + event: '$pageview', + distinct_id: 'new-user', + uuid: new UUIDT().toString(), + properties: {}, + }).update() + expect(personVerifyProps.properties).toEqual({ $creator_event_uuid: originalEventUuid, c: 420 }) + + // But they don't when $process_person=false + const processPersonFalseResult = await personState( + { + event: '$pageview', + distinct_id: 'new-user', + uuid: new UUIDT().toString(), + properties: {}, + }, + hub, + false + ).update() + expect(processPersonFalseResult.properties).toEqual({}) + }) + it('handles person being created in a race condition', async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['new-user']) diff --git a/plugin-server/tests/worker/ingestion/process-event.test.ts b/plugin-server/tests/worker/ingestion/process-event.test.ts index f4212284db5cc8..9e6c346c40675f 100644 --- a/plugin-server/tests/worker/ingestion/process-event.test.ts +++ b/plugin-server/tests/worker/ingestion/process-event.test.ts @@ -68,7 +68,8 @@ describe('EventsProcessor#createEvent()', () => { }) it('emits event with person columns, re-using event properties', async () => { - await eventsProcessor.createEvent(preIngestionEvent, person) + const processPerson = true + await eventsProcessor.createEvent(preIngestionEvent, person, processPerson) await eventsProcessor.kafkaProducer.flush() @@ -96,6 +97,7 @@ describe('EventsProcessor#createEvent()', () => { $group_2: '', $group_3: '', $group_4: '', + person_mode: 'full', }) ) }) @@ -112,7 +114,12 @@ describe('EventsProcessor#createEvent()', () => { 1 ) - await eventsProcessor.createEvent({ ...preIngestionEvent, properties: { $group_0: 'group_key' } }, person) + const processPerson = true + await eventsProcessor.createEvent( + { ...preIngestionEvent, properties: { $group_0: 'group_key' } }, + person, + processPerson + ) const events = await delayUntilEventIngested(() => hub.db.fetchEvents()) expect(events.length).toEqual(1) @@ -130,6 +137,36 @@ describe('EventsProcessor#createEvent()', () => { group2_properties: {}, group3_properties: {}, group4_properties: {}, + person_mode: 'full', + }) + ) + }) + + it('when $process_person=false, emits event with without person properties or groups', async () => { + const processPerson = false + await eventsProcessor.createEvent( + { ...preIngestionEvent, properties: { $group_0: 'group_key' } }, + person, + processPerson + ) + + await eventsProcessor.kafkaProducer.flush() + + const events = await delayUntilEventIngested(() => hub.db.fetchEvents()) + expect(events.length).toEqual(1) + expect(events[0]).toEqual( + expect.objectContaining({ + uuid: eventUuid, + event: '$pageview', + properties: {}, // $group_0 is removed + timestamp: expect.any(DateTime), + team_id: 2, + distinct_id: 'my_id', + elements_chain: null, + created_at: expect.any(DateTime), + person_id: personUuid, + person_properties: {}, + person_mode: 'propertyless', }) ) }) @@ -149,7 +186,12 @@ describe('EventsProcessor#createEvent()', () => { properties_last_updated_at: {}, properties_last_operation: {}, } - await eventsProcessor.createEvent({ ...preIngestionEvent, distinctId: 'no-such-person' }, nonExistingPerson) + const processPerson = true + await eventsProcessor.createEvent( + { ...preIngestionEvent, distinctId: 'no-such-person' }, + nonExistingPerson, + processPerson + ) await eventsProcessor.kafkaProducer.flush() const events = await delayUntilEventIngested(() => hub.db.fetchEvents())