Skip to content

Commit

Permalink
refactor(plugin-server): reduce createPerson to a single query
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Oct 21, 2023
1 parent 1900db9 commit d6ae520
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 36 deletions.
93 changes: 59 additions & 34 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,43 +652,68 @@ export class DB {
uuid: string,
distinctIds?: string[]
): Promise<Person> {
const kafkaMessages: ProducerRecord[] = []
distinctIds ||= []
const version = 0 // We're creating the person now!

const person = await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'createPerson', async (tx) => {
const insertResult = await this.postgres.query(
tx,
'INSERT INTO posthog_person (created_at, properties, properties_last_updated_at, properties_last_operation, team_id, is_user_id, is_identified, uuid, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *',
[
createdAt.toISO(),
JSON.stringify(properties),
JSON.stringify(propertiesLastUpdatedAt),
JSON.stringify(propertiesLastOperation),
teamId,
isUserId,
isIdentified,
uuid,
0,
],
'insertPerson'
)
const personCreated = insertResult.rows[0] as RawPerson
const person = {
...personCreated,
created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
version: Number(personCreated.version || 0),
} as Person

kafkaMessages.push(
generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, person.version)
)
const insertResult = await this.postgres.query(
PostgresUse.COMMON_WRITE,
`WITH inserted_person AS (
INSERT INTO posthog_person (
created_at, properties, properties_last_updated_at,
properties_last_operation, team_id, is_user_id, is_identified, uuid, version
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING *
)` +
distinctIds
.map(
(_, index) => `, distinct_id_${index} AS (
INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9)
)`
)
.join('') +
`SELECT * FROM inserted_person;`,
[
createdAt.toISO(),
JSON.stringify(properties),
JSON.stringify(propertiesLastUpdatedAt),
JSON.stringify(propertiesLastOperation),
teamId,
isUserId,
isIdentified,
uuid,
version,
...distinctIds.reverse(),
],
'insertPerson'
)
const personCreated = insertResult.rows[0] as RawPerson
const person = {
...personCreated,
created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
version,
} as Person

for (const distinctId of distinctIds || []) {
const messages = await this.addDistinctIdPooled(person, distinctId, tx)
kafkaMessages.push(...messages)
}
const kafkaMessages: ProducerRecord[] = []
kafkaMessages.push(generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, version))

return person
})
for (const distinctId of distinctIds || []) {
kafkaMessages.push({
topic: KAFKA_PERSON_DISTINCT_ID,
messages: [
{
value: JSON.stringify({
person_id: person.uuid,
team_id: teamId,
distinct_id: distinctId,
version,
is_deleted: 0,
}),
},
],
})
}

await this.kafkaProducer.queueMessages(kafkaMessages)
return person
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/tests/main/process-event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ test('alias merge properties', async () => {
expect((await hub.db.fetchEvents()).length).toBe(1)
expect((await hub.db.fetchPersons()).length).toBe(1)
const [person] = await hub.db.fetchPersons()
expect((await hub.db.fetchDistinctIdValues(person)).sort()).toEqual(['new_distinct_id', 'old_distinct_id'])
expect(await hub.db.fetchDistinctIdValues(person)).toEqual(['new_distinct_id', 'old_distinct_id'])
expect(person.properties).toEqual({
key_on_both: 'new value both',
key_on_new: 'new value',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ describe('postgres parity', () => {
},
])
const postgresDistinctIds = await hub.db.fetchDistinctIdValues(person, Database.Postgres)
expect(postgresDistinctIds).toEqual(['distinct1', 'distinct2'])
expect(postgresDistinctIds.sort()).toEqual(['distinct1', 'distinct2'])

const newClickHouseDistinctIdValues = await hub.db.fetchDistinctIds(person, Database.ClickHouse)
expect(newClickHouseDistinctIdValues).toEqual(
Expand Down

0 comments on commit d6ae520

Please sign in to comment.