-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(plugin-server): reduce createPerson to a single query #18126
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -652,43 +652,75 @@ export class DB { | |
uuid: string, | ||
distinctIds?: string[] | ||
): Promise<Person> { | ||
const kafkaMessages: ProducerRecord[] = [] | ||
distinctIds ||= [] | ||
const version = 0 // We're creating the person now! | ||
|
||
const person = await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'createPerson', async (tx) => { | ||
const insertResult = await this.postgres.query( | ||
tx, | ||
'INSERT INTO posthog_person (created_at, properties, properties_last_updated_at, properties_last_operation, team_id, is_user_id, is_identified, uuid, version) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING *', | ||
[ | ||
createdAt.toISO(), | ||
JSON.stringify(properties), | ||
JSON.stringify(propertiesLastUpdatedAt), | ||
JSON.stringify(propertiesLastOperation), | ||
teamId, | ||
isUserId, | ||
isIdentified, | ||
uuid, | ||
0, | ||
], | ||
'insertPerson' | ||
) | ||
const personCreated = insertResult.rows[0] as RawPerson | ||
const person = { | ||
...personCreated, | ||
created_at: DateTime.fromISO(personCreated.created_at).toUTC(), | ||
version: Number(personCreated.version || 0), | ||
} as Person | ||
|
||
kafkaMessages.push( | ||
generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, person.version) | ||
) | ||
const insertResult = await this.postgres.query( | ||
PostgresUse.COMMON_WRITE, | ||
`WITH inserted_person AS ( | ||
INSERT INTO posthog_person ( | ||
created_at, properties, properties_last_updated_at, | ||
properties_last_operation, team_id, is_user_id, is_identified, uuid, version | ||
) | ||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) | ||
RETURNING * | ||
)` + | ||
distinctIds | ||
.map( | ||
// NOTE: Keep this in sync with the posthog_persondistinctid INSERT in | ||
// `addDistinctIdPooled` | ||
(_, index) => `, distinct_id_${index} AS ( | ||
INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) | ||
VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))` | ||
) | ||
.join('') + | ||
`SELECT * FROM inserted_person;`, | ||
[ | ||
createdAt.toISO(), | ||
JSON.stringify(properties), | ||
JSON.stringify(propertiesLastUpdatedAt), | ||
JSON.stringify(propertiesLastOperation), | ||
teamId, | ||
isUserId, | ||
isIdentified, | ||
uuid, | ||
version, | ||
// The copy and reverse here is to maintain compatibility with pre-existing code | ||
// and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the | ||
// CTEs above, so we need to reverse the distinctIds to match the old behavior where | ||
// we would do a round trip for each INSERT. We shouldn't actually depend on the | ||
// `id` column of distinct_ids, so this is just a simple way to keeps tests exactly | ||
// the same and prove behavior is the same as before. | ||
...distinctIds.slice().reverse(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So it looks like PG does inserts and/or assigns row IDs in reverse order of what you provide in the CTEs (meaning the last INSERT gets the earlier ID). This doesn't matter at all for actual code correctness, but we have a lot of tests that depend on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add a comment about that |
||
], | ||
'insertPerson' | ||
) | ||
const personCreated = insertResult.rows[0] as RawPerson | ||
const person = { | ||
...personCreated, | ||
created_at: DateTime.fromISO(personCreated.created_at).toUTC(), | ||
version, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit (for follow-up PR): maybe we should change |
||
} as Person | ||
|
||
for (const distinctId of distinctIds || []) { | ||
const messages = await this.addDistinctIdPooled(person, distinctId, tx) | ||
kafkaMessages.push(...messages) | ||
} | ||
const kafkaMessages: ProducerRecord[] = [] | ||
kafkaMessages.push(generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, version)) | ||
|
||
return person | ||
}) | ||
for (const distinctId of distinctIds) { | ||
kafkaMessages.push({ | ||
topic: KAFKA_PERSON_DISTINCT_ID, | ||
messages: [ | ||
{ | ||
value: JSON.stringify({ | ||
person_id: person.uuid, | ||
team_id: teamId, | ||
distinct_id: distinctId, | ||
version, | ||
is_deleted: 0, | ||
}), | ||
}, | ||
], | ||
}) | ||
} | ||
|
||
await this.kafkaProducer.queueMessages(kafkaMessages) | ||
return person | ||
|
@@ -839,6 +871,7 @@ export class DB { | |
): Promise<ProducerRecord[]> { | ||
const insertResult = await this.postgres.query( | ||
tx ?? PostgresUse.COMMON_WRITE, | ||
// NOTE: Keep this in sync with the posthog_persondistinctid INSERT in `createPerson` | ||
'INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version) VALUES ($1, $2, $3, 0) RETURNING *', | ||
[distinctId, person.id, person.team_id], | ||
'addDistinctIdPooled' | ||
|
> **Review comment:** I worry about this getting out of sync with `addDistinctIdPooled`; for lack of a better solution, maybe add a comment to keep them in sync, like we have done in other places 🤷‍♀️