Skip to content

Commit

Permalink
feat(plugin-server): handle process_person=false
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Apr 5, 2024
1 parent aca9f72 commit cb3ded9
Show file tree
Hide file tree
Showing 17 changed files with 499 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,78 @@ test.concurrent(`event ingestion: can $set and update person properties`, async
})
})

test.concurrent(
`event ingestion: $process_person=false drops expected fields, doesn't include person properties`,
async () => {
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()

// Normal ("full") event creates person with a property.
await capture({
teamId,
distinctId,
uuid: new UUIDT().toString(),
event: '$identify',
properties: {
distinct_id: distinctId,
$set: { prop: 'value' },
},
})

// Propertyless event tries to $set, $set_once, $unset and use groups, but none of these
// should work.
const properylessUuid = new UUIDT().toString()
await capture({
teamId,
distinctId,
uuid: properylessUuid,
event: 'custom event',
properties: {
$process_person: false,
$group_0: 'group_key',
$set: {
c: 3,
},
$set_once: {
d: 4,
},
$unset: ['prop'],
},
$set: {
a: 1,
},
$set_once: {
b: 2,
},
})
await waitForExpect(async () => {
const [event] = await fetchEvents(teamId, properylessUuid)
expect(event).toEqual(
expect.objectContaining({
person_properties: {},
properties: { $process_person: false, uuid: properylessUuid, $sent_at: expect.any(String) },
person_mode: 'propertyless',
})
)
})

// Another normal ("full") event sees the existing person property (it wasn't $unset)
const secondUuid = new UUIDT().toString()
await capture({ teamId, distinctId, uuid: secondUuid, event: 'custom event', properties: {} })
await waitForExpect(async () => {
const [event] = await fetchEvents(teamId, secondUuid)
expect(event).toEqual(
expect.objectContaining({
person_properties: expect.objectContaining({
prop: 'value',
}),
person_mode: 'full',
})
)
})
}
)

