Skip to content

Commit

Permalink
chore(plugin-server, temporal): Consolidate on deferred flat person o…
Browse files Browse the repository at this point in the history
…verrides everywhere (PostHog#19590)
  • Loading branch information
tkaemming authored and jacobwgillespie committed Jan 12, 2024
1 parent eb1da14 commit 74defde
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 642 deletions.
2 changes: 1 addition & 1 deletion plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
personOverrides: config.POE_DEFERRED_WRITES_ENABLED,
personOverrides: true,
appManagementSingleton: true,
preflightSchedules: true,
...sharedCapabilities,
Expand Down
2 changes: 0 additions & 2 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ export function getDefaultConfig(): PluginsServerConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
POE_DEFERRED_WRITES_ENABLED: false,
POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: false,
POE_EMBRACE_JOIN_FOR_TEAMS: '',
POE_WRITES_ENABLED_MAX_TEAM_ID: 0,
POE_WRITES_EXCLUDE_TEAMS: '',
Expand Down
10 changes: 2 additions & 8 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import {
DeferredPersonOverrideWorker,
FlatPersonOverrideWriter,
PersonOverrideWriter,
} from '../worker/ingestion/person-state'
import { DeferredPersonOverrideWorker, FlatPersonOverrideWriter } from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { GraphileWorker } from './graphile-worker/graphile-worker'
Expand Down Expand Up @@ -450,9 +446,7 @@ export async function startPluginsServer(
personOverridesPeriodicTask = new DeferredPersonOverrideWorker(
postgres,
kafkaProducer,
serverConfig.POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES
? new FlatPersonOverrideWriter(postgres)
: new PersonOverrideWriter(postgres)
new FlatPersonOverrideWriter(postgres)
).runTask(5000)
personOverridesPeriodicTask.promise.catch(async () => {
status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...')
Expand Down
2 changes: 0 additions & 2 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ export interface PluginsServerConfig {
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
DROP_EVENTS_BY_TOKEN: string
POE_EMBRACE_JOIN_FOR_TEAMS: string
POE_DEFERRED_WRITES_ENABLED: boolean
POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: boolean
POE_WRITES_ENABLED_MAX_TEAM_ID: number
POE_WRITES_EXCLUDE_TEAMS: string
RELOAD_PLUGIN_JITTER_MAX_MS: number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Person } from 'types'

import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { DeferredPersonOverrideWriter, PersonOverrideWriter, PersonState } from '../person-state'
import { DeferredPersonOverrideWriter, PersonState } from '../person-state'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner } from './runner'

Expand All @@ -22,13 +22,9 @@ export async function processPersonsStep(
throw error
}

let overridesWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined
let overridesWriter: DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
if (runner.hub.POE_DEFERRED_WRITES_ENABLED) {
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
} else {
overridesWriter = new PersonOverrideWriter(runner.hub.db.postgres)
}
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
}

const person = await new PersonState(
Expand Down
202 changes: 5 additions & 197 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class PersonState {
distinctId: string,
timestamp: DateTime,
db: DB,
private personOverrideWriter?: PersonOverrideWriter | DeferredPersonOverrideWriter,
private personOverrideWriter?: DeferredPersonOverrideWriter,
uuid: UUIDT | undefined = undefined,
maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS
) {
Expand Down Expand Up @@ -496,23 +496,14 @@ export class PersonState {

const deletePersonMessages = await this.db.deletePerson(otherPerson, tx)

let personOverrideMessages: ProducerRecord[] = []
if (this.personOverrideWriter) {
personOverrideMessages = await this.personOverrideWriter.addPersonOverride(
await this.personOverrideWriter.addPersonOverride(
tx,
getPersonOverrideDetails(this.teamId, otherPerson, mergeInto)
)
}

return [
[
...personOverrideMessages,
...updatePersonMessages,
...distinctIdMessages,
...deletePersonMessages,
],
person,
]
return [[...updatePersonMessages, ...distinctIdMessages, ...deletePersonMessages], person]
}
)

Expand Down Expand Up @@ -554,185 +545,6 @@ function getPersonOverrideDetails(teamId: number, oldPerson: Person, overridePer
}
}

export class PersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}

public async addPersonOverride(
tx: TransactionClient,
overrideDetails: PersonOverrideDetails
): Promise<ProducerRecord[]> {
const mergedAt = DateTime.now()
/**
We'll need to do 4 updates:
1. Add the persons involved to the helper table (2 of them)
2. Add an override from oldPerson to override person
3. Update any entries that have oldPerson as the override person to now also point to the new override person. Note that we don't update `oldest_event`, because it's a heuristic (used to optimise squashing) tied to the old_person and nothing changed about the old_person who's events need to get squashed.
*/
const oldPersonMappingId = await this.addPersonOverrideMapping(
tx,
overrideDetails.team_id,
overrideDetails.old_person_id
)
const overridePersonMappingId = await this.addPersonOverrideMapping(
tx,
overrideDetails.team_id,
overrideDetails.override_person_id
)

await this.postgres.query(
tx,
SQL`
INSERT INTO posthog_personoverride (
team_id,
old_person_id,
override_person_id,
oldest_event,
version
) VALUES (
${overrideDetails.team_id},
${oldPersonMappingId},
${overridePersonMappingId},
${overrideDetails.oldest_event},
0
)
`,
undefined,
'personOverride'
)

// The follow-up JOIN is required as ClickHouse requires UUIDs, so we need to fetch the UUIDs
// of the IDs we updated from the mapping table.
const { rows: transitiveUpdates } = await this.postgres.query(
tx,
SQL`
WITH updated_ids AS (
UPDATE
posthog_personoverride
SET
override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1
WHERE
team_id = ${overrideDetails.team_id} AND override_person_id = ${oldPersonMappingId}
RETURNING
old_person_id,
version,
oldest_event
)
SELECT
helper.uuid as old_person_id,
updated_ids.version,
updated_ids.oldest_event
FROM
updated_ids
JOIN
posthog_personoverridemapping helper
ON
helper.id = updated_ids.old_person_id;
`,
undefined,
'transitivePersonOverrides'
)

status.debug('🔁', 'person_overrides_updated', { transitiveUpdates })

const personOverrideMessages: ProducerRecord[] = [
{
topic: KAFKA_PERSON_OVERRIDE,
messages: [
{
value: JSON.stringify({
team_id: overrideDetails.team_id,
old_person_id: overrideDetails.old_person_id,
override_person_id: overrideDetails.override_person_id,
oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse),
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
version: 0,
}),
},
...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({
value: JSON.stringify({
team_id: overrideDetails.team_id,
old_person_id: old_person_id,
override_person_id: overrideDetails.override_person_id,
oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse),
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
version: version,
}),
})),
],
},
]

