-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(plugin-server): Add flat person override table and writer #19220
Changes from 19 commits
fa3fc28
2ee74a3
4f04b50
9d75860
55c0aaf
24dfd48
3834e98
201f74d
ce1365b
922d11d
845cf50
abe8f8a
61fb17e
72e2c9c
1a1d5a0
796983f
9515c37
fe8c88a
b042f2f
7b8176b
367b735
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -706,13 +706,142 @@ export class PersonOverrideWriter { | |
|
||
return id | ||
} | ||
|
||
public async getPersonOverrides(teamId: number): Promise<PersonOverrideDetails[]> { | ||
const { rows } = await this.postgres.query( | ||
PostgresUse.COMMON_WRITE, | ||
SQL` | ||
SELECT | ||
override.team_id, | ||
old_person.uuid as old_person_id, | ||
override_person.uuid as override_person_id, | ||
oldest_event | ||
FROM posthog_personoverride override | ||
LEFT OUTER JOIN posthog_personoverridemapping old_person | ||
ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id | ||
LEFT OUTER JOIN posthog_personoverridemapping override_person | ||
ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id | ||
WHERE override.team_id = ${teamId} | ||
`, | ||
undefined, | ||
'getPersonOverrides' | ||
) | ||
return rows.map((row) => ({ | ||
...row, | ||
oldest_event: DateTime.fromISO(row.oldest_event), | ||
})) | ||
} | ||
} | ||
|
||
export class FlatPersonOverrideWriter { | ||
constructor(private postgres: PostgresRouter) {} | ||
|
||
public async addPersonOverride( | ||
tx: TransactionClient, | ||
overrideDetails: PersonOverrideDetails | ||
): Promise<ProducerRecord[]> { | ||
const mergedAt = DateTime.now() | ||
|
||
await this.postgres.query( | ||
tx, | ||
SQL` | ||
INSERT INTO posthog_flatpersonoverride ( | ||
team_id, | ||
old_person_id, | ||
override_person_id, | ||
oldest_event, | ||
version | ||
) VALUES ( | ||
${overrideDetails.team_id}, | ||
${overrideDetails.old_person_id}, | ||
${overrideDetails.override_person_id}, | ||
${overrideDetails.oldest_event}, | ||
0 | ||
) | ||
`, | ||
undefined, | ||
'personOverride' | ||
) | ||
|
||
const { rows: transitiveUpdates } = await this.postgres.query( | ||
tx, | ||
SQL` | ||
UPDATE | ||
posthog_flatpersonoverride | ||
SET | ||
override_person_id = ${overrideDetails.override_person_id}, | ||
version = COALESCE(version, 0)::numeric + 1 | ||
WHERE | ||
team_id = ${overrideDetails.team_id} AND override_person_id = ${overrideDetails.old_person_id} | ||
RETURNING | ||
old_person_id, | ||
version, | ||
oldest_event | ||
`, | ||
undefined, | ||
'transitivePersonOverrides' | ||
) | ||
|
||
status.debug('🔁', 'person_overrides_updated', { transitiveUpdates }) | ||
|
||
const personOverrideMessages: ProducerRecord[] = [ | ||
{ | ||
topic: KAFKA_PERSON_OVERRIDE, | ||
messages: [ | ||
{ | ||
value: JSON.stringify({ | ||
team_id: overrideDetails.team_id, | ||
old_person_id: overrideDetails.old_person_id, | ||
override_person_id: overrideDetails.override_person_id, | ||
oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse), | ||
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), | ||
version: 0, | ||
}), | ||
}, | ||
...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({ | ||
value: JSON.stringify({ | ||
team_id: overrideDetails.team_id, | ||
old_person_id: old_person_id, | ||
override_person_id: overrideDetails.override_person_id, | ||
oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse), | ||
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse), | ||
version: version, | ||
}), | ||
})), | ||
Comment on lines
+791
to
+810
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be made to be less duplicative with the non-flat version, but I don't think it's worth the investment given that the non-flat implementation's days should be numbered at this point. |
||
], | ||
}, | ||
] | ||
|
||
return personOverrideMessages | ||
} | ||
|
||
public async getPersonOverrides(teamId: number): Promise<PersonOverrideDetails[]> { | ||
const { rows } = await this.postgres.query( | ||
PostgresUse.COMMON_WRITE, | ||
SQL` | ||
SELECT | ||
team_id, | ||
old_person_id, | ||
override_person_id, | ||
oldest_event | ||
FROM posthog_flatpersonoverride | ||
WHERE team_id = ${teamId} | ||
`, | ||
undefined, | ||
'getPersonOverrides' | ||
) | ||
return rows.map((row) => ({ | ||
...row, | ||
team_id: parseInt(row.team_id), // XXX: pg returns bigint as str (reasonably so) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a good idea in general terms, but like the other On second thought, there's probably no harm in replacing > BigInt('10') == 10
true There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, seems harmless (either way?). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, Jest doesn't like it after all, diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts
index f49ee1ad33..e33fcad987 100644
--- a/plugin-server/src/worker/ingestion/person-state.ts
+++ b/plugin-server/src/worker/ingestion/person-state.ts
@@ -832,7 +832,7 @@ export class FlatPersonOverrideWriter {
)
return rows.map((row) => ({
...row,
- team_id: parseInt(row.team_id), // XXX: pg returns bigint as str (reasonably so)
+ team_id: BigInt(row.team_id), // pg returns bigint as str
oldest_event: DateTime.fromISO(row.oldest_event),
}))
} causes this error:
Not sure why, as the values compare just fine:
Looks like that equality test comes from somewhere in here, so it's not using the equality operator directly: https://github.com/jestjs/jest/blob/e54c0ebb048e10331345dbe99f8ec07654a43f1c/packages/expect-utils/src/jasmineUtils.ts#L63-L212
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oh, weird. To be honest, when I read your comment I was 🤨 that we're planning for >2 billion teams already. Not that it isn't nice to be prepared, but this seems pretty low priority to think about, lol. edit: Actually the Django Person model doesn't even claim to use BigInt if I'm reading correctly. I get the feeling this was a copy-pasta in the old Overrides where it makes more sense for Persons themselves to be BigInt. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I just grabbed it from Knowing that, maybe it makes sense to drop these back down to a plain old 4-byte integer for consistency with the actual team primary key? Though these tables should ideally stay pretty small through overrides, so maybe it doesn't matter much at all? 🤷‍♂️ I do like the idea of 2 billion teams though. |
||
oldest_event: DateTime.fromISO(row.oldest_event), | ||
})) | ||
} | ||
} | ||
|
||
const deferredPersonOverridesWrittenCounter = new Counter({ | ||
name: 'deferred_person_overrides_written', | ||
help: 'Number of person overrides that have been written as pending', | ||
}) | ||
|
||
export class DeferredPersonOverrideWriter { | ||
constructor(private postgres: PostgresRouter) {} | ||
|
||
|
@@ -759,11 +888,11 @@ export class DeferredPersonOverrideWorker { | |
// it just needs to be consistent across all processes. | ||
public readonly lockId = 567 | ||
|
||
private writer: PersonOverrideWriter | ||
|
||
constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) { | ||
this.writer = new PersonOverrideWriter(this.postgres) | ||
} | ||
constructor( | ||
private postgres: PostgresRouter, | ||
private kafkaProducer: KafkaProducerWrapper, | ||
private writer: PersonOverrideWriter | FlatPersonOverrideWriter | ||
) {} | ||
|
||
/** | ||
* Process all (or up to the given limit) pending overrides. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method was pulled in from tests so that I could swap the entire backing implementation at once without having to coordinate it across multiple abstractions. This method is really only intended to be used in tests, though.
(I did rewrite the query here because I found the other one challenging to read, though looking back at it now, I do think that the CTE having the predicate associated with it versus being specified here 3x is a nice idea.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me. I have a minor preference for naming things like
fooForTests
or something to avoid footguns. It'd be pretty weird for someone to reach for this... but...