Skip to content

Commit

Permalink
refactor(plugin-server): reduce createPerson to a single query
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Oct 25, 2023
1 parent 367b44a commit 011c47f
Showing 1 changed file with 64 additions and 34 deletions.
98 changes: 64 additions & 34 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,43 +652,73 @@ export class DB {
uuid: string,
distinctIds?: string[]
): Promise<Person> {
const kafkaMessages: ProducerRecord[] = []
distinctIds ||= []
const version = 0 // We're creating the person now!

const person = await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'createPerson', async (tx) => {
const insertResult = await this.postgres.query(
tx,
'INSERT INTO posthog_person (created_at, properties, properties_last_updated_at, properties_last_operation, team_id, is_user_id, is_identified, uuid, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *',
[
createdAt.toISO(),
JSON.stringify(properties),
JSON.stringify(propertiesLastUpdatedAt),
JSON.stringify(propertiesLastOperation),
teamId,
isUserId,
isIdentified,
uuid,
0,
],
'insertPerson'
)
const personCreated = insertResult.rows[0] as RawPerson
const person = {
...personCreated,
created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
version: Number(personCreated.version || 0),
} as Person

kafkaMessages.push(
generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, person.version)
)
const insertResult = await this.postgres.query(
PostgresUse.COMMON_WRITE,
`WITH inserted_person AS (
INSERT INTO posthog_person (
created_at, properties, properties_last_updated_at,
properties_last_operation, team_id, is_user_id, is_identified, uuid, version
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING *
)` +
distinctIds
.map(
(_, index) => `, distinct_id_${index} AS (
INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))`
)
.join('') +
`SELECT * FROM inserted_person;`,
[
createdAt.toISO(),
JSON.stringify(properties),
JSON.stringify(propertiesLastUpdatedAt),
JSON.stringify(propertiesLastOperation),
teamId,
isUserId,
isIdentified,
uuid,
version,
// The copy and reverse here is to maintain compatability with pre-existing code
// and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the
// CTEs above, so we need to reverse the distinctIds to match the old behavior where
// we would do a round trip for each INSERT. We shouldn't actually depend on the
// `id` column of distinct_ids, so this is just a simple way to keeps tests exactly
// the same and prove behavior is the same as before.
...distinctIds.slice().reverse(),
],
'insertPerson'
)
const personCreated = insertResult.rows[0] as RawPerson
const person = {
...personCreated,
created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
version,
} as Person

for (const distinctId of distinctIds || []) {
const messages = await this.addDistinctIdPooled(person, distinctId, tx)
kafkaMessages.push(...messages)
}
const kafkaMessages: ProducerRecord[] = []
kafkaMessages.push(generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, version))

return person
})
for (const distinctId of distinctIds) {
kafkaMessages.push({
topic: KAFKA_PERSON_DISTINCT_ID,
messages: [
{
value: JSON.stringify({
person_id: person.uuid,
team_id: teamId,
distinct_id: distinctId,
version,
is_deleted: 0,
}),
},
],
})
}

await this.kafkaProducer.queueMessages(kafkaMessages)
return person
Expand Down

0 comments on commit 011c47f

Please sign in to comment.