Skip to content

Commit

Permalink
revert: PR 23204, except for migration
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Jul 1, 2024
1 parent c5f14f3 commit 8fc49fd
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 411 deletions.
81 changes: 10 additions & 71 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,20 +641,13 @@ export class DB {
isUserId: number | null,
isIdentified: boolean,
uuid: string,
distinctIds?: { distinctId: string; version?: number }[],
tx?: TransactionClient
distinctIds?: string[],
version = 0
): Promise<InternalPerson> {
distinctIds ||= []

for (const distinctId of distinctIds) {
distinctId.version ||= 0
}

// The Person is being created, and so we can hardcode version 0!
const personVersion = 0

const { rows } = await this.postgres.query<RawPerson>(
tx ?? PostgresUse.COMMON_WRITE,
PostgresUse.COMMON_WRITE,
`WITH inserted_person AS (
INSERT INTO posthog_person (
created_at, properties, properties_last_updated_at,
Expand All @@ -669,12 +662,7 @@ export class DB {
// `addDistinctIdPooled`
(_, index) => `, distinct_id_${index} AS (
INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id, version)
VALUES (
$${11 + index + distinctIds!.length - 1},
(SELECT id FROM inserted_person),
$5,
$${10 + index})
)`
VALUES ($${10 + index}, (SELECT id FROM inserted_person), $5, $9))`
)
.join('') +
`SELECT * FROM inserted_person;`,
Expand All @@ -687,21 +675,14 @@ export class DB {
isUserId,
isIdentified,
uuid,
personVersion,
version,
// The copy and reverse here is to maintain compatability with pre-existing code
// and tests. Postgres appears to assign IDs in reverse order of the INSERTs in the
// CTEs above, so we need to reverse the distinctIds to match the old behavior where
// we would do a round trip for each INSERT. We shouldn't actually depend on the
// `id` column of distinct_ids, so this is just a simple way to keeps tests exactly
// the same and prove behavior is the same as before.
...distinctIds
.slice()
.reverse()
.map(({ version }) => version),
...distinctIds
.slice()
.reverse()
.map(({ distinctId }) => distinctId),
...distinctIds.slice().reverse(),
],
'insertPerson'
)
Expand All @@ -717,8 +698,8 @@ export class DB {
value: JSON.stringify({
person_id: person.uuid,
team_id: teamId,
distinct_id: distinctId.distinctId,
version: distinctId.version,
distinct_id: distinctId,
version,
is_deleted: 0,
}),
},
Expand Down Expand Up @@ -849,50 +830,8 @@ export class DB {
return personDistinctIds.map((pdi) => pdi.distinct_id)
}

public async addPersonlessDistinctId(teamId: number, distinctId: string): Promise<boolean> {
const result = await this.postgres.query(
PostgresUse.COMMON_WRITE,
`
INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at)
VALUES ($1, $2, false, now())
ON CONFLICT (team_id, distinct_id) DO NOTHING
RETURNING is_merged
`,
[teamId, distinctId],
'addPersonlessDistinctId'
)

return result.rows[0]['is_merged']
}

public async addPersonlessDistinctIdForMerge(
teamId: number,
distinctId: string,
tx?: TransactionClient
): Promise<boolean> {
const result = await this.postgres.query(
tx ?? PostgresUse.COMMON_WRITE,
`
INSERT INTO posthog_personlessdistinctid (team_id, distinct_id, is_merged, created_at)
VALUES ($1, $2, true, now())
ON CONFLICT (team_id, distinct_id) DO UPDATE
SET is_merged = true
RETURNING (xmax = 0) AS inserted
`,
[teamId, distinctId],
'addPersonlessDistinctIdForMerge'
)

return result.rows[0].inserted
}

public async addDistinctId(
person: InternalPerson,
distinctId: string,
version: number,
tx?: TransactionClient
): Promise<void> {
const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version, tx)
public async addDistinctId(person: InternalPerson, distinctId: string, version: number): Promise<void> {
const kafkaMessages = await this.addDistinctIdPooled(person, distinctId, version)
if (kafkaMessages.length) {
await this.kafkaProducer.queueMessages({ kafkaMessages, waitForAck: true })
}
Expand Down
Loading

0 comments on commit 8fc49fd

Please sign in to comment.