Skip to content

Commit

Permalink
chore(plugin-server): use uuidv5 for person uuids, based on distinct_…
Browse files Browse the repository at this point in the history
…id (#21547)

* chore(plugin-server): bump uuid package

* chore(plugin-server): use uuidv5 for person uuids, based on distinct_id
  • Loading branch information
bretthoerner authored Apr 17, 2024
1 parent 703e0b8 commit fff6720
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 247 deletions.
4 changes: 2 additions & 2 deletions plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
"re2": "^1.20.3",
"safe-stable-stringify": "^2.4.0",
"tail": "^2.2.6",
"uuid": "^8.3.2",
"uuid": "^9.0.1",
"v8-profiler-next": "^1.9.0",
"vm2": "3.9.18"
},
Expand Down Expand Up @@ -111,7 +111,7 @@
"@types/redlock": "^4.0.1",
"@types/snowflake-sdk": "^1.5.1",
"@types/tar-stream": "^2.2.0",
"@types/uuid": "^8.3.0",
"@types/uuid": "^9.0.1",
"@typescript-eslint/eslint-plugin": "^7.1.1",
"@typescript-eslint/parser": "^7.1.1",
"babel-eslint": "^10.1.0",
Expand Down
19 changes: 12 additions & 7 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 19 additions & 9 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'
import { Counter } from 'prom-client'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'
import { parse as parseUuid, v5 as uuidv5 } from 'uuid'

import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics'
import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types'
Expand All @@ -13,7 +14,7 @@ import { timeoutGuard } from '../../utils/db/utils'
import { PeriodicTask } from '../../utils/periodic-task'
import { promiseRetry } from '../../utils/retries'
import { status } from '../../utils/status'
import { castTimestampOrNow, UUIDT } from '../../utils/utils'
import { castTimestampOrNow } from '../../utils/utils'
import { captureIngestionWarning } from './utils'

export const mergeFinalFailuresCounter = new Counter({
Expand All @@ -33,6 +34,15 @@ export const mergeTxnSuccessCounter = new Counter({
labelNames: ['call', 'oldPersonIdentified', 'newPersonIdentified', 'poEEmbraceJoin'],
})

// UUIDv5 requires a namespace, which is itself a UUID. This was a randomly generated UUIDv4
// that must be used to deterministrically generate UUIDv5s for Person rows.
const PERSON_UUIDV5_NAMESPACE = parseUuid('932979b4-65c3-4424-8467-0b66ec27bc22')

function uuidFromDistinctId(teamId: number, distinctId: string): string {
// Deterministcally create a UUIDv5 based on the (team_id, distinct_id) pair.
return uuidv5(`${teamId}:${distinctId}`, PERSON_UUIDV5_NAMESPACE)
}

// used to prevent identify from being used with generic IDs
// that we can safely assume stem from a bug or mistake
// used to prevent identify from being used with generic IDs
Expand Down Expand Up @@ -81,7 +91,6 @@ const isDistinctIdIllegal = (id: string): boolean => {
// This class is responsible for creating/updating a single person through the process-event pipeline
export class PersonState {
private eventProperties: Properties
private newUuid: string

public updateIsIdentified: boolean // TODO: remove this from the class and being hidden

Expand All @@ -92,11 +101,9 @@ export class PersonState {
private timestamp: DateTime,
private processPerson: boolean, // $process_person_profile flag from the event
private db: DB,
private personOverrideWriter?: DeferredPersonOverrideWriter,
uuid: UUIDT | undefined = undefined
private personOverrideWriter?: DeferredPersonOverrideWriter
) {
this.eventProperties = event.properties!
this.newUuid = (uuid || new UUIDT()).toString()

// If set to true, we'll update `is_identified` at the end of `updateProperties`
// :KLUDGE: This is an indirect communication channel between `handleIdentifyOrAlias` and `updateProperties`
Expand Down Expand Up @@ -172,7 +179,6 @@ export class PersonState {
null,
// :NOTE: This should never be set in this branch, but adding this for logical consistency
this.updateIsIdentified,
this.newUuid,
this.event.uuid,
[this.distinctId]
)
Expand All @@ -186,10 +192,14 @@ export class PersonState {
teamId: number,
isUserId: number | null,
isIdentified: boolean,
uuid: string,
creatorEventUuid: string,
distinctIds?: string[]
distinctIds: string[]
): Promise<Person> {
if (distinctIds.length < 1) {
throw new Error('at least 1 distinctId is required in `createPerson`')
}
const uuid = uuidFromDistinctId(teamId, distinctIds[0])

const props = { ...propertiesOnce, ...properties, ...{ $creator_event_uuid: creatorEventUuid } }
const propertiesLastOperation: Record<string, any> = {}
const propertiesLastUpdatedAt: Record<string, any> = {}
Expand Down Expand Up @@ -389,6 +399,7 @@ export class PersonState {
otherPersonDistinctId: otherPersonDistinctId,
})
}

// The last case: (!oldPerson && !newPerson)
return await this.createPerson(
// TODO: in this case we could skip the properties updates later
Expand All @@ -398,7 +409,6 @@ export class PersonState {
teamId,
null,
true,
this.newUuid,
this.event.uuid,
[mergeIntoDistinctId, otherPersonDistinctId]
)
Expand Down
Loading

0 comments on commit fff6720

Please sign in to comment.