Skip to content

Commit

Permalink
refactor(plugin-server): Clean up Person Postgres & Kafka code a little bit (#20479)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming authored Feb 21, 2024
1 parent 72b7077 commit 05a06d2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 92 deletions.
88 changes: 25 additions & 63 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,14 @@ export class DB {
}
}

/**
 * Converts a raw person row into a `Person`.
 *
 * All fields of `row` are copied through unchanged, except:
 * - `created_at` is parsed as an ISO string and converted to a UTC `DateTime`;
 * - `version` is coerced to a number, falling back to 0 when the raw value is falsy.
 *
 * This method deliberately does not reference `this`, because it is handed
 * around as a bare callback (e.g. `rows.map(this.toPerson)`).
 *
 * @param row - the raw person record to convert
 * @returns the converted person
 */
private toPerson(row: RawPerson): Person {
    const createdAt = DateTime.fromISO(row.created_at).toUTC()
    const version = Number(row.version || 0)
    return { ...row, created_at: createdAt, version }
}

public async fetchPersons(database?: Database.Postgres): Promise<Person[]>
public async fetchPersons(database: Database.ClickHouse): Promise<ClickHousePerson[]>
public async fetchPersons(database: Database = Database.Postgres): Promise<Person[] | ClickHousePerson[]> {
Expand All @@ -576,23 +584,9 @@ export class DB {
return rest
}) as ClickHousePerson[]
} else if (database === Database.Postgres) {
return (
(
await this.postgres.query(
PostgresUse.COMMON_WRITE,
'SELECT * FROM posthog_person',
undefined,
'fetchPersons'
)
).rows as RawPerson[]
).map(
(rawPerson: RawPerson) =>
({
...rawPerson,
created_at: DateTime.fromISO(rawPerson.created_at).toUTC(),
version: Number(rawPerson.version || 0),
} as Person)
)
return await this.postgres
.query<RawPerson>(PostgresUse.COMMON_WRITE, 'SELECT * FROM posthog_person', undefined, 'fetchPersons')
.then(({ rows }) => rows.map(this.toPerson))
} else {
throw new Error(`Can't fetch persons for database: ${database}`)
}
Expand Down Expand Up @@ -626,20 +620,15 @@ export class DB {
}
const values = [teamId, distinctId]

