From 011c47ff87172541f7da88e677d7ad13b541b110 Mon Sep 17 00:00:00 2001
From: Brett Hoerner <brett@bretthoerner.com>
Date: Fri, 20 Oct 2023 16:31:05 -0400
Subject: [PATCH] refactor(plugin-server): reduce createPerson to a single
 query

---
 plugin-server/src/utils/db/db.ts | 98 +++++++++++++++++++++-----------
 1 file changed, 64 insertions(+), 34 deletions(-)

diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts
index 33c384e28fb03e..4a046319c24a60 100644
--- a/plugin-server/src/utils/db/db.ts
+++ b/plugin-server/src/utils/db/db.ts
@@ -652,43 +652,73 @@ export class DB {
         uuid: string,
         distinctIds?: string[]
     ): Promise<Person> {
-        const kafkaMessages: ProducerRecord[] = []
+        distinctIds ||= []
+        const version = 0 // We're creating the person now!
 
-        const person = await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'createPerson', async (tx) => {
-            const insertResult = await this.postgres.query(
-                tx,
-                'INSERT INTO posthog_person (created_at, properties, properties_last_updated_at, properties_last_operation, team_id, is_user_id, is_identified, uuid, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *',
-                [
-                    createdAt.toISO(),
-                    JSON.stringify(properties),
-                    JSON.stringify(propertiesLastUpdatedAt),
-                    JSON.stringify(propertiesLastOperation),
-                    teamId,
-                    isUserId,
-                    isIdentified,
-                    uuid,
-                    0,
-                ],
-                'insertPerson'
-            )
-            const personCreated = insertResult.rows[0] as RawPerson
-            const person = {
-                ...personCreated,
-                created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
-                version: Number(personCreated.version || 0),
-            } as Person
-
-            kafkaMessages.push(
-                generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, person.version)
-            )
+        const insertResult = await this.postgres.query(
+            PostgresUse.COMMON_WRITE,
+            `WITH inserted_person AS (
+                    INSERT INTO posthog_person (
+                        created_at, properties, properties_last_updated_at,
+                        properties_last_operation, team_id, is_user_id, is_identified, uuid, version
+                    )
+                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+                    RETURNING *
+                )` +
+                distinctIds
+                    .map(
+                        (_, index) => `, distinct_id_${index} AS (
+                        INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
+                        VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))`
+                    )
+                    .join('') +
+                `SELECT * FROM inserted_person;`,
+            [
+                createdAt.toISO(),
+                JSON.stringify(properties),
+                JSON.stringify(propertiesLastUpdatedAt),
+                JSON.stringify(propertiesLastOperation),
+                teamId,
+                isUserId,
+                isIdentified,
+                uuid,
+                version,
+                // The copy and reverse here is to maintain compatibility with pre-existing code
+                // and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the
+                // CTEs above, so we need to reverse the distinctIds to match the old behavior where
+                // we would do a round trip for each INSERT. We shouldn't actually depend on the
+                // `id` column of distinct_ids, so this is just a simple way to keep tests exactly
+                // the same and prove behavior is the same as before.
+                ...distinctIds.slice().reverse(),
+            ],
+            'insertPerson'
+        )
+        const personCreated = insertResult.rows[0] as RawPerson
+        const person = {
+            ...personCreated,
+            created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
+            version,
+        } as Person
 
-            for (const distinctId of distinctIds || []) {
-                const messages = await this.addDistinctIdPooled(person, distinctId, tx)
-                kafkaMessages.push(...messages)
-            }
+        const kafkaMessages: ProducerRecord[] = []
+        kafkaMessages.push(generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, version))
 
-            return person
-        })
+        for (const distinctId of distinctIds) {
+            kafkaMessages.push({
+                topic: KAFKA_PERSON_DISTINCT_ID,
+                messages: [
+                    {
+                        value: JSON.stringify({
+                            person_id: person.uuid,
+                            team_id: teamId,
+                            distinct_id: distinctId,
+                            version,
+                            is_deleted: 0,
+                        }),
+                    },
+                ],
+            })
+        }
 
         await this.kafkaProducer.queueMessages(kafkaMessages)
         return person