-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(plugin-server): Add capability to use deferred overrides writer and worker #19112
Conversation
d98386f
to
94ef79c
Compare
@@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin | |||
processAsyncOnEventHandlers: true, | |||
processAsyncWebhooksHandlers: true, | |||
sessionRecordingBlobIngestion: true, | |||
personOverrides: config.POE_DEFERRED_WRITES_ENABLED, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is off by default, keeping it from inadvertently being enabled for hobby deploys.
plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts
Outdated
Show resolved
Hide resolved
@@ -712,7 +712,7 @@ export class DeferredPersonOverrideWriter { | |||
* @param lockId the lock identifier/key used to ensure that only one | |||
* process is updating the overrides at a time | |||
*/ | |||
constructor(private postgres: PostgresRouter, private lockId: number) {} | |||
constructor(private postgres: PostgresRouter, private lockId: number = 567) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is that intentional? how is this lockId used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is intentional; this is for the advisory lock acquired here that ensures only one process at a time is processing pending overrides to avoid race conditions in transitive overrides processing from overlapping transactions (also described on line 712 above):
posthog/plugin-server/src/worker/ingestion/person-state.ts
Lines 758 to 768 in 7e2065a
const { | |
rows: [{ acquired }], | |
} = await this.postgres.query( | |
tx, | |
SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`, | |
undefined, | |
'processPendingOverrides' | |
) | |
if (!acquired) { | |
throw new Error('could not acquire lock') | |
} |
It's annoying that these are numbers and some varchar/text type that could be more descriptive, but that's what the Postgres API provides: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
The value could also be provided via a setting, but I couldn't think of a reason to make that configurable so I avoided that for now. I guess this could be a class property rather than an instance property though, I had initially made it an instance property so that the functional test runner could be running while other unit/integration tests were run — but thinking about it more, those are still sharing the same tables on the test database so they probably should conflict with each other.
It's possible that this could get replaced with a table lock on the overrides (not pending overrides) table but I want to defer that at least until the immediate overrides path is fully removed so that we don't lock the overrides table by accident while it's still being used in the ingest consumer transaction during the changeover.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made this static with e2a7487.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's minor, but it may be worth putting a "yep, this is how the PG API works, we're just picking a consistent int yadda yadda" line alongside your existing comment. I have to admit I read it and was still head-tilt at the magic number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this look? fb43fbd
posthog/plugin-server/src/worker/ingestion/person-state.ts
Lines 754 to 760 in fb43fbd
// This lock ID is used as an advisory lock identifier/key for a lock that | |
// ensures only one worker is able to update the overrides table at a time. | |
// (We do this to make it simpler to ensure that we maintain the consistency | |
// of transitive updates.) There isn't any special significance to this | |
// particular value (other than Postgres requires it to be a numeric one), | |
// it just needs to be consistent across all processes. | |
public readonly lockId = 567 |
c3b65af
to
6c4b2d9
Compare
be892a5
to
f239cd9
Compare
@@ -778,7 +785,7 @@ export class DeferredPersonOverrideWriter { | |||
|
|||
const messages: ProducerRecord[] = [] | |||
for (const { id, ...mergeOperation } of rows) { | |||
messages.push(...(await writer.addPersonOverride(tx, mergeOperation))) | |||
messages.push(...(await this.writer.addPersonOverride(tx, mergeOperation))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something important I hadn't considered before: Team on the override model is a foreign key (although it's not one on the mapping table, just a bigint). I hadn't defined it as a foreign key on the pending override table, so this is either going to need to gracefully handle foreign key constraint violations, or those schemas are going to need to be brought into alignment. We'll also need to decide what to use on the new table. My initial assumption is that its probably fine to use a non-foreign key on the new table, worst case is we end up storing some extra overrides for longer than needed and doing some unnecessary squashing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll have to split persons into a separate dataset eventually (other PG or non-PG data-store), so it's fine to start decoupling ourselves from the non-person tables 👍
You're right in your analysis that the cost of non-cascading is one unnecessary squash at the end, that's probably not that bad. But FYI, we don't cascade on team deletion anymore because there's too many tables, see delete_bulky_postgres_data. You can add the new table there and get proper deletions on team deletion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be in a good spot now, assuming I'm using the metrics abstractions correctly (still haven't gone deep into Prometheus yet.)
Should be safe to start running the deferred overrides worker after this is merged, and safe to enable deferred override writes after that. We can start testing squashes as well, though that process is going to require some reasonably large changes as well due to the overrides table schema changes noted here: #19112 (comment)
private writer: PersonOverrideWriter | ||
|
||
constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) { | ||
this.writer = new PersonOverrideWriter(this.postgres) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Next step is going to be making this PersonOverrideWriter
able to be substituted with one that doesn't require the mapping table and constraints, and preparing the process for moving data from the existing table to the new one.
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) | ||
|
||
personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask(5000) | ||
personOverridesPeriodicTask.promise.catch(async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exposing the promise
here seems kind of leaky but not so much that I want to forward catch
through as PeriodicTask.catch
right now.
5a8bfe6
to
2414894
Compare
2414894
to
fb43fbd
Compare
Problem
This adds the ability for the plugin server use the deferred person overrides as implemented in #19007, disabled by default.
Both the deferred override writer (in the ingest-consumer) and worker paths are off by default unless the
POE_DEFERRED_WRITES_ENABLED
configuration setting is enabled (and in that case, thePOE_EMBRACE_JOIN_FOR_TEAMS
still controls which teams actually are opted in to writes.) Theperson-overrides
plugin-server mode can also be used to start an overrides-only worker, bypassing thePOE_DEFERRED_WRITES_ENABLED
check.How did you test this code?
POE_DEFERRED_WRITES_ENABLED=1
andPOE_EMBRACE_JOIN_FOR_TEAMS=*
, watched override logs happening,PeriodicTask
.