diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index 33c384e28fb03e..ce20217a90703f 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -652,43 +652,68 @@ export class DB { uuid: string, distinctIds?: string[] ): Promise { - const kafkaMessages: ProducerRecord[] = [] + distinctIds ||= [] + const version = 0 // We're creating the person now! - const person = await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'createPerson', async (tx) => { - const insertResult = await this.postgres.query( - tx, - 'INSERT INTO posthog_person (created_at, properties, properties_last_updated_at, properties_last_operation, team_id, is_user_id, is_identified, uuid, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *', - [ - createdAt.toISO(), - JSON.stringify(properties), - JSON.stringify(propertiesLastUpdatedAt), - JSON.stringify(propertiesLastOperation), - teamId, - isUserId, - isIdentified, - uuid, - 0, - ], - 'insertPerson' - ) - const personCreated = insertResult.rows[0] as RawPerson - const person = { - ...personCreated, - created_at: DateTime.fromISO(personCreated.created_at).toUTC(), - version: Number(personCreated.version || 0), - } as Person - - kafkaMessages.push( - generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, person.version) - ) + const insertResult = await this.postgres.query( + PostgresUse.COMMON_WRITE, + `WITH inserted_person AS ( + INSERT INTO posthog_person ( + created_at, properties, properties_last_updated_at, + properties_last_operation, team_id, is_user_id, is_identified, uuid, version + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + RETURNING * + )` + + distinctIds + .map( + (_, index) => `, distinct_id_${index} AS ( + INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) + VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9) + )` + ) + .join('') + + `SELECT * FROM inserted_person;`, + [ + createdAt.toISO(), + JSON.stringify(properties), + JSON.stringify(propertiesLastUpdatedAt), + JSON.stringify(propertiesLastOperation), + teamId, + isUserId, + isIdentified, + uuid, + version, + ...distinctIds.reverse(), + ], + 'insertPerson' + ) + const personCreated = insertResult.rows[0] as RawPerson + const person = { + ...personCreated, + created_at: DateTime.fromISO(personCreated.created_at).toUTC(), + version, + } as Person - for (const distinctId of distinctIds || []) { - const messages = await this.addDistinctIdPooled(person, distinctId, tx) - kafkaMessages.push(...messages) - } + const kafkaMessages: ProducerRecord[] = [] + kafkaMessages.push(generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, version)) - return person - }) + for (const distinctId of distinctIds || []) { + kafkaMessages.push({ + topic: KAFKA_PERSON_DISTINCT_ID, + messages: [ + { + value: JSON.stringify({ + person_id: person.uuid, + team_id: teamId, + distinct_id: distinctId, + version, + is_deleted: 0, + }), + }, + ], + }) + } await this.kafkaProducer.queueMessages(kafkaMessages) return person diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 786acbbd7698e4..d13c80a8fade59 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -1142,7 +1142,7 @@ test('alias merge properties', async () => { expect((await hub.db.fetchEvents()).length).toBe(1) expect((await hub.db.fetchPersons()).length).toBe(1) const [person] = await hub.db.fetchPersons() - expect((await hub.db.fetchDistinctIdValues(person)).sort()).toEqual(['new_distinct_id', 'old_distinct_id']) + expect(await hub.db.fetchDistinctIdValues(person)).toEqual(['new_distinct_id', 'old_distinct_id']) expect(person.properties).toEqual({ key_on_both: 'new value both', key_on_new: 'new value', diff --git a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts index 7f5f722dc122f1..f005f974c04656 100644 --- a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts +++ b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts @@ -119,7 +119,7 @@ describe('postgres parity', () => { }, ]) const postgresDistinctIds = await hub.db.fetchDistinctIdValues(person, Database.Postgres) - expect(postgresDistinctIds).toEqual(['distinct1', 'distinct2']) + expect(postgresDistinctIds.sort()).toEqual(['distinct1', 'distinct2']) const newClickHouseDistinctIdValues = await hub.db.fetchDistinctIds(person, Database.ClickHouse) expect(newClickHouseDistinctIdValues).toEqual(