Skip to content

Commit

Permalink
chore: split Persons using deterministic uuid (#22114)
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored Jun 11, 2024
1 parent 81b10aa commit 17bcd94
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 12 deletions.
14 changes: 9 additions & 5 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,12 @@ export class DB {
update: Partial<InternalPerson>,
tx?: TransactionClient
): Promise<[InternalPerson, ProducerRecord[]]> {
let versionString = 'COALESCE(version, 0)::numeric + 1'
if (update.version) {
versionString = update.version.toString()
delete update['version']
}

const updateValues = Object.values(unparsePersonPartial(update))

// short circuit if there are no updates to be made
Expand All @@ -727,11 +733,9 @@ export class DB {
const values = [...updateValues, person.id].map(sanitizeJsonbValue)

// Potentially overriding values badly if there was an update to the person after computing updateValues above
const queryString = `UPDATE posthog_person SET version = COALESCE(version, 0)::numeric + 1, ${Object.keys(
update
).map((field, index) => `"${sanitizeSqlIdentifier(field)}" = $${index + 1}`)} WHERE id = $${
Object.values(update).length + 1
}
const queryString = `UPDATE posthog_person SET version = ${versionString}, ${Object.keys(update).map(
(field, index) => `"${sanitizeSqlIdentifier(field)}" = $${index + 1}`
)} WHERE id = $${Object.values(update).length + 1}
RETURNING *`

const { rows } = await this.postgres.query<RawPerson>(
Expand Down
17 changes: 17 additions & 0 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,23 @@ export class PersonState {
created_at: createdAt,
properties: properties,
is_identified: true,

// By using the max version between the two Persons, we ensure that if
// this Person is later split, we can use `this_person.version + 1` for
// any split-off Persons and know that *that* version will be higher than
// any previously deleted Person, and so the new Person row will "win" and
// "undelete" the Person.
//
// For example:
// - Merge Person_1(version:7) into Person_2(version:2)
// - Person_1 is deleted
// - Person_2 attains version 8 via this code below
// - Person_2 is later split, which attempts to re-create Person_1 by using
// its `distinct_id` to generate the deterministic Person UUID.
// That new Person_1 will have a version _at least_ as high as 8, and
// so any previously existing rows in ClickHouse (CH) or otherwise from
// Person_1(version:7) will "lose" to this new Person_1.
version: Math.max(mergeInto.version, otherPerson.version) + 1,
},
tx
)
Expand Down
10 changes: 8 additions & 2 deletions plugin-server/tests/worker/ingestion/person-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2207,7 +2207,10 @@ describe('PersonState.update()', () => {
// then props can be dropped, see https://docs.google.com/presentation/d/1Osz7r8bKkDD5yFzw0cCtsGVf1LTEifXS-dzuwaS8JGY
// properties: { first: true, second: true, third: true },
created_at: timestamp,
version: 1, // the test intends for it to be a chain, so must get v1, we get v2 if second->first and third->first, but we want it to be third->second->first
// This is 2 because they all start with version 0, and then:
// third -> second = max(third(0), second(0)) + 1 == version 1
// second -> first = max(second(1), first(0)) + 1 == version 2
version: 2,
is_identified: true,
})
)
Expand Down Expand Up @@ -2296,7 +2299,10 @@ describe('PersonState.update()', () => {
uuid: firstUserUuid, // guaranteed to be merged into this based on timestamps
properties: { first: true, second: true, third: true },
created_at: timestamp,
version: 1, // the test intends for it to be a chain, so must get v1, we get v2 if second->first and third->first, but we want it to be third->second->first
// This is 2 because they all start with version 0, and then:
// third -> second = max(third(0), second(0)) + 1 == version 1
// second -> first = max(second(1), first(0)) + 1 == version 2
version: 2,
is_identified: true,
})
)
Expand Down
17 changes: 12 additions & 5 deletions posthog/models/person/person.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from posthog.models.utils import UUIDT

from ..team import Team
from .missing_person import uuidFromDistinctId

MAX_LIMIT_DISTINCT_IDS = 2500

Expand Down Expand Up @@ -51,7 +52,9 @@ def _add_distinct_ids(self, distinct_ids: list[str]) -> None:
self.add_distinct_id(distinct_id)

def split_person(self, main_distinct_id: Optional[str], max_splits: Optional[int] = None):
distinct_ids = Person.objects.get(pk=self.pk).distinct_ids
original_person = Person.objects.get(pk=self.pk)
distinct_ids = original_person.distinct_ids
original_person_version = original_person.version or 0
if not main_distinct_id:
self.properties = {}
self.save()
Expand All @@ -65,7 +68,13 @@ def split_person(self, main_distinct_id: Optional[str], max_splits: Optional[int
if not distinct_id == main_distinct_id:
with transaction.atomic():
pdi = PersonDistinctId.objects.select_for_update().get(person=self, distinct_id=distinct_id)
person = Person.objects.create(team_id=self.team_id)
person, _ = Person.objects.get_or_create(
uuid=uuidFromDistinctId(self.team_id, distinct_id),
team_id=self.team_id,
defaults={
"version": original_person_version + 1,
},
)
pdi.person_id = str(person.id)
pdi.version = (pdi.version or 0) + 1
pdi.save(update_fields=["version", "person_id"])
Expand All @@ -83,9 +92,7 @@ def split_person(self, main_distinct_id: Optional[str], max_splits: Optional[int
version=pdi.version,
)
create_person(
team_id=self.team_id,
uuid=str(person.uuid),
version=person.version or 0,
team_id=self.team_id, uuid=str(person.uuid), version=person.version, created_at=person.created_at
)

objects = PersonManager()
Expand Down

0 comments on commit 17bcd94

Please sign in to comment.