test.concurrent(`event ingestion: can $set and update person properties with top level $set`, async () => {
// We support $set at the top level. This is as the time of writing how the
// posthog-js library works.
Expand Down
10 changes: 9 additions & 1 deletion plugin-server/src/backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ async function handleEvent(db: DB, event: RawClickHouseEvent): Promise<void> {
// single CH event handlin
const pluginEvent = formPluginEvent(event)
const ts: DateTime = DateTime.fromISO(pluginEvent.timestamp as string)
const personState = new PersonState(pluginEvent, pluginEvent.team_id, pluginEvent.distinct_id, ts, db)
const processPerson = true
const personState = new PersonState(
pluginEvent,
pluginEvent.team_id,
pluginEvent.distinct_id,
ts,
processPerson,
db
)
await personState.handleIdentifyOrAlias()
}
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ export interface RawClickHouseEvent extends BaseEvent {
group2_created_at?: ClickHouseTimestamp
group3_created_at?: ClickHouseTimestamp
group4_created_at?: ClickHouseTimestamp
person_mode: 'full' | 'propertyless'
}

/** Parsed event row from ClickHouse. */
Expand All @@ -655,6 +656,7 @@ export interface ClickHouseEvent extends BaseEvent {
group2_created_at?: DateTime | null
group3_created_at?: DateTime | null
group4_created_at?: DateTime | null
person_mode: 'full' | 'propertyless'
}

/** Event in a database-agnostic shape, AKA an ingestion event.
Expand Down
28 changes: 28 additions & 0 deletions plugin-server/src/utils/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ export function convertToIngestionEvent(event: RawClickHouseEvent, skipElementsC
}
}

/// Does normalization steps involving the $process_person property. This is currently a separate
/// function because `normalizeEvent` is called from multiple places, some early in the pipeline,
/// and we want to have one trusted place where `$process_person` is handled and passed through
/// all of the processing steps.
///
/// If `formPipelineEvent` is removed this can easily be combined with `normalizeEvent`.
export function normalizeProcessPerson(event: PluginEvent, processPerson: boolean): PluginEvent {
const properties = event.properties ?? {}

// $process_person steps:
// 1. If person processing is disabled, $set, $set_once and $unset are dropped
// 2. Normalize the $process_person property on the event, if true, drop it since true is
// the default. If it was false before plugins ran, ensure it's still set to false.
if (!processPerson) {
delete event.$set
delete event.$set_once
delete properties.$set
delete properties.$set_once
delete properties.$unset
properties.$process_person = false
} else {
delete properties.$process_person
}

event.properties = properties
return event
}

export function normalizeEvent(event: PluginEvent): PluginEvent {
event.distinct_id = event.distinct_id?.toString()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { EventPipelineRunner } from './runner'
export async function createEventStep(
runner: EventPipelineRunner,
event: PreIngestionEvent,
person: Person
person: Person,
processPerson: boolean
): Promise<[RawClickHouseEvent, Promise<void>]> {
return await runner.hub.eventsProcessor.createEvent(event, person)
return await runner.hub.eventsProcessor.createEvent(event, person, processPerson)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { normalizeEvent } from '../../../utils/event'
import { normalizeEvent, normalizeProcessPerson } from '../../../utils/event'
import { status } from '../../../utils/status'
import { parseEventTimestamp } from '../timestamps'

export function normalizeEventStep(event: PluginEvent): [PluginEvent, DateTime] {
export function normalizeEventStep(event: PluginEvent, processPerson: boolean): Promise<[PluginEvent, DateTime]> {
let timestamp: DateTime
try {
event = normalizeEvent(event)
event = normalizeProcessPerson(event, processPerson)
timestamp = parseEventTimestamp(event)
} catch (error) {
status.warn('⚠️', 'Failed normalizing event', {
Expand All @@ -19,5 +20,6 @@ export function normalizeEventStep(event: PluginEvent): [PluginEvent, DateTime]
throw error
}

return [event, timestamp]
// We need to be "async" to deal with how `runStep` currently works.
return Promise.resolve([event, timestamp])
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import { captureIngestionWarning } from '../utils'
import { invalidTimestampCounter } from './metrics'
import { EventPipelineRunner } from './runner'

export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise<PreIngestionEvent> {
export async function prepareEventStep(
runner: EventPipelineRunner,
event: PluginEvent,
processPerson: boolean
): Promise<PreIngestionEvent> {
const { team_id, uuid } = event
const tsParsingIngestionWarnings: Promise<void>[] = []
const invalidTimestampCallback = function (type: string, details: Record<string, any>) {
Expand All @@ -20,7 +24,8 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi
event,
team_id,
parseEventTimestamp(event, invalidTimestampCallback),
uuid! // it will throw if it's undefined,
uuid!, // it will throw if it's undefined,
processPerson
)
await Promise.all(tsParsingIngestionWarnings)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { EventPipelineRunner } from './runner'
export async function processPersonsStep(
runner: EventPipelineRunner,
event: PluginEvent,
timestamp: DateTime
timestamp: DateTime,
processPerson: boolean
): Promise<[PluginEvent, Person]> {
let overridesWriter: DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
Expand All @@ -20,6 +21,7 @@ export async function processPersonsStep(
event.team_id,
String(event.distinct_id),
timestamp,
processPerson,
runner.hub.db,
overridesWriter
).update()
Expand Down
51 changes: 44 additions & 7 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import { runInSpan } from '../../../sentry'
import { Hub, PipelineEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { timeoutGuard } from '../../../utils/db/utils'
import { normalizeProcessPerson } from '../../../utils/event'
import { status } from '../../../utils/status'
import { generateEventDeadLetterQueueMessage } from '../utils'
import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils'
import { createEventStep } from './createEventStep'
import {
eventProcessedAndIngestedCounter,
Expand Down Expand Up @@ -118,30 +119,66 @@ export class EventPipelineRunner {
// ingestion pipeline is working well for all teams.
this.poEEmbraceJoin = true
}

let processPerson = true
if (event.properties && event.properties.$process_person === false) {
// We are purposefully being very explicit here. The `$process_person` property *must*
// exist and be set to `false` (not missing, or null, or any other value) to disable
// person processing.
processPerson = false

if (['$identify', '$create_alias', '$merge_dangerously', '$groupidentify'].includes(event.event)) {
const warningAck = captureIngestionWarning(
this.hub.db.kafkaProducer,
event.team_id,
'invalid_event_when_process_person_is_false',
{
eventUuid: event.uuid,
event: event.event,
distinctId: event.distinct_id,
},
{ alwaysSend: true }
)

return this.registerLastStep('invalidEventForProvidedFlags', [event], [warningAck])
}

// If person processing is disabled, go ahead and remove person related keys before
// any plugins have a chance to see them.
event = normalizeProcessPerson(event, processPerson)
}

const processedEvent = await this.runStep(pluginsProcessEventStep, [this, event], event.team_id)
if (processedEvent == null) {
// A plugin dropped the event.
return this.registerLastStep('pluginsProcessEventStep', [event])
}

// Normalizing is sync and doesn't need to run in a full `runStep` span for tracking.
const [normalizedEvent, timestamp] = normalizeEventStep(processedEvent)
const [normalizedEvent, timestamp] = await this.runStep(
normalizeEventStep,
[processedEvent, processPerson],
event.team_id
)

const [postPersonEvent, person] = await this.runStep(
processPersonsStep,
[this, normalizedEvent, timestamp],
[this, normalizedEvent, timestamp, processPerson],
event.team_id
)

const preparedEvent = await this.runStep(prepareEventStep, [this, postPersonEvent], event.team_id)
const preparedEvent = await this.runStep(
prepareEventStep,
[this, postPersonEvent, processPerson],
event.team_id
)

const [rawClickhouseEvent, eventAck] = await this.runStep(
createEventStep,
[this, preparedEvent, person],
[this, preparedEvent, person, processPerson],
event.team_id
)

return this.registerLastStep('createEventStep', [rawClickhouseEvent, person], [eventAck])
return this.registerLastStep('createEventStep', [rawClickhouseEvent], [eventAck])
}

registerLastStep(stepName: string, args: any[], ackPromises?: Array<Promise<void>>): EventPipelineResult {
Expand Down
25 changes: 23 additions & 2 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export class PersonState {
private teamId: number,
private distinctId: string,
private timestamp: DateTime,
private processPerson: boolean, // $process_person flag from the event
private db: DB,
private personOverrideWriter?: DeferredPersonOverrideWriter,
uuid: UUIDT | undefined = undefined
Expand All @@ -103,6 +104,21 @@ export class PersonState {
}

async update(): Promise<Person> {
if (!this.processPerson) {
// We don't need to handle any properties for `processPerson=false` events, so we can
// short circuit by just finding or creating a person and returning early.
//
// In the future, we won't even get or create a real Person for these events, and so
// the `processPerson` boolean can be removed from this class altogether, as this class
// shouldn't even need to be invoked.
const [person, _] = await this.createOrGetPerson()

// Ensure person properties don't propagate elsewhere, such as onto the event itself.
person.properties = {}

return person
}

const person: Person | undefined = await this.handleIdentifyOrAlias() // TODO: make it also return a boolean for if we can exit early here
if (person) {
// try to shortcut if we have the person from identify or alias
Expand Down Expand Up @@ -141,8 +157,13 @@ export class PersonState {
return [person, false]
}

const properties = this.eventProperties['$set'] || {}
const propertiesOnce = this.eventProperties['$set_once'] || {}
let properties = {}
let propertiesOnce = {}
if (this.processPerson) {
properties = this.eventProperties['$set']
propertiesOnce = this.eventProperties['$set_once']
}

person = await this.createPerson(
this.timestamp,
properties || {},
Expand Down
Loading

0 comments on commit cb3ded9

Please sign in to comment.