diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 331fc52f2e696..f3106fe62dd24 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -619,6 +619,7 @@ interface BaseEvent { export type ISOTimestamp = Brand export type ClickHouseTimestamp = Brand export type ClickHouseTimestampSecondPrecision = Brand +export type PersonMode = 'full' | 'propertyless' | 'force_upgrade' /** Raw event row from ClickHouse. */ export interface RawClickHouseEvent extends BaseEvent { @@ -638,7 +639,7 @@ export interface RawClickHouseEvent extends BaseEvent { group2_created_at?: ClickHouseTimestamp group3_created_at?: ClickHouseTimestamp group4_created_at?: ClickHouseTimestamp - person_mode: 'full' | 'propertyless' + person_mode: PersonMode } /** Parsed event row from ClickHouse. */ @@ -659,7 +660,7 @@ export interface ClickHouseEvent extends BaseEvent { group2_created_at?: DateTime | null group3_created_at?: DateTime | null group4_created_at?: DateTime | null - person_mode: 'full' | 'propertyless' + person_mode: PersonMode } /** Event in a database-agnostic shape, AKA an ingestion event. @@ -746,6 +747,11 @@ export interface Person { properties: Properties uuid: string created_at: DateTime + + // Set to `true` when an existing person row was found for this `distinct_id`, but the event was + // sent with `$process_person_profile=false`. This is an unexpected branch that we want to flag + // for debugging and billing purposes, and typically means a misconfigured SDK. + force_upgrade?: boolean } /** Clickhouse Person model. */ diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index e7f3652b2ef25..ca250aa3d52da 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -114,11 +114,16 @@ export class PersonState { async update(): Promise { if (!this.processPerson) { if (this.lazyPersonCreation) { - const person = await this.db.fetchPerson(this.teamId, this.distinctId, { useReadReplica: true }) - if (person) { + const existingPerson = await this.db.fetchPerson(this.teamId, this.distinctId, { useReadReplica: true }) + if (existingPerson) { + const person = existingPerson as Person + // Ensure person properties don't propagate elsewhere, such as onto the event itself. person.properties = {} + // See documentation on the field. + person.force_upgrade = true + return person } diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 8b57e26190046..d3f49b85cfd71 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -11,6 +11,7 @@ import { Hub, ISOTimestamp, Person, + PersonMode, PreIngestionEvent, RawClickHouseEvent, Team, @@ -239,6 +240,13 @@ export class EventsProcessor { // TODO: Remove Redis caching for person that's not used anymore + let personMode: PersonMode = 'full' + if (person.force_upgrade) { + personMode = 'force_upgrade' + } else if (!processPerson) { + personMode = 'propertyless' + } + const rawEvent: RawClickHouseEvent = { uuid, event: safeClickhouseString(event), @@ -251,7 +259,7 @@ export class EventsProcessor { person_id: person.uuid, person_properties: eventPersonProperties, person_created_at: castTimestampOrNow(person.created_at, TimestampFormat.ClickHouseSecondPrecision), - person_mode: processPerson ? 'full' : 'propertyless', + person_mode: personMode, ...groupsColumns, } diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 2223d0e2e1833..c465fb675ead0 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -222,6 +222,7 @@ describe('PersonState.update()', () => { created_at: DateTime.utc(1970, 1, 1, 0, 0, 5), // fake person created_at }) ) + expect(fakePerson.force_upgrade).toBeUndefined() // verify there is no Postgres person const persons = await fetchPostgresPersonsH() @@ -232,11 +233,11 @@ describe('PersonState.update()', () => { expect(distinctIds).toEqual(expect.arrayContaining([])) }) - it('merging with lazy person creation creates an override', async () => { + it('merging with lazy person creation creates an override and force_upgrade works', async () => { await hub.db.createPerson(timestamp, {}, {}, {}, teamId, null, false, oldUserUuid, [oldUserDistinctId]) const hubParam = undefined - const processPerson = true + let processPerson = true const lazyPersonCreation = true await personState( { @@ -266,6 +267,33 @@ describe('PersonState.update()', () => { }), ]) ) + + // Using the `distinct_id` again with `processPerson=false` results in + // `force_upgrade=true` and real Person `uuid` and `created_at` + processPerson = false + const event_uuid = new UUIDT().toString() + const fakePerson = await personState( + { + event: '$pageview', + distinct_id: newUserDistinctId, + uuid: event_uuid, + properties: { $set: { should_be_dropped: 100 } }, + }, + hubParam, + processPerson, + lazyPersonCreation + ).update() + await hub.db.kafkaProducer.flush() + + expect(fakePerson).toEqual( + expect.objectContaining({ + team_id: teamId, + uuid: oldUserUuid, // *old* user, because it existed before the merge + properties: {}, // empty even though there was a $set attempted + created_at: timestamp, // *not* the fake person created_at + force_upgrade: true, + }) + ) }) it('creates person if they are new', async () => { diff --git a/plugin-server/tests/worker/ingestion/process-event.test.ts b/plugin-server/tests/worker/ingestion/process-event.test.ts index 19f822fbb0b6e..b9947bb7eec74 100644 --- a/plugin-server/tests/worker/ingestion/process-event.test.ts +++ b/plugin-server/tests/worker/ingestion/process-event.test.ts @@ -171,6 +171,36 @@ describe('EventsProcessor#createEvent()', () => { ) }) + it('force_upgrade persons are recorded as such', async () => { + const processPerson = false + person.force_upgrade = true + await eventsProcessor.createEvent( + { ...preIngestionEvent, properties: { $group_0: 'group_key' } }, + person, + processPerson + ) + + await eventsProcessor.kafkaProducer.flush() + + const events = await delayUntilEventIngested(() => hub.db.fetchEvents()) + expect(events.length).toEqual(1) + expect(events[0]).toEqual( + expect.objectContaining({ + uuid: eventUuid, + event: '$pageview', + properties: {}, // $group_0 is removed + timestamp: expect.any(DateTime), + team_id: 2, + distinct_id: 'my_id', + elements_chain: null, + created_at: expect.any(DateTime), + person_id: personUuid, + person_properties: {}, + person_mode: 'force_upgrade', + }) + ) + }) + it('handles the person no longer existing', async () => { // This person is never in the DB, but createEvent gets a Person object and should use that const uuid = new UUIDT().toString() diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr index 8539c393d05e6..851f3df420642 100644 --- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr +++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr @@ -25,7 +25,7 @@ group2_created_at DateTime64, group3_created_at DateTime64, group4_created_at DateTime64, - person_mode Enum8('full' = 0, 'propertyless' = 1) + person_mode Enum8('full' = 0, 'propertyless' = 1, 'force_upgrade' = 2) @@ -108,7 +108,7 @@ group2_created_at DateTime64, group3_created_at DateTime64, group4_created_at DateTime64, - person_mode Enum8('full' = 0, 'propertyless' = 1) + person_mode Enum8('full' = 0, 'propertyless' = 1, 'force_upgrade' = 2) @@ -519,7 +519,7 @@ group2_created_at DateTime64, group3_created_at DateTime64, group4_created_at DateTime64, - person_mode Enum8('full' = 0, 'propertyless' = 1) + person_mode Enum8('full' = 0, 'propertyless' = 1, 'force_upgrade' = 2) , $group_0 VARCHAR COMMENT 'column_materializer::$group_0' , $group_1 VARCHAR COMMENT 'column_materializer::$group_1' @@ -840,7 +840,7 @@ group2_created_at DateTime64, group3_created_at DateTime64, group4_created_at DateTime64, - person_mode Enum8('full' = 0, 'propertyless' = 1) + person_mode Enum8('full' = 0, 'propertyless' = 1, 'force_upgrade' = 2) @@ -1887,7 +1887,7 @@ group2_created_at DateTime64, group3_created_at DateTime64, group4_created_at DateTime64, - person_mode Enum8('full' = 0, 'propertyless' = 1) + person_mode Enum8('full' = 0, 'propertyless' = 1, 'force_upgrade' = 2) , $group_0 VARCHAR MATERIALIZED replaceRegexpAll(JSONExtractRaw(properties, '$group_0'), '^"|"$', '') COMMENT 'column_materializer::$group_0' , $group_1 VARCHAR MATERIALIZED replaceRegexpAll(JSONExtractRaw(properties, '$group_1'), '^"|"$', '') COMMENT 'column_materializer::$group_1' @@ -2222,7 +2222,7 @@ group2_created_at DateTime64, group3_created_at DateTime64, group4_created_at DateTime64, - person_mode Enum8('full' = 0, 'propertyless' = 1) + person_mode Enum8('full' = 0, 'propertyless' = 1, 'force_upgrade' = 2) , _timestamp DateTime @@ -2778,7 +2778,7 @@ group2_created_at DateTime64, group3_created_at DateTime64, group4_created_at DateTime64, - person_mode Enum8('full' = 0, 'propertyless' = 1) + person_mode Enum8('full' = 0, 'propertyless' = 1, 'force_upgrade' = 2) , $group_0 VARCHAR MATERIALIZED replaceRegexpAll(JSONExtractRaw(properties, '$group_0'), '^"|"$', '') COMMENT 'column_materializer::$group_0' , $group_1 VARCHAR MATERIALIZED replaceRegexpAll(JSONExtractRaw(properties, '$group_1'), '^"|"$', '') COMMENT 'column_materializer::$group_1' diff --git a/posthog/models/event/sql.py b/posthog/models/event/sql.py index f8d2e543da4c7..50c8abdefa9d1 100644 --- a/posthog/models/event/sql.py +++ b/posthog/models/event/sql.py @@ -49,7 +49,7 @@ group2_created_at DateTime64, group3_created_at DateTime64, group4_created_at DateTime64, - person_mode Enum8('full' = 0, 'propertyless' = 1) + person_mode Enum8('full' = 0, 'propertyless' = 1, 'force_upgrade' = 2) {materialized_columns} {extra_fields} {indexes}