diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 16cae0d5e1a1e..2bbd3bb4dd2dd 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -23,6 +23,16 @@ export const mergeFinalFailuresCounter = new Counter({ help: 'Number of person merge final failures.', }) +export const mergeTxnAttemptCounter = new Counter({ + name: 'person_merge_txn_attempt_total', + help: 'Number of person merge attempts.', +}) + +export const mergeTxnSuccessCounter = new Counter({ + name: 'person_merge_txn_success_total', + help: 'Number of person merges that succeeded.', +}) + // used to prevent identify from being used with generic IDs // that we can safely assume stem from a bug or mistake // used to prevent identify from being used with generic IDs @@ -462,35 +472,49 @@ export class PersonState { createdAt: DateTime, properties: Properties ): Promise<[ProducerRecord[], Person]> { - return await this.db.postgres.transaction(PostgresUse.COMMON_WRITE, 'mergePeople', async (tx) => { - const [person, updatePersonMessages] = await this.db.updatePersonDeprecated( - mergeInto, - { - created_at: createdAt, - properties: properties, - is_identified: true, - }, - tx - ) + mergeTxnAttemptCounter.inc() + + const result: [ProducerRecord[], Person] = await this.db.postgres.transaction( + PostgresUse.COMMON_WRITE, + 'mergePeople', + async (tx) => { + const [person, updatePersonMessages] = await this.db.updatePersonDeprecated( + mergeInto, + { + created_at: createdAt, + properties: properties, + is_identified: true, + }, + tx + ) + + // Merge the distinct IDs + // TODO: Doesn't this table need to add updates to CH too? + await this.handleTablesDependingOnPersonID(otherPerson, mergeInto, tx) - // Merge the distinct IDs - // TODO: Doesn't this table need to add updates to CH too? - await this.handleTablesDependingOnPersonID(otherPerson, mergeInto, tx) + const distinctIdMessages = await this.db.moveDistinctIds(otherPerson, mergeInto, tx) - const distinctIdMessages = await this.db.moveDistinctIds(otherPerson, mergeInto, tx) + const deletePersonMessages = await this.db.deletePerson(otherPerson, tx) - const deletePersonMessages = await this.db.deletePerson(otherPerson, tx) + let personOverrideMessages: ProducerRecord[] = [] + if (this.poEEmbraceJoin) { + personOverrideMessages = [await this.addPersonOverride(otherPerson, mergeInto, tx)] + } - let personOverrideMessages: ProducerRecord[] = [] - if (this.poEEmbraceJoin) { - personOverrideMessages = [await this.addPersonOverride(otherPerson, mergeInto, tx)] + return [ + [ + ...personOverrideMessages, + ...updatePersonMessages, + ...distinctIdMessages, + ...deletePersonMessages, + ], + person, + ] } + ) - return [ - [...personOverrideMessages, ...updatePersonMessages, ...distinctIdMessages, ...deletePersonMessages], - person, - ] - }) + mergeTxnSuccessCounter.inc() + return result } private async addPersonOverride(