diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts
index 33c384e28fb03e..4a046319c24a60 100644
--- a/plugin-server/src/utils/db/db.ts
+++ b/plugin-server/src/utils/db/db.ts
@@ -652,43 +652,73 @@ export class DB {
         uuid: string,
         distinctIds?: string[]
     ): Promise<Person> {
-        const kafkaMessages: ProducerRecord[] = []
+        distinctIds ||= []
+        const version = 0 // We're creating the person now!

-        const person = await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'createPerson', async (tx) => {
-            const insertResult = await this.postgres.query(
-                tx,
-                'INSERT INTO posthog_person (created_at, properties, properties_last_updated_at, properties_last_operation, team_id, is_user_id, is_identified, uuid, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *',
-                [
-                    createdAt.toISO(),
-                    JSON.stringify(properties),
-                    JSON.stringify(propertiesLastUpdatedAt),
-                    JSON.stringify(propertiesLastOperation),
-                    teamId,
-                    isUserId,
-                    isIdentified,
-                    uuid,
-                    0,
-                ],
-                'insertPerson'
-            )
-            const personCreated = insertResult.rows[0] as RawPerson
-            const person = {
-                ...personCreated,
-                created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
-                version: Number(personCreated.version || 0),
-            } as Person
-
-            kafkaMessages.push(
-                generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, person.version)
-            )
+        const insertResult = await this.postgres.query(
+            PostgresUse.COMMON_WRITE,
+            `WITH inserted_person AS (
+                    INSERT INTO posthog_person (
+                        created_at, properties, properties_last_updated_at,
+                        properties_last_operation, team_id, is_user_id, is_identified, uuid, version
+                    )
+                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+                    RETURNING *
+                )` +
+                distinctIds
+                    .map(
+                        (_, index) => `, distinct_id_${index} AS (
+                        INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
+                        VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))`
+                    )
+                    .join('') +
+                `SELECT * FROM inserted_person;`,
+            [
+                createdAt.toISO(),
+                JSON.stringify(properties),
+                JSON.stringify(propertiesLastUpdatedAt),
+                JSON.stringify(propertiesLastOperation),
+                teamId,
+                isUserId,
+                isIdentified,
+                uuid,
+                version,
+                // The copy and reverse here is to maintain compatibility with pre-existing code
+                // and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the
+                // CTEs above, so we need to reverse the distinctIds to match the old behavior where
+                // we would do a round trip for each INSERT. We shouldn't actually depend on the
+                // `id` column of distinct_ids, so this is just a simple way to keep tests exactly
+                // the same and prove behavior is the same as before.
+                ...distinctIds.slice().reverse(),
+            ],
+            'insertPerson'
+        )
+        const personCreated = insertResult.rows[0] as RawPerson
+        const person = {
+            ...personCreated,
+            created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
+            version,
+        } as Person

-            for (const distinctId of distinctIds || []) {
-                const messages = await this.addDistinctIdPooled(person, distinctId, tx)
-                kafkaMessages.push(...messages)
-            }
+        const kafkaMessages: ProducerRecord[] = []
+        kafkaMessages.push(generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, version))

-            return person
-        })
+        for (const distinctId of distinctIds) {
+            kafkaMessages.push({
+                topic: KAFKA_PERSON_DISTINCT_ID,
+                messages: [
+                    {
+                        value: JSON.stringify({
+                            person_id: person.uuid,
+                            team_id: teamId,
+                            distinct_id: distinctId,
+                            version,
+                            is_deleted: 0,
+                        }),
+                    },
+                ],
+            })
+        }

         await this.kafkaProducer.queueMessages(kafkaMessages)
         return person
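
For review context: a minimal standalone sketch of how the query string above is assembled and how its positional parameters line up, assuming distinctIds = ['a', 'b']. The person columns are elided and the snippet is illustrative only, not part of the patch.

    // Minimal sketch of the CTE assembly in the patch, assuming distinctIds = ['a', 'b'].
    // Person columns are elided; this is illustrative, not the patched code.
    const distinctIds: string[] = ['a', 'b']

    const sql =
        `WITH inserted_person AS (
            INSERT INTO posthog_person (/* 9 person columns */)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            RETURNING *
        )` +
        distinctIds
            .map(
                // `$${10 + index}` renders as the literal text "$10", "$11", ...:
                // the first `$` is plain text, only `${10 + index}` interpolates.
                (_, index) => `, distinct_id_${index} AS (
            INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
            VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))`
            )
            .join('') +
        ` SELECT * FROM inserted_person;`

    // Parameters $1..$9 are the person fields; $10 and $11 are the distinct IDs.
    // With ...distinctIds.slice().reverse() at the end of the parameter array,
    // $10 = 'b' and $11 = 'a', so the last CTE (distinct_id_1) binds the first
    // original distinct ID, which, per the patch comment, reproduces the row ID
    // order the old one-INSERT-per-round-trip code produced.
    console.log(sql)

Note that the whole batch (the person row plus every distinct ID row) now goes to Postgres as a single statement, so the explicit transaction wrapper the old code needed is gone: a single statement is already atomic.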