Skip to content

Commit

Permalink
feat(plugin-server): handle process_person=false (#21262)
Browse files Browse the repository at this point in the history
* chore(plugin-server): extract normalizeEvent method, since this has nothing to do with persons

* chore(plugin-server): cleanup PersonState args

* chore(plugin-server): lazily json serialize some timeoutguard context

* feat(plugin-server): handle process_person=false
  • Loading branch information
bretthoerner authored Apr 5, 2024
1 parent b28e44c commit ea05d8e
Show file tree
Hide file tree
Showing 18 changed files with 614 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,78 @@ test.concurrent(`event ingestion: can $set and update person properties`, async
})
})

test.concurrent(
    `event ingestion: $process_person=false drops expected fields, doesn't include person properties`,
    async () => {
        const teamId = await createTeam(organizationId)
        const distinctId = new UUIDT().toString()

        // Step 1: a regular ("full") $identify event creates the person and gives it one property.
        await capture({
            teamId,
            distinctId,
            uuid: new UUIDT().toString(),
            event: '$identify',
            properties: { distinct_id: distinctId, $set: { prop: 'value' } },
        })

        // Step 2: an event with $process_person=false attempts every person/group mutation we
        // know of ($set, $set_once, $unset, a group key) both inside `properties` and at the top
        // level. None of them is expected to take effect.
        const propertylessUuid = new UUIDT().toString()
        await capture({
            teamId,
            distinctId,
            uuid: propertylessUuid,
            event: 'custom event',
            properties: {
                $process_person: false,
                $group_0: 'group_key',
                $set: { c: 3 },
                $set_once: { d: 4 },
                $unset: ['prop'],
            },
            $set: { a: 1 },
            $set_once: { b: 2 },
        })

        // The stored event should carry only the bookkeeping properties and no person properties.
        const expectedPropertylessEvent = expect.objectContaining({
            person_properties: {},
            properties: { $process_person: false, uuid: propertylessUuid, $sent_at: expect.any(String) },
            person_mode: 'propertyless',
        })
        await waitForExpect(async () => {
            const [storedEvent] = await fetchEvents(teamId, propertylessUuid)
            expect(storedEvent).toEqual(expectedPropertylessEvent)
        })

        // Step 3: a further regular ("full") event must still see the property from step 1,
        // which shows that the $unset in step 2 was ignored.
        const followUpUuid = new UUIDT().toString()
        await capture({ teamId, distinctId, uuid: followUpUuid, event: 'custom event', properties: {} })

        const expectedFullEvent = expect.objectContaining({
            person_properties: expect.objectContaining({ prop: 'value' }),
            person_mode: 'full',
        })
        await waitForExpect(async () => {
            const [storedEvent] = await fetchEvents(teamId, followUpUuid)
            expect(storedEvent).toEqual(expectedFullEvent)
        })
    }
)

