From ea05d8e27648180ba12d2bc54460f289abe46517 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Fri, 5 Apr 2024 08:34:08 -0600 Subject: [PATCH] feat(plugin-server): handle process_person=false (#21262) * chore(plugin-server): extract normalizeEvent method, since this has nothing to do with persons * chore(plugin-server): cleanup PersonState args * chore(plugin-server): lazily json serialize some timeoutguard context * feat(plugin-server): handle process_person=false --- .../analytics-ingestion/happy-path.test.ts | 72 +++++++++++ plugin-server/src/backfill.ts | 10 +- plugin-server/src/types.ts | 2 + plugin-server/src/utils/event.ts | 28 ++++ .../event-pipeline/createEventStep.ts | 5 +- .../event-pipeline/normalizeEventStep.ts | 25 ++++ .../event-pipeline/prepareEventStep.ts | 9 +- .../event-pipeline/processPersonsStep.ts | 18 +-- .../worker/ingestion/event-pipeline/runner.ts | 62 +++++++-- .../src/worker/ingestion/person-state.ts | 59 +++++---- .../src/worker/ingestion/process-event.ts | 67 +++++++--- .../ingestion/property-definitions-manager.ts | 9 +- .../__snapshots__/runner.test.ts.snap | 23 +++- .../event-pipeline/normalizeEventStep.test.ts | 103 +++++++++++++++ .../event-pipeline/processPersonsStep.test.ts | 14 +- .../ingestion/event-pipeline/runner.test.ts | 44 ++++++- .../worker/ingestion/person-state.test.ts | 121 +++++++++++++++--- .../worker/ingestion/process-event.test.ts | 48 ++++++- 18 files changed, 614 insertions(+), 105 deletions(-) create mode 100644 plugin-server/src/worker/ingestion/event-pipeline/normalizeEventStep.ts create mode 100644 plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts diff --git a/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts b/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts index c8b3020c72d04..8d3052459e13d 100644 --- a/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts +++ b/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts @@ -202,6 +202,78 @@ test.concurrent(`event ingestion: can $set and update person properties`, async }) }) +test.concurrent( + `event ingestion: $process_person=false drops expected fields, doesn't include person properties`, + async () => { + const teamId = await createTeam(organizationId) + const distinctId = new UUIDT().toString() + + // Normal ("full") event creates person with a property. + await capture({ + teamId, + distinctId, + uuid: new UUIDT().toString(), + event: '$identify', + properties: { + distinct_id: distinctId, + $set: { prop: 'value' }, + }, + }) + + // Propertyless event tries to $set, $set_once, $unset and use groups, but none of these + // should work. + const properylessUuid = new UUIDT().toString() + await capture({ + teamId, + distinctId, + uuid: properylessUuid, + event: 'custom event', + properties: { + $process_person: false, + $group_0: 'group_key', + $set: { + c: 3, + }, + $set_once: { + d: 4, + }, + $unset: ['prop'], + }, + $set: { + a: 1, + }, + $set_once: { + b: 2, + }, + }) + await waitForExpect(async () => { + const [event] = await fetchEvents(teamId, properylessUuid) + expect(event).toEqual( + expect.objectContaining({ + person_properties: {}, + properties: { $process_person: false, uuid: properylessUuid, $sent_at: expect.any(String) }, + person_mode: 'propertyless', + }) + ) + }) + + // Another normal ("full") event sees the existing person property (it wasn't $unset) + const secondUuid = new UUIDT().toString() + await capture({ teamId, distinctId, uuid: secondUuid, event: 'custom event', properties: {} }) + await waitForExpect(async () => { + const [event] = await fetchEvents(teamId, secondUuid) + expect(event).toEqual( + expect.objectContaining({ + person_properties: expect.objectContaining({ + prop: 'value', + }), + person_mode: 'full', + }) + ) + }) + } +) + test.concurrent(`event ingestion: can $set and update person properties with top level $set`, async () => { // We support $set at the top level. This is as the time of writing how the // posthog-js library works. diff --git a/plugin-server/src/backfill.ts b/plugin-server/src/backfill.ts index e04a93e6f3b27..dd2bd1b118a3e 100644 --- a/plugin-server/src/backfill.ts +++ b/plugin-server/src/backfill.ts @@ -150,6 +150,14 @@ async function handleEvent(db: DB, event: RawClickHouseEvent): Promise { // single CH event handlin const pluginEvent = formPluginEvent(event) const ts: DateTime = DateTime.fromISO(pluginEvent.timestamp as string) - const personState = new PersonState(pluginEvent, pluginEvent.team_id, pluginEvent.distinct_id, ts, db) + const processPerson = true + const personState = new PersonState( + pluginEvent, + pluginEvent.team_id, + pluginEvent.distinct_id, + ts, + processPerson, + db + ) await personState.handleIdentifyOrAlias() } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 98b656e37a18d..b07e4771f3f2c 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -635,6 +635,7 @@ export interface RawClickHouseEvent extends BaseEvent { group2_created_at?: ClickHouseTimestamp group3_created_at?: ClickHouseTimestamp group4_created_at?: ClickHouseTimestamp + person_mode: 'full' | 'propertyless' } /** Parsed event row from ClickHouse. */ @@ -655,6 +656,7 @@ export interface ClickHouseEvent extends BaseEvent { group2_created_at?: DateTime | null group3_created_at?: DateTime | null group4_created_at?: DateTime | null + person_mode: 'full' | 'propertyless' } /** Event in a database-agnostic shape, AKA an ingestion event. diff --git a/plugin-server/src/utils/event.ts b/plugin-server/src/utils/event.ts index 1b97c1baa7bf4..0fe917e0aebfe 100644 --- a/plugin-server/src/utils/event.ts +++ b/plugin-server/src/utils/event.ts @@ -111,6 +111,34 @@ export function convertToIngestionEvent(event: RawClickHouseEvent, skipElementsC } } +/// Does normalization steps involving the $process_person property. This is currently a separate +/// function because `normalizeEvent` is called from multiple places, some early in the pipeline, +/// and we want to have one trusted place where `$process_person` is handled and passed through +/// all of the processing steps. +/// +/// If `formPipelineEvent` is removed this can easily be combined with `normalizeEvent`. +export function normalizeProcessPerson(event: PluginEvent, processPerson: boolean): PluginEvent { + const properties = event.properties ?? {} + + // $process_person steps: + // 1. If person processing is disabled, $set, $set_once and $unset are dropped + // 2. Normalize the $process_person property on the event, if true, drop it since true is + // the default. If it was false before plugins ran, ensure it's still set to false. + if (!processPerson) { + delete event.$set + delete event.$set_once + delete properties.$set + delete properties.$set_once + delete properties.$unset + properties.$process_person = false + } else { + delete properties.$process_person + } + + event.properties = properties + return event +} + export function normalizeEvent(event: PluginEvent): PluginEvent { event.distinct_id = event.distinct_id?.toString() diff --git a/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts index f1056e175484f..89dabade2d30b 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/createEventStep.ts @@ -4,7 +4,8 @@ import { EventPipelineRunner } from './runner' export async function createEventStep( runner: EventPipelineRunner, event: PreIngestionEvent, - person: Person + person: Person, + processPerson: boolean ): Promise<[RawClickHouseEvent, Promise]> { - return await runner.hub.eventsProcessor.createEvent(event, person) + return await runner.hub.eventsProcessor.createEvent(event, person, processPerson) } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/normalizeEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/normalizeEventStep.ts new file mode 100644 index 0000000000000..a302412f42d98 --- /dev/null +++ b/plugin-server/src/worker/ingestion/event-pipeline/normalizeEventStep.ts @@ -0,0 +1,25 @@ +import { PluginEvent } from '@posthog/plugin-scaffold' +import { DateTime } from 'luxon' + +import { normalizeEvent, normalizeProcessPerson } from '../../../utils/event' +import { status } from '../../../utils/status' +import { parseEventTimestamp } from '../timestamps' + +export function normalizeEventStep(event: PluginEvent, processPerson: boolean): Promise<[PluginEvent, DateTime]> { + let timestamp: DateTime + try { + event = normalizeEvent(event) + event = normalizeProcessPerson(event, processPerson) + timestamp = parseEventTimestamp(event) + } catch (error) { + status.warn('⚠️', 'Failed normalizing event', { + team_id: event.team_id, + uuid: event.uuid, + error, + }) + throw error + } + + // We need to be "async" to deal with how `runStep` currently works. + return Promise.resolve([event, timestamp]) +} diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index 879941e4d1838..b097643e0ca5f 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -6,7 +6,11 @@ import { captureIngestionWarning } from '../utils' import { invalidTimestampCounter } from './metrics' import { EventPipelineRunner } from './runner' -export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { +export async function prepareEventStep( + runner: EventPipelineRunner, + event: PluginEvent, + processPerson: boolean +): Promise { const { team_id, uuid } = event const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { @@ -20,7 +24,8 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi event, team_id, parseEventTimestamp(event, invalidTimestampCallback), - uuid! // it will throw if it's undefined, + uuid!, // it will throw if it's undefined, + processPerson ) await Promise.all(tsParsingIngestionWarnings) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts index 91bf6f10f27bd..3cc38242736d6 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts @@ -2,26 +2,15 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' import { Person } from 'types' -import { normalizeEvent } from '../../../utils/event' -import { status } from '../../../utils/status' import { DeferredPersonOverrideWriter, PersonState } from '../person-state' -import { parseEventTimestamp } from '../timestamps' import { EventPipelineRunner } from './runner' export async function processPersonsStep( runner: EventPipelineRunner, - pluginEvent: PluginEvent + event: PluginEvent, + timestamp: DateTime, + processPerson: boolean ): Promise<[PluginEvent, Person]> { - let event: PluginEvent - let timestamp: DateTime - try { - event = normalizeEvent(pluginEvent) - timestamp = parseEventTimestamp(event) - } catch (error) { - status.warn('⚠️', 'Failed normalizing event', { team_id: pluginEvent.team_id, uuid: pluginEvent.uuid, error }) - throw error - } - let overridesWriter: DeferredPersonOverrideWriter | undefined = undefined if (runner.poEEmbraceJoin) { overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres) @@ -32,6 +21,7 @@ export async function processPersonsStep( event.team_id, String(event.distinct_id), timestamp, + processPerson, runner.hub.db, overridesWriter ).update() diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 6ae2248513073..000df7262ca13 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -6,8 +6,9 @@ import { runInSpan } from '../../../sentry' import { Hub, PipelineEvent } from '../../../types' import { DependencyUnavailableError } from '../../../utils/db/error' import { timeoutGuard } from '../../../utils/db/utils' +import { normalizeProcessPerson } from '../../../utils/event' import { status } from '../../../utils/status' -import { generateEventDeadLetterQueueMessage } from '../utils' +import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils' import { createEventStep } from './createEventStep' import { eventProcessedAndIngestedCounter, @@ -17,6 +18,7 @@ import { pipelineStepMsSummary, pipelineStepThrowCounter, } from './metrics' +import { normalizeEventStep } from './normalizeEventStep' import { pluginsProcessEventStep } from './pluginsProcessEventStep' import { populateTeamDataStep } from './populateTeamDataStep' import { prepareEventStep } from './prepareEventStep' @@ -117,22 +119,66 @@ export class EventPipelineRunner { // ingestion pipeline is working well for all teams. this.poEEmbraceJoin = true } - const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id) + let processPerson = true + if (event.properties && event.properties.$process_person === false) { + // We are purposefully being very explicit here. The `$process_person` property *must* + // exist and be set to `false` (not missing, or null, or any other value) to disable + // person processing. + processPerson = false + + if (['$identify', '$create_alias', '$merge_dangerously', '$groupidentify'].includes(event.event)) { + const warningAck = captureIngestionWarning( + this.hub.db.kafkaProducer, + event.team_id, + 'invalid_event_when_process_person_is_false', + { + eventUuid: event.uuid, + event: event.event, + distinctId: event.distinct_id, + }, + { alwaysSend: true } + ) + + return this.registerLastStep('invalidEventForProvidedFlags', [event], [warningAck]) + } + + // If person processing is disabled, go ahead and remove person related keys before + // any plugins have a chance to see them. + event = normalizeProcessPerson(event, processPerson) + } + + const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id) if (processedEvent == null) { + // A plugin dropped the event. return this.registerLastStep('pluginsProcessEventStep', [event]) } - const [normalizedEvent, person] = await this.runStep(processPersonsStep, [this, processedEvent], event.team_id) - const preparedEvent = await this.runStep(prepareEventStep, [this, normalizedEvent], event.team_id) + const [normalizedEvent, timestamp] = await this.runStep( + normalizeEventStep, + [processedEvent, processPerson], + event.team_id + ) + + const [postPersonEvent, person] = await this.runStep( + processPersonsStep, + [this, normalizedEvent, timestamp, processPerson], + event.team_id + ) + + const preparedEvent = await this.runStep( + prepareEventStep, + [this, postPersonEvent, processPerson], + event.team_id + ) const [rawClickhouseEvent, eventAck] = await this.runStep( createEventStep, - [this, preparedEvent, person], + [this, preparedEvent, person, processPerson], event.team_id ) - return this.registerLastStep('createEventStep', [rawClickhouseEvent, person], [eventAck]) + return this.registerLastStep('createEventStep', [rawClickhouseEvent], [eventAck]) } registerLastStep(stepName: string, args: any[], ackPromises?: Array>): EventPipelineResult { @@ -156,12 +202,12 @@ export class EventPipelineRunner { const sendToSentry = false const timeout = timeoutGuard( `Event pipeline step stalled. Timeout warning after ${this.hub.PIPELINE_STEP_STALLED_LOG_TIMEOUT} sec! step=${step.name} team_id=${teamId} distinct_id=${this.originalEvent.distinct_id}`, - { + () => ({ step: step.name, event: JSON.stringify(this.originalEvent), teamId: teamId, distinctId: this.originalEvent.distinct_id, - }, + }), this.hub.PIPELINE_STEP_STALLED_LOG_TIMEOUT * 1000, sendToSentry ) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 525bbbf84c910..8fb942e65c64f 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -16,8 +16,6 @@ import { status } from '../../utils/status' import { castTimestampOrNow, UUIDT } from '../../utils/utils' import { captureIngestionWarning } from './utils' -const MAX_FAILED_PERSON_MERGE_ATTEMPTS = 3 - export const mergeFinalFailuresCounter = new Counter({ name: 'person_merge_final_failure_total', help: 'Number of person merge final failures.', @@ -82,36 +80,23 @@ const isDistinctIdIllegal = (id: string): boolean => { // This class is responsible for creating/updating a single person through the process-event pipeline export class PersonState { - event: PluginEvent - distinctId: string - teamId: number - eventProperties: Properties - timestamp: DateTime - newUuid: string - maxMergeAttempts: number - - private db: DB + private eventProperties: Properties + private newUuid: string + public updateIsIdentified: boolean // TODO: remove this from the class and being hidden constructor( - event: PluginEvent, - teamId: number, - distinctId: string, - timestamp: DateTime, - db: DB, + private event: PluginEvent, + private teamId: number, + private distinctId: string, + private timestamp: DateTime, + private processPerson: boolean, // $process_person flag from the event + private db: DB, private personOverrideWriter?: DeferredPersonOverrideWriter, - uuid: UUIDT | undefined = undefined, - maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS + uuid: UUIDT | undefined = undefined ) { - this.event = event - this.distinctId = distinctId - this.teamId = teamId this.eventProperties = event.properties! - this.timestamp = timestamp this.newUuid = (uuid || new UUIDT()).toString() - this.maxMergeAttempts = maxMergeAttempts - - this.db = db // If set to true, we'll update `is_identified` at the end of `updateProperties` // :KLUDGE: This is an indirect communication channel between `handleIdentifyOrAlias` and `updateProperties` @@ -119,6 +104,21 @@ export class PersonState { } async update(): Promise { + if (!this.processPerson) { + // We don't need to handle any properties for `processPerson=false` events, so we can + // short circuit by just finding or creating a person and returning early. + // + // In the future, we won't even get or create a real Person for these events, and so + // the `processPerson` boolean can be removed from this class altogether, as this class + // shouldn't even need to be invoked. + const [person, _] = await this.createOrGetPerson() + + // Ensure person properties don't propagate elsewhere, such as onto the event itself. + person.properties = {} + + return person + } + const person: Person | undefined = await this.handleIdentifyOrAlias() // TODO: make it also return a boolean for if we can exit early here if (person) { // try to shortcut if we have the person from identify or alias @@ -157,8 +157,13 @@ export class PersonState { return [person, false] } - const properties = this.eventProperties['$set'] || {} - const propertiesOnce = this.eventProperties['$set_once'] || {} + let properties = {} + let propertiesOnce = {} + if (this.processPerson) { + properties = this.eventProperties['$set'] + propertiesOnce = this.eventProperties['$set_once'] + } + person = await this.createPerson( this.timestamp, properties || {}, diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 5d6a9a4191334..a3e9f384848b2 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -5,6 +5,7 @@ import { DateTime } from 'luxon' import { Counter, Summary } from 'prom-client' import { + ClickHouseTimestamp, Element, GroupTypeIndex, Hub, @@ -70,12 +71,14 @@ export class EventsProcessor { data: PluginEvent, teamId: number, timestamp: DateTime, - eventUuid: string + eventUuid: string, + processPerson: boolean ): Promise { const singleSaveTimer = new Date() - const timeout = timeoutGuard('Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!', { - event: JSON.stringify(data), - }) + const timeout = timeoutGuard( + 'Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!', + () => ({ event: JSON.stringify(data) }) + ) let result: PreIngestionEvent | null = null try { @@ -91,7 +94,15 @@ export class EventsProcessor { eventUuid, }) try { - result = await this.capture(eventUuid, team, data['event'], distinctId, properties, timestamp) + result = await this.capture( + eventUuid, + team, + data['event'], + distinctId, + properties, + timestamp, + processPerson + ) processEventMsSummary.observe(Date.now() - singleSaveTimer.valueOf()) } finally { clearTimeout(captureTimeout) @@ -134,7 +145,8 @@ export class EventsProcessor { event: string, distinctId: string, properties: Properties, - timestamp: DateTime + timestamp: DateTime, + processPerson: boolean ): Promise { event = sanitizeEventName(event) @@ -155,11 +167,13 @@ export class EventsProcessor { } } - // Adds group_0 etc values to properties - properties = await addGroupProperties(team.id, properties, this.groupTypeManager) + if (processPerson) { + // Adds group_0 etc values to properties + properties = await addGroupProperties(team.id, properties, this.groupTypeManager) - if (event === '$groupidentify') { - await this.upsertGroup(team.id, properties, timestamp) + if (event === '$groupidentify') { + await this.upsertGroup(team.id, properties, timestamp) + } } return { @@ -185,7 +199,8 @@ export class EventsProcessor { async createEvent( preIngestionEvent: PreIngestionEvent, - person: Person + person: Person, + processPerson: boolean ): Promise<[RawClickHouseEvent, Promise]> { const { eventUuid: uuid, event, teamId, distinctId, properties, timestamp } = preIngestionEvent @@ -202,15 +217,26 @@ export class EventsProcessor { }) } - const groupIdentifiers = this.getGroupIdentifiers(properties) - const groupsColumns = await this.db.getGroupsColumns(teamId, groupIdentifiers) + let groupsColumns: Record = {} + let eventPersonProperties: string = '{}' + if (processPerson) { + const groupIdentifiers = this.getGroupIdentifiers(properties) + groupsColumns = await this.db.getGroupsColumns(teamId, groupIdentifiers) + eventPersonProperties = JSON.stringify({ + ...person.properties, + // For consistency, we'd like events to contain the properties that they set, even if those were changed + // before the event is ingested. + ...(properties.$set || {}), + }) + } else { + // TODO: Move this into `normalizeEventStep` where it belongs, but the code structure + // and tests demand this for now. + for (let groupTypeIndex = 0; groupTypeIndex < this.db.MAX_GROUP_TYPES_PER_TEAM; ++groupTypeIndex) { + const key = `$group_${groupTypeIndex}` + delete properties[key] + } + } - const eventPersonProperties: string = JSON.stringify({ - ...person.properties, - // For consistency, we'd like events to contain the properties that they set, even if those were changed - // before the event is ingested. - ...(properties.$set || {}), - }) // TODO: Remove Redis caching for person that's not used anymore const rawEvent: RawClickHouseEvent = { @@ -223,8 +249,9 @@ export class EventsProcessor { elements_chain: safeClickhouseString(elementsChain), created_at: castTimestampOrNow(null, TimestampFormat.ClickHouse), person_id: person.uuid, - person_properties: eventPersonProperties ?? undefined, + person_properties: eventPersonProperties, person_created_at: castTimestampOrNow(person.created_at, TimestampFormat.ClickHouseSecondPrecision), + person_mode: processPerson ? 'full' : 'propertyless', ...groupsColumns, } diff --git a/plugin-server/src/worker/ingestion/property-definitions-manager.ts b/plugin-server/src/worker/ingestion/property-definitions-manager.ts index aad9343a16a28..f7ae512e37948 100644 --- a/plugin-server/src/worker/ingestion/property-definitions-manager.ts +++ b/plugin-server/src/worker/ingestion/property-definitions-manager.ts @@ -95,9 +95,12 @@ export class PropertyDefinitionsManager { } const timer = new Date() - const timeout = timeoutGuard('Still running "updateEventNamesAndProperties". Timeout warning after 30 sec!', { - event: event, - }) + const timeout = timeoutGuard( + 'Still running "updateEventNamesAndProperties". Timeout warning after 30 sec!', + () => ({ + event: event, + }) + ) try { const team: Team | null = await this.teamManager.fetchTeam(teamId) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap index 7e7b123b20d9b..9cd0d244500ae 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap +++ b/plugin-server/tests/worker/ingestion/event-pipeline/__snapshots__/runner.test.ts.snap @@ -35,20 +35,30 @@ Array [ }, ], ], + Array [ + "normalizeEventStep", + Array [ + true, + ], + ], Array [ "processPersonsStep", Array [ Object { "distinct_id": "my_id", "event": "default event", - "ip": "127.0.0.1", + "ip": null, "now": "2020-02-23T02:15:00.000Z", - "properties": Object {}, + "properties": Object { + "$ip": "127.0.0.1", + }, "site_url": "http://localhost", "team_id": 2, "timestamp": "2020-02-23T02:15:00.000Z", "uuid": "uuid1", }, + "2020-02-23T02:15:00.000Z", + true, ], ], Array [ @@ -57,14 +67,17 @@ Array [ Object { "distinct_id": "my_id", "event": "default event", - "ip": "127.0.0.1", + "ip": null, "now": "2020-02-23T02:15:00.000Z", - "properties": Object {}, + "properties": Object { + "$ip": "127.0.0.1", + }, "site_url": "http://localhost", "team_id": 2, "timestamp": "2020-02-23T02:15:00.000Z", "uuid": "uuid1", }, + true, ], ], Array [ @@ -81,7 +94,6 @@ Array [ "timestamp": "2020-02-23T02:15:00.000Z", }, Object { - "get": [Function], "person": Object { "created_at": "2020-02-23T02:15:00.000Z", "id": 123, @@ -96,6 +108,7 @@ Array [ }, "personUpdateProperties": Object {}, }, + true, ], ], ] diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts new file mode 100644 index 0000000000000..52b659808597a --- /dev/null +++ b/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts @@ -0,0 +1,103 @@ +import { DateTime } from 'luxon' + +import { createHub } from '../../../../src/utils/db/hub' +import { UUIDT } from '../../../../src/utils/utils' +import { normalizeEventStep } from '../../../../src/worker/ingestion/event-pipeline/normalizeEventStep' +import { createOrganization, createTeam, resetTestDatabase } from '../../../helpers/sql' + +describe('normalizeEventStep()', () => { + it('normalizes the event with properties set by plugins', async () => { + await resetTestDatabase() + const [hub, _] = await createHub() + const organizationId = await createOrganization(hub.db.postgres) + const teamId = await createTeam(hub.db.postgres, organizationId) + const uuid = new UUIDT().toString() + const event = { + distinct_id: 'my_id', + ip: null, + site_url: 'http://localhost', + team_id: teamId, + now: '2020-02-23T02:15:00Z', + timestamp: '2020-02-23T02:15:00Z', + event: 'default event', + properties: { + $set: { + a: 5, + }, + $browser: 'Chrome', + $process_person: true, // This is dropped, as it is implied + }, + $set: { + someProp: 'value', + }, + uuid: uuid, + } + + const processPerson = true + const [resEvent, timestamp] = await normalizeEventStep(event, processPerson) + + expect(resEvent).toEqual({ + ...event, + properties: { + $browser: 'Chrome', + $set: { + someProp: 'value', + a: 5, + $browser: 'Chrome', + }, + $set_once: { + $initial_browser: 'Chrome', + }, + }, + }) + + expect(timestamp).toEqual(DateTime.fromISO(event.timestamp!, { zone: 'utc' })) + }) + + it('normalizes $process_person=false events by dropping $set and related', async () => { + await resetTestDatabase() + const [hub, _] = await createHub() + const organizationId = await createOrganization(hub.db.postgres) + const teamId = await createTeam(hub.db.postgres, organizationId) + const uuid = new UUIDT().toString() + const event = { + distinct_id: 'my_id', + ip: null, + site_url: 'http://localhost', + team_id: teamId, + now: '2020-02-23T02:15:00Z', + timestamp: '2020-02-23T02:15:00Z', + event: 'default event', + properties: { + $set: { + a: 5, + }, + $set_once: { + b: 10, + }, + $unset: ['c'], + $browser: 'Chrome', + }, + $set: { + someProp: 'value', + }, + $set_once: { + foo: 'bar', + }, + uuid: uuid, + } + + const processPerson = false + const [resEvent, timestamp] = await normalizeEventStep(event, processPerson) + + expect(resEvent).toEqual({ + ...event, + properties: { + $browser: 'Chrome', + $process_person: false, + }, + }) + + expect(timestamp).toEqual(DateTime.fromISO(event.timestamp!, { zone: 'utc' })) + }) +}) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts index 2e9f1df0e69db..771a6900edc05 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/processPersonsStep.test.ts @@ -1,8 +1,10 @@ import { PluginEvent } from '@posthog/plugin-scaffold' +import { DateTime } from 'luxon' import { Hub } from '../../../../src/types' import { createHub } from '../../../../src/utils/db/hub' import { UUIDT } from '../../../../src/utils/utils' +import { normalizeEventStep } from '../../../../src/worker/ingestion/event-pipeline/normalizeEventStep' import { processPersonsStep } from '../../../../src/worker/ingestion/event-pipeline/processPersonsStep' import { createOrganization, createTeam, fetchPostgresPersons, resetTestDatabase } from '../../../helpers/sql' @@ -14,6 +16,7 @@ describe.each([[true], [false]])('processPersonsStep()', (poEEmbraceJoin) => { let uuid: string let teamId: number let pluginEvent: PluginEvent + let timestamp: DateTime beforeEach(async () => { await resetTestDatabase() @@ -42,13 +45,15 @@ describe.each([[true], [false]])('processPersonsStep()', (poEEmbraceJoin) => { }, uuid: uuid, } + timestamp = DateTime.fromISO(pluginEvent.timestamp!) }) afterEach(async () => { await closeHub?.() }) it('creates person', async () => { - const [resEvent, resPerson] = await processPersonsStep(runner, pluginEvent) + const processPerson = true + const [resEvent, resPerson] = await processPersonsStep(runner, pluginEvent, timestamp, processPerson) expect(resEvent).toEqual(pluginEvent) expect(resPerson).toEqual( @@ -67,7 +72,7 @@ describe.each([[true], [false]])('processPersonsStep()', (poEEmbraceJoin) => { expect(persons).toEqual([resPerson]) }) - it('re-normalizes the event with properties set by plugins', async () => { + it('creates event with normalized properties set by plugins', async () => { const event = { ...pluginEvent, properties: { @@ -77,7 +82,10 @@ describe.each([[true], [false]])('processPersonsStep()', (poEEmbraceJoin) => { someProp: 'value', }, } - const [resEvent, resPerson] = await processPersonsStep(runner, event) + + const processPerson = true + const [normalizedEvent, timestamp] = await normalizeEventStep(event, processPerson) + const [resEvent, resPerson] = await processPersonsStep(runner, normalizedEvent, timestamp, processPerson) expect(resEvent).toEqual({ ...event, diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index 364483f7c09a6..401829aa4c47a 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -25,7 +25,13 @@ class TestEventPipelineRunner extends EventPipelineRunner { protected runStep(step: any, [runner, ...args]: any[], teamId: number, sendtoDLQ: boolean) { this.steps.push(step.name) - this.stepsWithArgs.push([step.name, args]) + + // We stringify+parse to clone the `args` object, since we do a lot of event mutation + // and pass the same object around by reference. We want to see a "snapshot" of the args + // sent to each step, rather than the final mutated object (which many steps actually share + // in practice, for better or worse). + this.stepsWithArgs.push([step.name, JSON.parse(JSON.stringify(args))]) + return super.runStep(step, [runner, ...args], teamId, sendtoDLQ) } } @@ -111,6 +117,7 @@ describe('EventPipelineRunner', () => { expect(runner.steps).toEqual([ 'populateTeamDataStep', 'pluginsProcessEventStep', + 'normalizeEventStep', 'processPersonsStep', 'prepareEventStep', 'createEventStep', @@ -137,6 +144,7 @@ describe('EventPipelineRunner', () => { expect(runner.steps).toEqual([ 'populateTeamDataStep', 'pluginsProcessEventStep', + 'normalizeEventStep', 'processPersonsStep', 'prepareEventStep', 'createEventStep', @@ -161,7 +169,7 @@ describe('EventPipelineRunner', () => { const result = await runner.runEventPipeline(pipelineEvent) expect(result.error).toBeUndefined() - expect(pipelineStepMsSummarySpy).toHaveBeenCalledTimes(5) + expect(pipelineStepMsSummarySpy).toHaveBeenCalledTimes(6) expect(pipelineLastStepCounterSpy).toHaveBeenCalledTimes(1) expect(eventProcessedAndIngestedCounterSpy).toHaveBeenCalledTimes(1) expect(pipelineStepMsSummarySpy).toHaveBeenCalledWith('createEventStep') @@ -242,3 +250,35 @@ describe('EventPipelineRunner', () => { }) }) }) + +describe('EventPipelineRunner $process_person=false', () => { + it('drops events that are not allowed when $process_person=false', async () => { + for (const eventName of ['$identify', '$create_alias', '$merge_dangerously', '$groupidentify']) { + const event = { + ...pipelineEvent, + properties: { $process_person: false }, + event: eventName, + team_id: 9, + } + + const hub: any = { + db: { + kafkaProducer: { queueMessage: jest.fn() }, + }, + } + const runner = new TestEventPipelineRunner(hub, event) + jest.mocked(populateTeamDataStep).mockResolvedValue(event) + + await runner.runEventPipeline(event) + expect(runner.steps).toEqual(['populateTeamDataStep']) + expect(hub.db.kafkaProducer.queueMessage).toHaveBeenCalledTimes(1) + expect( + JSON.parse(hub.db.kafkaProducer.queueMessage.mock.calls[0][0].kafkaMessage.messages[0].value) + ).toMatchObject({ + team_id: 9, + type: 'invalid_event_when_process_person_is_false', + details: JSON.stringify({ eventUuid: 'uuid1', event: eventName, distinctId: 'my_id' }), + }) + } + }) +}) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index df847a5112f73..3423ad01f6ab6 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -95,7 +95,7 @@ describe('PersonState.update()', () => { await hub.db.clickhouseQuery('SYSTEM START MERGES') }) - function personState(event: Partial, customHub?: Hub, maxMergeAttempts?: number) { + function personState(event: Partial, customHub?: Hub, processPerson = true) { const fullEvent = { team_id: teamId, properties: {}, @@ -107,10 +107,10 @@ describe('PersonState.update()', () => { teamId, event.distinct_id!, timestamp, + processPerson, customHub ? customHub.db : hub.db, overridesMode?.getWriter(customHub ?? hub), - uuid, - maxMergeAttempts ?? 3 // the default + uuid ) } @@ -173,6 +173,103 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) }) + it('creates person if they are new and $process_person=false', async () => { + // Note that eventually $process_person=false will be optimized so that the person is + // *not* created here. + const event_uuid = new UUIDT().toString() + const processPerson = false + const person = await personState( + { + event: '$pageview', + distinct_id: 'new-user', + uuid: event_uuid, + properties: { $process_person: false, $set: { a: 1 }, $set_once: { b: 2 } }, + }, + hub, + processPerson + ).update() + await hub.db.kafkaProducer.flush() + + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: uuid.toString(), + properties: {}, + created_at: timestamp, + version: 0, + is_identified: false, + }) + ) + + expect(hub.db.fetchPerson).toHaveBeenCalledTimes(1) + expect(hub.db.updatePersonDeprecated).not.toHaveBeenCalled() + + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + // For parity with existing functionality, the Person created in the DB actually gets + // the $creator_event_uuid property. When we stop creating person rows this won't matter. + expect(persons[0]).toEqual({ ...person, properties: { $creator_event_uuid: event_uuid } }) + + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + }) + + it('does not attach existing person properties to $process_person=false events', async () => { + const originalEventUuid = new UUIDT().toString() + const person = await personState({ + event: '$pageview', + distinct_id: 'new-user', + uuid: originalEventUuid, + properties: { $set: { c: 420 } }, + }).update() + await hub.db.kafkaProducer.flush() + + expect(person).toEqual( + expect.objectContaining({ + id: expect.any(Number), + uuid: uuid.toString(), + properties: { $creator_event_uuid: originalEventUuid, c: 420 }, + created_at: timestamp, + version: 0, + is_identified: false, + }) + ) + + // verify Postgres persons + const persons = await fetchPostgresPersonsH() + expect(persons.length).toEqual(1) + expect(persons[0]).toEqual(person) + + // verify Postgres distinct_ids + const distinctIds = await hub.db.fetchDistinctIdValues(persons[0]) + expect(distinctIds).toEqual(expect.arrayContaining(['new-user'])) + + // OK, a person now exists with { c: 420 }, let's prove the properties come back out + // of the DB. + const personVerifyProps = await personState({ + event: '$pageview', + distinct_id: 'new-user', + uuid: new UUIDT().toString(), + properties: {}, + }).update() + expect(personVerifyProps.properties).toEqual({ $creator_event_uuid: originalEventUuid, c: 420 }) + + // But they don't when $process_person=false + const processPersonFalseResult = await personState( + { + event: '$pageview', + distinct_id: 'new-user', + uuid: new UUIDT().toString(), + properties: {}, + }, + hub, + false + ).update() + expect(processPersonFalseResult.properties).toEqual({}) + }) + it('handles person being created in a race condition', async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, uuid.toString(), ['new-user']) @@ -1907,8 +2004,7 @@ describe('PersonState.update()', () => { alias: 'second', }, }, - hub, - 0 + hub ).handleIdentifyOrAlias(), personState( { @@ -1918,8 +2014,7 @@ describe('PersonState.update()', () => { alias: 'third', }, }, - hub, - 0 + hub ).handleIdentifyOrAlias(), ]) @@ -1934,8 +2029,7 @@ describe('PersonState.update()', () => { alias: 'second', }, }, - hub, - 0 + hub ).handleIdentifyOrAlias(), personState( { @@ -1945,8 +2039,7 @@ describe('PersonState.update()', () => { alias: 'third', }, }, - hub, - 0 + hub ).handleIdentifyOrAlias(), ]) @@ -2026,8 +2119,7 @@ describe('PersonState.update()', () => { alias: 'third', }, }, - hub, - 0 + hub ).handleIdentifyOrAlias() await personState( @@ -2038,8 +2130,7 @@ describe('PersonState.update()', () => { alias: 'second', }, }, - hub, - 0 + hub ).handleIdentifyOrAlias() // verify Postgres persons diff --git a/plugin-server/tests/worker/ingestion/process-event.test.ts b/plugin-server/tests/worker/ingestion/process-event.test.ts index f4212284db5cc..9e6c346c40675 100644 --- a/plugin-server/tests/worker/ingestion/process-event.test.ts +++ b/plugin-server/tests/worker/ingestion/process-event.test.ts @@ -68,7 +68,8 @@ describe('EventsProcessor#createEvent()', () => { }) it('emits event with person columns, re-using event properties', async () => { - await eventsProcessor.createEvent(preIngestionEvent, person) + const processPerson = true + await eventsProcessor.createEvent(preIngestionEvent, person, processPerson) await eventsProcessor.kafkaProducer.flush() @@ -96,6 +97,7 @@ describe('EventsProcessor#createEvent()', () => { $group_2: '', $group_3: '', $group_4: '', + person_mode: 'full', }) ) }) @@ -112,7 +114,12 @@ describe('EventsProcessor#createEvent()', () => { 1 ) - await eventsProcessor.createEvent({ ...preIngestionEvent, properties: { $group_0: 'group_key' } }, person) + const processPerson = true + await eventsProcessor.createEvent( + { ...preIngestionEvent, properties: { $group_0: 'group_key' } }, + person, + processPerson + ) const events = await delayUntilEventIngested(() => hub.db.fetchEvents()) expect(events.length).toEqual(1) @@ -130,6 +137,36 @@ describe('EventsProcessor#createEvent()', () => { group2_properties: {}, group3_properties: {}, group4_properties: {}, + person_mode: 'full', + }) + ) + }) + + it('when $process_person=false, emits event with without person properties or groups', async () => { + const processPerson = false + await eventsProcessor.createEvent( + { ...preIngestionEvent, properties: { $group_0: 'group_key' } }, + person, + processPerson + ) + + await eventsProcessor.kafkaProducer.flush() + + const events = await delayUntilEventIngested(() => hub.db.fetchEvents()) + expect(events.length).toEqual(1) + expect(events[0]).toEqual( + expect.objectContaining({ + uuid: eventUuid, + event: '$pageview', + properties: {}, // $group_0 is removed + timestamp: expect.any(DateTime), + team_id: 2, + distinct_id: 'my_id', + elements_chain: null, + created_at: expect.any(DateTime), + person_id: personUuid, + person_properties: {}, + person_mode: 'propertyless', }) ) }) @@ -149,7 +186,12 @@ describe('EventsProcessor#createEvent()', () => { properties_last_updated_at: {}, properties_last_operation: {}, } - await eventsProcessor.createEvent({ ...preIngestionEvent, distinctId: 'no-such-person' }, nonExistingPerson) + const processPerson = true + await eventsProcessor.createEvent( + { ...preIngestionEvent, distinctId: 'no-such-person' }, + nonExistingPerson, + processPerson + ) await eventsProcessor.kafkaProducer.flush() const events = await delayUntilEventIngested(() => hub.db.fetchEvents())