diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index a7cd6d0b23dd9..de57b602725cc 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -717,6 +717,12 @@ export class DB { update: Partial, tx?: TransactionClient ): Promise<[InternalPerson, ProducerRecord[]]> { + let versionString = 'COALESCE(version, 0)::numeric + 1' + if (update.version) { + versionString = update.version.toString() + delete update['version'] + } + const updateValues = Object.values(unparsePersonPartial(update)) // short circuit if there are no updates to be made @@ -727,11 +733,9 @@ export class DB { const values = [...updateValues, person.id].map(sanitizeJsonbValue) // Potentially overriding values badly if there was an update to the person after computing updateValues above - const queryString = `UPDATE posthog_person SET version = COALESCE(version, 0)::numeric + 1, ${Object.keys( - update - ).map((field, index) => `"${sanitizeSqlIdentifier(field)}" = $${index + 1}`)} WHERE id = $${ - Object.values(update).length + 1 - } + const queryString = `UPDATE posthog_person SET version = ${versionString}, ${Object.keys(update).map( + (field, index) => `"${sanitizeSqlIdentifier(field)}" = $${index + 1}` + )} WHERE id = $${Object.values(update).length + 1} RETURNING *` const { rows } = await this.postgres.query( diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index d3bf32e21310b..24e279fd981da 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -545,6 +545,23 @@ export class PersonState { created_at: createdAt, properties: properties, is_identified: true, + + // By using the max version between the two Persons, we ensure that if + // this Person is later split, we can use `this_person.version + 1` for + // any split-off Persons and know that *that* version will be higher than + // any previously 
deleted Person, and so the new Person row will "win" and + // "undelete" the Person. + // + // For example: + // - Merge Person_1(version:7) into Person_2(version:2) + // - Person_1 is deleted + // - Person_2 attains version 8 via this code below + // - Person_2 is later split, which attempts to re-create Person_1 by using + // its `distinct_id` to generate the deterministic Person UUID. + // That new Person_1 will have a version _at least_ as high as 8, and + // so any previously existing rows in CH or otherwise from + // Person_1(version:7) will "lose" to this new Person_1. + version: Math.max(mergeInto.version, otherPerson.version) + 1, }, tx ) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index ab921d71902cc..88b1f1dfabfc8 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2207,7 +2207,10 @@ describe('PersonState.update()', () => { // then pros can be dropped, see https://docs.google.com/presentation/d/1Osz7r8bKkDD5yFzw0cCtsGVf1LTEifXS-dzuwaS8JGY // properties: { first: true, second: true, third: true }, created_at: timestamp, - version: 1, // the test intends for it to be a chain, so must get v1, we get v2 if second->first and third->first, but we want it to be third->second->first + // This is 2 because they all start with version 0, and then: + // third -> second = max(third(0), second(0)) + 1 == version 1 + // second -> first = max(second(1), first(0)) + 1 == version 2 + version: 2, is_identified: true, }) ) @@ -2296,7 +2299,10 @@ describe('PersonState.update()', () => { uuid: firstUserUuid, // guaranteed to be merged into this based on timestamps properties: { first: true, second: true, third: true }, created_at: timestamp, - version: 1, // the test intends for it to be a chain, so must get v1, we get v2 if second->first and third->first, but we want it to be third->second->first + //
This is 2 because they all start with version 0, and then: + // third -> second = max(third(0), second(0)) + 1 == version 1 + // second -> first = max(second(1), first(0)) + 1 == version 2 + version: 2, is_identified: true, }) ) diff --git a/posthog/models/person/person.py b/posthog/models/person/person.py index 20f9dd7675487..72a5bd7c79948 100644 --- a/posthog/models/person/person.py +++ b/posthog/models/person/person.py @@ -6,6 +6,7 @@ from posthog.models.utils import UUIDT from ..team import Team +from .missing_person import uuidFromDistinctId MAX_LIMIT_DISTINCT_IDS = 2500 @@ -51,7 +52,9 @@ def _add_distinct_ids(self, distinct_ids: list[str]) -> None: self.add_distinct_id(distinct_id) def split_person(self, main_distinct_id: Optional[str], max_splits: Optional[int] = None): - distinct_ids = Person.objects.get(pk=self.pk).distinct_ids + original_person = Person.objects.get(pk=self.pk) + distinct_ids = original_person.distinct_ids + original_person_version = original_person.version or 0 if not main_distinct_id: self.properties = {} self.save() @@ -65,7 +68,13 @@ def split_person(self, main_distinct_id: Optional[str], max_splits: Optional[int if not distinct_id == main_distinct_id: with transaction.atomic(): pdi = PersonDistinctId.objects.select_for_update().get(person=self, distinct_id=distinct_id) - person = Person.objects.create(team_id=self.team_id) + person, _ = Person.objects.get_or_create( + uuid=uuidFromDistinctId(self.team_id, distinct_id), + team_id=self.team_id, + defaults={ + "version": original_person_version + 1, + }, + ) pdi.person_id = str(person.id) pdi.version = (pdi.version or 0) + 1 pdi.save(update_fields=["version", "person_id"]) @@ -83,9 +92,7 @@ def split_person(self, main_distinct_id: Optional[str], max_splits: Optional[int version=pdi.version, ) create_person( - team_id=self.team_id, - uuid=str(person.uuid), - version=person.version or 0, + team_id=self.team_id, uuid=str(person.uuid), version=person.version, created_at=person.created_at ) 
objects = PersonManager()