return personOverrideMessages
}

private async addPersonOverrideMapping(tx: TransactionClient, teamId: number, personId: string): Promise<number> {
/**
Update the helper table that serves as a mapping between a serial ID and a Person UUID.
This mapping is used to enable an exclusion constraint in the personoverrides table, which
requires int[], while avoiding any constraints on "hotter" tables, like person.
**/

// ON CONFLICT nothing is returned, so we get the id in the second SELECT statement below.
// Fear not, the constraints on personoverride will handle any inconsistencies.
// This mapping table is really nothing more than a mapping to support exclusion constraints
// as we map int ids to UUIDs (the latter not supported in exclusion contraints).
const {
rows: [{ id }],
} = await this.postgres.query(
tx,
`WITH insert_id AS (
INSERT INTO posthog_personoverridemapping(
team_id,
uuid
)
VALUES (
${teamId},
'${personId}'
)
ON CONFLICT("team_id", "uuid") DO NOTHING
RETURNING id
)
SELECT * FROM insert_id
UNION ALL
SELECT id
FROM posthog_personoverridemapping
WHERE team_id = ${teamId} AND uuid = '${personId}'
`,
undefined,
'personOverrideMapping'
)

return id
}

public async getPersonOverrides(teamId: number): Promise<PersonOverrideDetails[]> {
const { rows } = await this.postgres.query(
PostgresUse.COMMON_WRITE,
SQL`
SELECT
override.team_id,
old_person.uuid as old_person_id,
override_person.uuid as override_person_id,
oldest_event
FROM posthog_personoverride override
LEFT OUTER JOIN posthog_personoverridemapping old_person
ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id
LEFT OUTER JOIN posthog_personoverridemapping override_person
ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id
WHERE override.team_id = ${teamId}
`,
undefined,
'getPersonOverrides'
)
return rows.map((row) => ({
...row,
oldest_event: DateTime.fromISO(row.oldest_event),
}))
}
}

export class FlatPersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}

Expand Down Expand Up @@ -848,10 +660,7 @@ export class DeferredPersonOverrideWriter {
/**
* Enqueue an override for deferred processing.
*/
public async addPersonOverride(
tx: TransactionClient,
overrideDetails: PersonOverrideDetails
): Promise<ProducerRecord[]> {
public async addPersonOverride(tx: TransactionClient, overrideDetails: PersonOverrideDetails): Promise<void> {
await this.postgres.query(
tx,
SQL`
Expand All @@ -870,7 +679,6 @@ export class DeferredPersonOverrideWriter {
'pendingPersonOverride'
)
deferredPersonOverridesWrittenCounter.inc()
return []
}
}

Expand All @@ -891,7 +699,7 @@ export class DeferredPersonOverrideWorker {
constructor(
private postgres: PostgresRouter,
private kafkaProducer: KafkaProducerWrapper,
private writer: PersonOverrideWriter | FlatPersonOverrideWriter
private writer: FlatPersonOverrideWriter
) {}

/**
Expand Down
Loading

0 comments on commit 74defde

Please sign in to comment.