test.concurrent(`event ingestion: can $set and update person properties with top level $set`, async () => {
// We support $set at the top level. At the time of writing, this is how the
// posthog-js library works.
Expand Down
10 changes: 9 additions & 1 deletion plugin-server/src/backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ async function handleEvent(db: DB, event: RawClickHouseEvent): Promise<void> {
// single CH event handling
const pluginEvent = formPluginEvent(event)
const ts: DateTime = DateTime.fromISO(pluginEvent.timestamp as string)
const personState = new PersonState(pluginEvent, pluginEvent.team_id, pluginEvent.distinct_id, ts, db)
const processPerson = true
const personState = new PersonState(
pluginEvent,
pluginEvent.team_id,
pluginEvent.distinct_id,
ts,
processPerson,
db
)
await personState.handleIdentifyOrAlias()
}
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ export interface RawClickHouseEvent extends BaseEvent {
group2_created_at?: ClickHouseTimestamp
group3_created_at?: ClickHouseTimestamp
group4_created_at?: ClickHouseTimestamp
person_mode: 'full' | 'propertyless'
}

/** Parsed event row from ClickHouse. */
Expand All @@ -655,6 +656,7 @@ export interface ClickHouseEvent extends BaseEvent {
group2_created_at?: DateTime | null
group3_created_at?: DateTime | null
group4_created_at?: DateTime | null
person_mode: 'full' | 'propertyless'
}

/** Event in a database-agnostic shape, AKA an ingestion event.
Expand Down
28 changes: 28 additions & 0 deletions plugin-server/src/utils/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ export function convertToIngestionEvent(event: RawClickHouseEvent, skipElementsC
}
}

/**
 * Applies the normalization rules tied to the `$process_person` property.
 *
 * This lives apart from `normalizeEvent` because that function is called from several places,
 * some of them early in the pipeline, and `$process_person` should be handled in one trusted
 * spot and then carried through every processing step. If `formPipelineEvent` is removed, the
 * two functions can easily be merged.
 *
 * The event is modified in place and the same object is returned.
 *
 * @param event - Event to normalize; `properties` is created as an empty object when missing.
 * @param processPerson - Whether person processing is enabled for this event.
 * @returns The same `event` instance, after normalization.
 */
export function normalizeProcessPerson(event: PluginEvent, processPerson: boolean): PluginEvent {
    const props = event.properties ?? {}
    event.properties = props

    if (processPerson) {
        // `true` is the default, so the flag is redundant on the stored event.
        delete props.$process_person
        return event
    }

    // Person processing is off: drop every person-update instruction, both the top-level ones
    // and the ones nested in `properties`.
    delete event.$set
    delete event.$set_once
    for (const key of ['$set', '$set_once', '$unset'] as const) {
        delete props[key]
    }

    // Re-assert the flag so it is still `false` even if something changed it along the way.
    props.$process_person = false
    return event
}

export function normalizeEvent(event: PluginEvent): PluginEvent {
event.distinct_id = event.distinct_id?.toString()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { EventPipelineRunner } from './runner'
/**
 * Pipeline step that delegates to `runner.hub.eventsProcessor.createEvent` and returns that
 * call's result unchanged.
 *
 * NOTE(review): this span is rendered from a diff, so it shows two versions side by side. The
 * first `person: Person` line and the first `return` (without `processPerson`) appear to be the
 * removed lines; the comma-terminated parameter and the three-argument call appear to be their
 * replacements. Confirm against the actual file.
 *
 * @param runner - Pipeline runner; only `runner.hub.eventsProcessor` is used here.
 * @param event - Event passed as the first argument to `createEvent`.
 * @param person - Person passed as the second argument to `createEvent`.
 * @param processPerson - Flag passed as the third argument to `createEvent`.
 * @returns Whatever `createEvent` resolves to, typed as `[RawClickHouseEvent, Promise<void>]`.
 */
export async function createEventStep(
runner: EventPipelineRunner,
event: PreIngestionEvent,
person: Person
person: Person,
processPerson: boolean
): Promise<[RawClickHouseEvent, Promise<void>]> {
return await runner.hub.eventsProcessor.createEvent(event, person)
return await runner.hub.eventsProcessor.createEvent(event, person, processPerson)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { normalizeEvent, normalizeProcessPerson } from '../../../utils/event'
import { status } from '../../../utils/status'
import { parseEventTimestamp } from '../timestamps'

/**
 * Pipeline step that normalizes an event and parses its timestamp.
 *
 * Runs `normalizeEvent`, then `normalizeProcessPerson` with the supplied flag, then
 * `parseEventTimestamp`. If any of these throws, a warning with the event's team id and uuid is
 * logged and the error is rethrown synchronously, not returned as a rejected promise.
 *
 * @param event - Event to normalize.
 * @param processPerson - Forwarded to `normalizeProcessPerson`.
 * @returns An already-resolved promise holding the normalized event and its parsed timestamp.
 */
export function normalizeEventStep(event: PluginEvent, processPerson: boolean): Promise<[PluginEvent, DateTime]> {
    let normalized = event
    let parsedTimestamp: DateTime

    try {
        normalized = normalizeEvent(normalized)
        normalized = normalizeProcessPerson(normalized, processPerson)
        parsedTimestamp = parseEventTimestamp(normalized)
    } catch (error) {
        const context = { team_id: normalized.team_id, uuid: normalized.uuid, error }
        status.warn('⚠️', 'Failed normalizing event', context)
        throw error
    }

    // Nothing here awaits; a promise is returned only because `runStep` currently expects an
    // "async" step.
    const result: [PluginEvent, DateTime] = [normalized, parsedTimestamp]
    return Promise.resolve(result)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import { captureIngestionWarning } from '../utils'
import { invalidTimestampCounter } from './metrics'
import { EventPipelineRunner } from './runner'

export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise<PreIngestionEvent> {
export async function prepareEventStep(
runner: EventPipelineRunner,
event: PluginEvent,
processPerson: boolean
): Promise<PreIngestionEvent> {
const { team_id, uuid } = event
const tsParsingIngestionWarnings: Promise<void>[] = []
const invalidTimestampCallback = function (type: string, details: Record<string, any>) {
Expand All @@ -20,7 +24,8 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi
event,
team_id,
parseEventTimestamp(event, invalidTimestampCallback),
uuid! // it will throw if it's undefined,
uuid!, // it will throw if it's undefined,
processPerson
)
await Promise.all(tsParsingIngestionWarnings)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,15 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { Person } from 'types'

import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { DeferredPersonOverrideWriter, PersonState } from '../person-state'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner } from './runner'

export async function processPersonsStep(
runner: EventPipelineRunner,
pluginEvent: PluginEvent
event: PluginEvent,
timestamp: DateTime,
processPerson: boolean
): Promise<[PluginEvent, Person]> {
let event: PluginEvent
let timestamp: DateTime
try {
event = normalizeEvent(pluginEvent)
timestamp = parseEventTimestamp(event)
} catch (error) {
status.warn('⚠️', 'Failed normalizing event', { team_id: pluginEvent.team_id, uuid: pluginEvent.uuid, error })
throw error
}

let overridesWriter: DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
Expand All @@ -32,6 +21,7 @@ export async function processPersonsStep(
event.team_id,
String(event.distinct_id),
timestamp,
processPerson,
runner.hub.db,
overridesWriter
).update()
Expand Down
62 changes: 54 additions & 8 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import { runInSpan } from '../../../sentry'
import { Hub, PipelineEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { timeoutGuard } from '../../../utils/db/utils'
import { normalizeProcessPerson } from '../../../utils/event'
import { status } from '../../../utils/status'
import { generateEventDeadLetterQueueMessage } from '../utils'
import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils'
import { createEventStep } from './createEventStep'
import {
eventProcessedAndIngestedCounter,
Expand All @@ -17,6 +18,7 @@ import {
pipelineStepMsSummary,
pipelineStepThrowCounter,
} from './metrics'
import { normalizeEventStep } from './normalizeEventStep'
import { pluginsProcessEventStep } from './pluginsProcessEventStep'
import { populateTeamDataStep } from './populateTeamDataStep'
import { prepareEventStep } from './prepareEventStep'
Expand Down Expand Up @@ -117,22 +119,66 @@ export class EventPipelineRunner {
// ingestion pipeline is working well for all teams.
this.poEEmbraceJoin = true
}
const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id)

let processPerson = true
if (event.properties && event.properties.$process_person === false) {
// We are purposefully being very explicit here. The `$process_person` property *must*
// exist and be set to `false` (not missing, or null, or any other value) to disable
// person processing.
processPerson = false

if (['$identify', '$create_alias', '$merge_dangerously', '$groupidentify'].includes(event.event)) {
const warningAck = captureIngestionWarning(
this.hub.db.kafkaProducer,
event.team_id,
'invalid_event_when_process_person_is_false',
{
eventUuid: event.uuid,
event: event.event,
distinctId: event.distinct_id,
},
{ alwaysSend: true }
)

return this.registerLastStep('invalidEventForProvidedFlags', [event], [warningAck])
}

// If person processing is disabled, go ahead and remove person related keys before
// any plugins have a chance to see them.
event = normalizeProcessPerson(event, processPerson)
}

const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id)
if (processedEvent == null) {
// A plugin dropped the event.
return this.registerLastStep('pluginsProcessEventStep', [event])
}
const [normalizedEvent, person] = await this.runStep(processPersonsStep, [this, processedEvent], event.team_id)

const preparedEvent = await this.runStep(prepareEventStep, [this, normalizedEvent], event.team_id)
const [normalizedEvent, timestamp] = await this.runStep(
normalizeEventStep,
[processedEvent, processPerson],
event.team_id
)

const [postPersonEvent, person] = await this.runStep(
processPersonsStep,
[this, normalizedEvent, timestamp, processPerson],
event.team_id
)

const preparedEvent = await this.runStep(
prepareEventStep,
[this, postPersonEvent, processPerson],
event.team_id
)

const [rawClickhouseEvent, eventAck] = await this.runStep(
createEventStep,
[this, preparedEvent, person],
[this, preparedEvent, person, processPerson],
event.team_id
)

return this.registerLastStep('createEventStep', [rawClickhouseEvent, person], [eventAck])
return this.registerLastStep('createEventStep', [rawClickhouseEvent], [eventAck])
}

registerLastStep(stepName: string, args: any[], ackPromises?: Array<Promise<void>>): EventPipelineResult {
Expand All @@ -156,12 +202,12 @@ export class EventPipelineRunner {
const sendToSentry = false
const timeout = timeoutGuard(
`Event pipeline step stalled. Timeout warning after ${this.hub.PIPELINE_STEP_STALLED_LOG_TIMEOUT} sec! step=${step.name} team_id=${teamId} distinct_id=${this.originalEvent.distinct_id}`,
{
() => ({
step: step.name,
event: JSON.stringify(this.originalEvent),
teamId: teamId,
distinctId: this.originalEvent.distinct_id,
},
}),
this.hub.PIPELINE_STEP_STALLED_LOG_TIMEOUT * 1000,
sendToSentry
)
Expand Down
Loading

0 comments on commit ea05d8e

Please sign in to comment.