const selectResult: QueryResult = await this.postgres.query<RawPerson>(
const { rows } = await this.postgres.query<RawPerson>(
PostgresUse.COMMON_WRITE,
queryString,
values,
'fetchPerson'
)

if (selectResult.rows.length > 0) {
const rawPerson = selectResult.rows[0]
return {
...rawPerson,
created_at: DateTime.fromISO(rawPerson.created_at).toUTC(),
version: Number(rawPerson.version || 0),
}
if (rows.length > 0) {
return this.toPerson(rows[0])
}
}

Expand All @@ -657,7 +646,7 @@ export class DB {
distinctIds ||= []
const version = 0 // We're creating the person now!

const insertResult = await this.postgres.query(
const { rows } = await this.postgres.query<RawPerson>(
PostgresUse.COMMON_WRITE,
`WITH inserted_person AS (
INSERT INTO posthog_person (
Expand Down Expand Up @@ -697,15 +686,9 @@ export class DB {
],
'insertPerson'
)
const personCreated = insertResult.rows[0] as RawPerson
const person = {
...personCreated,
created_at: DateTime.fromISO(personCreated.created_at).toUTC(),
version,
} as Person
const person = this.toPerson(rows[0])

const kafkaMessages: ProducerRecord[] = []
kafkaMessages.push(generateKafkaPersonUpdateMessage(createdAt, properties, teamId, isIdentified, uuid, version))
const kafkaMessages = [generateKafkaPersonUpdateMessage(person)]

for (const distinctId of distinctIds) {
kafkaMessages.push({
Expand Down Expand Up @@ -751,23 +734,18 @@ export class DB {
}
RETURNING *`

const updateResult: QueryResult = await this.postgres.query(
const { rows } = await this.postgres.query<RawPerson>(
tx ?? PostgresUse.COMMON_WRITE,
queryString,
values,
'updatePerson'
)
if (updateResult.rows.length == 0) {
if (rows.length == 0) {
throw new NoRowsUpdatedError(
`Person with team_id="${person.team_id}" and uuid="${person.uuid} couldn't be updated`
)
}
const updatedPersonRaw = updateResult.rows[0] as RawPerson
const updatedPerson = {
...updatedPersonRaw,
created_at: DateTime.fromISO(updatedPersonRaw.created_at).toUTC(),
version: Number(updatedPersonRaw.version || 0),
} as Person
const updatedPerson = this.toPerson(rows[0])

// Track the disparity between the version on the database and the version of the person we have in memory
// Without races, the returned person (updatedPerson) should have a version that's only +1 the person in memory
Expand All @@ -777,14 +755,7 @@ export class DB {
}

const kafkaMessages = []
const message = generateKafkaPersonUpdateMessage(
updatedPerson.created_at,
updatedPerson.properties,
updatedPerson.team_id,
updatedPerson.is_identified,
updatedPerson.uuid,
updatedPerson.version
)
const message = generateKafkaPersonUpdateMessage(updatedPerson)
if (tx) {
kafkaMessages.push(message)
} else {
Expand All @@ -800,7 +771,7 @@ export class DB {
}

public async deletePerson(person: Person, tx?: TransactionClient): Promise<ProducerRecord[]> {
const result = await this.postgres.query<{ version: string }>(
const { rows } = await this.postgres.query<{ version: string }>(
tx ?? PostgresUse.COMMON_WRITE,
'DELETE FROM posthog_person WHERE team_id = $1 AND id = $2 RETURNING version',
[person.team_id, person.id],
Expand All @@ -809,18 +780,9 @@ export class DB {

let kafkaMessages: ProducerRecord[] = []

if (result.rows.length > 0) {
kafkaMessages = [
generateKafkaPersonUpdateMessage(
person.created_at,
person.properties,
person.team_id,
person.is_identified,
person.uuid,
Number(result.rows[0].version || 0) + 100, // keep in sync with delete_person in posthog/models/person/util.py
1
),
]
if (rows.length > 0) {
const [row] = rows
kafkaMessages = [generateKafkaPersonUpdateMessage({ ...person, version: Number(row.version || 0) }, true)]
}
return kafkaMessages
}
Expand Down
37 changes: 18 additions & 19 deletions plugin-server/src/utils/db/utils.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { Properties } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'
import { Counter } from 'prom-client'

import { defaultConfig } from '../../config/config'
import { KAFKA_PERSON } from '../../config/kafka-topics'
import { BasePerson, Person, PluginLogEntryType, PluginLogLevel, RawPerson, TimestampFormat } from '../../types'
import {
BasePerson,
ClickHousePerson,
Person,
PluginLogEntryType,
PluginLogLevel,
RawPerson,
TimestampFormat,
} from '../../types'
import { status } from '../../utils/status'
import { castTimestampOrNow } from '../../utils/utils'

Expand Down Expand Up @@ -99,28 +106,20 @@ export function personInitialAndUTMProperties(properties: Properties): Propertie
return propertiesCopy
}

export function generateKafkaPersonUpdateMessage(
createdAt: DateTime | string,
properties: Properties,
teamId: number,
isIdentified: boolean,
id: string,
version: number,
isDeleted = 0
): ProducerRecord {
export function generateKafkaPersonUpdateMessage(person: Person, isDeleted = false): ProducerRecord {
return {
topic: KAFKA_PERSON,
messages: [
{
value: JSON.stringify({
id,
created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouseSecondPrecision),
properties: JSON.stringify(properties),
team_id: teamId,
is_identified: isIdentified,
is_deleted: isDeleted,
...(version !== null ? { version } : {}),
}),
id: person.uuid,
created_at: castTimestampOrNow(person.created_at, TimestampFormat.ClickHouseSecondPrecision),
properties: JSON.stringify(person.properties),
team_id: person.team_id,
is_identified: Number(person.is_identified),
is_deleted: Number(isDeleted),
version: person.version + (isDeleted ? 100 : 0), // keep in sync with delete_person in posthog/models/person/util.py
} as Omit<ClickHousePerson, 'timestamp'>),
},
],
}
Expand Down
10 changes: 2 additions & 8 deletions plugin-server/tests/main/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,9 @@ describe('DB', () => {
expect(updatedPerson.properties).toEqual({ c: 'aaa' })

// verify correct Kafka message was sent
const expected_message = generateKafkaPersonUpdateMessage(
updateTs,
{ c: 'aaa' },
personDbBefore.team_id,
personDbBefore.is_identified,
personDbBefore.uuid,
1
expect(db.kafkaProducer!.queueMessage).toHaveBeenLastCalledWith(
generateKafkaPersonUpdateMessage(updatedPerson)
)
expect(db.kafkaProducer!.queueMessage).toHaveBeenLastCalledWith(expected_message)
})
})

Expand Down
7 changes: 5 additions & 2 deletions plugin-server/tests/worker/ingestion/postgres-parity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ describe('postgres parity', () => {
await delayUntilEventIngested(() => hub.db.fetchDistinctIdValues(person, Database.ClickHouse), 2)
await delayUntilEventIngested(() => hub.db.fetchDistinctIds(person, Database.ClickHouse), 2)

const clickHousePersons = await hub.db.fetchPersons(Database.ClickHouse)
const clickHousePersons = (await hub.db.fetchPersons(Database.ClickHouse)).map((row) => ({
...row,
properties: JSON.parse(row.properties), // avoids depending on key sort order
}))
expect(clickHousePersons).toEqual([
{
id: uuid,
created_at: expect.any(String), // '2021-02-04 00:18:26.472',
team_id: teamId,
properties: '{"userPropOnce":"propOnceValue","userProp":"propValue"}',
properties: { userPropOnce: 'propOnceValue', userProp: 'propValue' },
is_identified: 1,
is_deleted: 0,
_timestamp: expect.any(String),
Expand Down

0 comments on commit 05a06d2

Please sign in to comment.