Skip to content

Commit

Permalink
fix(plugin-server): write out less overrides on behalf of personless … (
Browse files Browse the repository at this point in the history
#23204)

* fix(plugin-server): write out less overrides on behalf of personless mode

* don't skip writing overrides (so we can backfill posthog_personlessdistinctid)

* swap ugly double-array params for array of objects

* bigint id for PersonlessDistinctId

* add LRU cache for posthog_personlessdistinctid inserts

* fix tests

* overzealous search and replace
  • Loading branch information
bretthoerner authored Jul 1, 2024
1 parent 92e58fe commit 70549f8
Show file tree
Hide file tree
Showing 13 changed files with 453 additions and 135 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0431_externaldataschema_sync_type_payload
posthog: 0432_personlessdistinctid
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
78 changes: 69 additions & 9 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,11 +641,17 @@ export class DB {
isUserId: number | null,
isIdentified: boolean,
uuid: string,
distinctIds?: string[],
version = 0
distinctIds?: { distinctId: string; version?: number }[]
): Promise<InternalPerson> {
distinctIds ||= []

for (const distinctId of distinctIds) {
distinctId.version ||= 0
}

// The Person is being created, and so we can hardcode version 0!
const personVersion = 0

const { rows } = await this.postgres.query<RawPerson>(
PostgresUse.COMMON_WRITE,
`WITH inserted_person AS (
Expand All @@ -662,7 +668,12 @@ export class DB {
// `addDistinctIdPooled`
(_, index) => `, distinct_id_${index} AS (
INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))`
VALUES (
$${11 + index + distinctIds!.length - 1},
(SELECT id FROM inserted_person),
$5,
$${10 + index})
)`
)
.join('') +
`SELECT * FROM inserted_person;`,
Expand All @@ -675,14 +686,21 @@ export class DB {
isUserId,
isIdentified,
uuid,
version,
personVersion,
// The copy and reverse here is to maintain compatibility with pre-existing code
// and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the
// CTEs above, so we need to reverse the distinctIds to match the old behavior where
// we would do a round trip for each INSERT. We shouldn't actually depend on the
// `id` column of distinct_ids, so this is just a simple way to keep tests exactly
// the same and prove behavior is the same as before.
...distinctIds.slice().reverse(),
...distinctIds
.slice()
.reverse()
.map(({ version }) => version),
...distinctIds
.slice()
.reverse()
.map(({ distinctId }) => distinctId),
],
'insertPerson'
)
Expand All @@ -698,8 +716,8 @@ export class DB {
value: JSON.stringify({
person_id: person.uuid,
team_id: teamId,
distinct_id: distinctId,
version,
distinct_id: distinctId.distinctId,
version: distinctId.version,
is_deleted: 0,
}),
},
Expand Down Expand Up @@ -830,8 +848,50 @@ export class DB {
return personDistinctIds.map((pdi) => pdi.distinct_id)
}

public async addDistinctId(person: InternalPerson, distinctId: string, version: number): Promise<void> {
const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version)
/**
 * Ensures a row exists in `posthog_personlessdistinctid` for the given team and
 * distinct ID, and reports whether that row is flagged as merged.
 *
 * A new row is inserted with `is_merged = false`. If a row already exists for
 * (`team_id`, `distinct_id`) it is left untouched and its stored `is_merged`
 * value is returned instead.
 *
 * @param teamId - value written to / matched against the `team_id` column
 * @param distinctId - value written to / matched against the `distinct_id` column
 * @returns the `is_merged` value of the row for (`teamId`, `distinctId`)
 */
public async addPersonlessDistinctId(teamId: number, distinctId: string): Promise<boolean> {
    const result = await this.postgres.query(
        PostgresUse.COMMON_WRITE,
        `
INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at)
VALUES ($1, $2, false, now())
ON CONFLICT (team_id, distinct_id) DO NOTHING
RETURNING is_merged
`,
        [teamId, distinctId],
        'addPersonlessDistinctId'
    )

    if (result.rows.length === 1) {
        return result.rows[0]['is_merged']
    }

    // ON CONFLICT ... DO NOTHING yields no RETURNING row when the row already exists,
    // so `rows` is empty here. Read the existing row back (from the same write
    // connection class used for the INSERT) to learn its current `is_merged` value.
    const existingResult = await this.postgres.query(
        PostgresUse.COMMON_WRITE,
        `
SELECT is_merged
FROM posthog_personlessdistinctid
WHERE team_id = $1 AND distinct_id = $2
`,
        [teamId, distinctId],
        'addPersonlessDistinctId'
    )

    // NOTE(review): assumes the conflicting row still exists at this point (i.e. rows
    // in this table are not deleted concurrently) — confirm against table lifecycle.
    return existingResult.rows[0]['is_merged']
}

/**
 * Marks the (`teamId`, `distinctId`) pair as merged in `posthog_personlessdistinctid`.
 *
 * Inserts a row with `is_merged = true`; if a row for the pair already exists, its
 * `is_merged` column is set to `true` instead.
 *
 * @param teamId - value written to / matched against the `team_id` column
 * @param distinctId - value written to / matched against the `distinct_id` column
 * @param tx - optional transaction client; when given, the query is issued on it
 *   rather than on `PostgresUse.COMMON_WRITE`
 * @returns the `inserted` column of the returned row, i.e. the result of `xmax = 0`
 *   (presumably `true` for a freshly inserted row and `false` when an existing row
 *   was updated — relies on Postgres `xmax` semantics)
 */
public async addPersonlessDistinctIdForMerge(
    teamId: number,
    distinctId: string,
    tx?: TransactionClient
): Promise<boolean> {
    // Upsert: the conflict branch flips `is_merged` on the pre-existing row, so a
    // row is always returned regardless of which branch ran.
    const upsertSql = `
INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at)
VALUES ($1, $2, true, now())
ON CONFLICT (team_id, distinct_id) DO UPDATE
SET is_merged = true
RETURNING (xmax = 0) AS inserted
`
    const target = tx ?? PostgresUse.COMMON_WRITE

    const { rows } = await this.postgres.query(
        target,
        upsertSql,
        [teamId, distinctId],
        'addPersonlessDistinctIdForMerge'
    )

    const [upserted] = rows
    return upserted.inserted
}

public async addDistinctId(
person: InternalPerson,
distinctId: string,
version: number,
tx?: TransactionClient
): Promise<void> {
const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version, tx)
if (kafkaMessages.length) {
await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
}
Expand Down
Loading

0 comments on commit 70549f8

Please sign in to comment.