-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(plugin-server): Add deferred person override writer #19007
Conversation
…-deferred and deferred overwrite implementations.
…ping references.
…ecessary, but seems safer.
78bead1
to
206e787
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might make a couple test changes, but I think this is good enough to review now.
My planned next steps for subsequent PRs:
- add capability to plugin-server to periodically call
DeferredPersonOverrideWriter.processPendingOverrides
- add configuration setting to use the
DeferredPersonOverrideWriter
instead ofPersonOverrideWriter
during event ingestion when PoE writes are enabled - add schema/data migration to simplify the overrides table and remove immediate override writes (i.e. remove
PersonOverrideWriter
, all the override table update logic should be straightforward enough to just live inDeferredPersonOverrideWriter
at that point)
type MergeOperation = { | ||
team_id: number | ||
old_person_id: string | ||
override_person_id: string | ||
oldest_event: DateTime | ||
} | ||
|
||
public async addPersonOverride( | ||
tx: TransactionClient, | ||
teamId: number, | ||
oldPerson: Person, | ||
overridePerson: Person | ||
): Promise<ProducerRecord[]> { | ||
if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { | ||
throw new Error('cannot merge persons across different teams') | ||
} | ||
function getMergeOperation(teamId: number, oldPerson: Person, overridePerson: Person): MergeOperation { | ||
if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { | ||
throw new Error('cannot merge persons across different teams') | ||
} | ||
return { | ||
team_id: teamId, | ||
old_person_id: oldPerson.uuid, | ||
override_person_id: overridePerson.uuid, | ||
oldest_event: overridePerson.created_at, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kind of awkward/annoying but is necessary to ensure that
- the immediate and deferred implementations are substitutable with each other
- the non-deferred override writer doesn't require access to full
Person
instances, just what we're able to read out of the pending overrides table
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
@@ -694,7 +706,7 @@ class PersonOverrideWriter { | |||
UNION ALL | |||
SELECT id | |||
FROM posthog_personoverridemapping | |||
WHERE uuid = '${person.uuid}' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed this while I was writing some tests that were using constant UUIDs (so only the team ID was changing between tests) and I started seeing strange results after person updates were applied. I think this was also causing this query to do full table scans each time since it would no longer be able to use the (team_id, uuid)
index. I don't think this was the cause of the performance problems, though.
* @param lockId the lock identifier/key used to ensure that only one | ||
* process is updating the overrides at a time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kind of annoying that you have to use a number for these, not a more descriptive string: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
/** | ||
* Process all pending overrides. An advisory lock is acquired prior to | ||
* processing to ensure that this function has exclusive access to the | ||
* pending overrides during the update process. | ||
* | ||
* @returns the number of overrides processed | ||
*/ | ||
public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper): Promise<number> { | ||
const writer = new PersonOverrideWriter(this.postgres) | ||
|
||
return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { | ||
const { | ||
rows: [{ acquired }], | ||
} = await this.postgres.query( | ||
tx, | ||
SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`, | ||
undefined, | ||
'processPendingOverrides' | ||
) | ||
if (!acquired) { | ||
throw new Error('could not acquire lock') | ||
} | ||
|
||
// n.b.: Ordering by id ensures we are processing in (roughly) FIFO order | ||
const { rows } = await this.postgres.query( | ||
tx, | ||
`SELECT * FROM posthog_pendingpersonoverride ORDER BY id`, | ||
undefined, | ||
'processPendingOverrides' | ||
) | ||
|
||
const messages: ProducerRecord[] = [] | ||
for (const { id, ...mergeOperation } of rows) { | ||
messages.push(...(await writer.addPersonOverride(tx, mergeOperation))) | ||
await this.postgres.query( | ||
tx, | ||
SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`, | ||
undefined, | ||
'processPendingOverrides' | ||
) | ||
} | ||
|
||
// n.b.: We publish the messages here (and wait for acks) to ensure | ||
// that all of our override updates are sent to Kafka before | ||
// prior to committing the transaction. If we're unable to publish, | ||
// we should discard updates and try again later when it's available | ||
await kafkaProducer.queueMessages(messages, true) | ||
|
||
return rows.length | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what is going to be called in a loop by the plugin server (logging, error handling, metrics, etc will happen up there.)
* @returns the number of overrides processed | ||
*/ | ||
public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper): Promise<number> { | ||
const writer = new PersonOverrideWriter(this.postgres) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this is running, I'll make the change to update/simplify the schema (removing the mapping table, etc.) I don't have a clear plan on how that data migration will actually work (recall that we want to preserve the existing overrides for internal use), but I figure that having this off the rest of the ingestion path will make that more manageable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd highly suggest doing the migration while only our team is using PoE and furthermore to be extra blunt here if we had a delay of a couple of days for our data to get into the overrides final table that wouldn't be a problem, I'd just give a quick fyi in some channel about that.
export class WaitEvent { | ||
private promise: Promise<void> | ||
private resolve: () => void | ||
|
||
constructor() { | ||
this.promise = new Promise((resolve) => { | ||
this.resolve = resolve | ||
}) | ||
} | ||
|
||
public set() { | ||
this.resolve() | ||
} | ||
|
||
public async wait() { | ||
return this.promise | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more or less a translation of https://docs.python.org/3/library/threading.html#event-objects
async function fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> { | ||
const result = await hub.db.postgres.query( | ||
PostgresUse.COMMON_WRITE, | ||
` | ||
WITH overrides AS ( | ||
SELECT id, old_person_id, override_person_id | ||
FROM posthog_personoverride | ||
WHERE team_id = ${teamId} | ||
ORDER BY id | ||
) | ||
SELECT | ||
mapping.uuid AS old_person_id, | ||
overrides_mapping.uuid AS override_person_id | ||
FROM | ||
overrides AS first | ||
JOIN | ||
posthog_personoverridemapping AS mapping ON first.old_person_id = mapping.id | ||
JOIN ( | ||
SELECT | ||
second.id AS id, | ||
uuid | ||
FROM | ||
overrides AS second | ||
JOIN posthog_personoverridemapping AS mapping ON second.override_person_id = mapping.id | ||
) AS overrides_mapping ON overrides_mapping.id = first.id | ||
`, | ||
undefined, | ||
'fetchPersonIdOverrides' | ||
) | ||
return result.rows.map(({ old_person_id, override_person_id }) => [old_person_id, override_person_id]).sort() as [ | ||
string, | ||
string | ||
][] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this was moved from where it was previously declared in the describe
suite below so that it could be used across different override modes and in the deferred overrides unit tests -- this isn't anything new
interface PersonOverridesMode { | ||
getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter | ||
fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> | ||
} | ||
|
||
const PersonOverridesModes: Record<string, PersonOverridesMode | undefined> = { | ||
disabled: undefined, | ||
immediate: { | ||
getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), | ||
fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), | ||
}, | ||
deferred: { | ||
// XXX: This is kind of a mess -- ideally it'd be preferable to just | ||
// instantiate the writer once and share it | ||
getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres, 456), | ||
fetchPostgresPersonIdOverrides: async (hub, teamId) => { | ||
await new DeferredPersonOverrideWriter(hub.db.postgres, 456).processPendingOverrides(hub.db.kafkaProducer) | ||
return await fetchPostgresPersonIdOverrides(hub, teamId) | ||
}, | ||
}, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this but I couldn't think of a better way to implement it without getting my hands dirtier in the existing tests than I would prefer -- somebody with more JS/TS fluency might have some better ideas on how to approach this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you just mean the duplication of DeferredPersonOverrideWriter
, all I can think is that you can make it a class to share that object -- but it doesn't seem like it really matters unless sharing the object is critical because of some state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'd prefer to do something like this but I can't figure out how to correctly type it. 😕
diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts
index 48e35edea0..f4bdeb5232 100644
--- a/plugin-server/tests/worker/ingestion/person-state.test.ts
+++ b/plugin-server/tests/worker/ingestion/person-state.test.ts
@@ -60,24 +60,33 @@ async function fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise
}
interface PersonOverridesMode {
- getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter
- fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]>
+ getWriter(): PersonOverrideWriter | DeferredPersonOverrideWriter
+ fetchPostgresPersonIdOverrides(teamId: number): Promise<[string, string][]>
}
-const PersonOverridesModes: Record<string, PersonOverridesMode | undefined> = {
+const PersonOverridesModes: Record<string, ((hub: Hub) => PersonOverridesMode) | undefined> = {
disabled: undefined,
- immediate: {
- getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres),
- fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId),
+ immediate: class {
+ constructor(private hub: Hub) {}
+ getWriter() {
+ return new PersonOverrideWriter(this.hub.db.postgres)
+ }
+ fetchPostgresPersonIdOverrides(teamId: number) {
+ return fetchPostgresPersonIdOverrides(this.hub, teamId)
+ }
},
- deferred: {
- // XXX: This is kind of a mess -- ideally it'd be preferable to just
- // instantiate the writer once and share it
- getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres, 456),
- fetchPostgresPersonIdOverrides: async (hub, teamId) => {
- await new DeferredPersonOverrideWriter(hub.db.postgres, 456).processPendingOverrides(hub.db.kafkaProducer)
- return await fetchPostgresPersonIdOverrides(hub, teamId)
- },
+ deferred: class {
+ private writer: DeferredPersonOverrideWriter
+ constructor(private hub: Hub) {
+ this.writer = new DeferredPersonOverrideWriter(this.hub.db.postgres, 456)
+ }
+ getWriter() {
+ return this.writer
+ }
+ async fetchPostgresPersonIdOverrides(teamId: number) {
+ await this.writer.processPendingOverrides(this.hub.db.kafkaProducer)
+ return await fetchPostgresPersonIdOverrides(this.hub, teamId)
+ }
},
}
Not a big deal, just kind of clumsy as-is.
throw new Error('could not acquire lock') | ||
} | ||
|
||
// n.b.: Ordering by id ensures we are processing in (roughly) FIFO order |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This stresses me out and confuses me a little bit: while the sequence generation should be monotonic, transactions might not become visible in a way that ensures a strict total ordering here (and trying to figure out if the rows are locked in a fashion that would ensure that rows become visible in order gives me a headache and is probably a brittle assumption to rely on anyway.)
These two merges (notated as override id <- old id):
- B <- C
- A <- B
Should result in these overrides (same notation):
- A <- C (updated from B <- C due to transitive update)
- A <- B
This will work correctly with the current override writer as long the events are processed in order because we look for events where the old person (in the case of the second event, B) is the same as an override person (which, if processed in order, we'd have a row for B from the first event):
posthog/plugin-server/src/worker/ingestion/person-state.ts
Lines 618 to 627 in 15b7f49
UPDATE | |
posthog_personoverride | |
SET | |
override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1 | |
WHERE | |
team_id = ${mergeOperation.team_id} AND override_person_id = ${oldPersonMappingId} | |
RETURNING | |
old_person_id, | |
version, | |
oldest_event |
… but I don't think this works if we happen to get events out of order for some reason (A <- B then B <- C) as we'd be looking for values on the "wrong" side of the relationship.
I think if we want to be safe here, we need to check for transitive updates on both sides of the merge operation, and probably do so recursively?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could have sworn we discussed this and figured it all out but I'll have to look through our chats. 😰
How are you thinking we could check for transitive updates on both sides?
I wonder if this is why we had (for a moment, anyway) discussed how this "pending"
table could instead be the one and only source of truth, rather than rows of a queue that come and go. Like, I think we had discussed a table that was something like (team_id, old_person_id, override_person_id, shortened_path_person_id, oldest_event)
. So shortened_path_person_id
would be updated as needed, and it would of course need indexes in different places if we wanted to do transitive lookups.
Unless you have figured out the solution here maybe we should discuss outside of comments here again and write down our findings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was mostly worried before about the potential to create cycles in the merge history by foregoing the exclusion constraint (simplest example being A <- B and B <- A showing up in the pending person overrides), but I don't think that is possible due to the person deletion here: we'd fail to get the lock when updating the person when attempting to merge in the other direction.1 This block also helps (assuming created_at
is immutable, and has a strict total ordering, i.e. no values are equal — which this probably isn't, but is probably close enough in practice), since that should keep us from incidentally creating a cycle between multiple persons because any sequence of merges within a set of users will always just end up pointing to the oldest user in the set.
That is the stuff that I recall talking about before, but there may have been more beyond that stuff that has fallen out of my brain already. The order of operations stuff is a bit different.
Thinking out loud a bit here (there are probably a bunch of graph theoretic terms that would make these explanations clearer — or at least more precise — bear with me):
Generalizing this as a graph, I think that as long as there aren't any cycles in it (the merge graph), we should at least in theory be able to add new edges to it (i.e. append person merges) in basically any order.
The person overrides table isn't really a representation of the graph (though pending person overrides would be, if we never deleted from it — it's essentially an adjacency list), and is instead an index of the shortest path between an arbitrary node and whatever node it points to that only has edges pointing at it and none away from it (i.e. only shows up in the merge graph as an override person, never as an old person.)
If we are processing events in order, A <- B <- C will only ever arrive as B <- C first, A <- B second. B <- C can't come after A <- B, because B would have been deleted. This makes it easier to keep our shortest path index, because once B <- C happens, we can just update all records for C to reference B and never consider it again, i.e. all records that had an override of C now can have an override of B. C <- D isn't a valid new entry, though it may have been an old one, updated in the previous step that now reflects that B <- D.
If we are processing events out of order, we lose this advantage. We can see either A <- B or B <- C first. If the operations come in order, we just do what was written above (and what the code already does.) If the operations come out of order, (A <- B, then B <- C) we need to update the B <- C event as part of the insertion operation to reflect the fact that we've already seen A <- B and do this prior to updating any records that already pointed at C to now point to A like before. Since we don't know if operations are arriving out of order, we need to treat them as if they were all the time. We don't need to do anything recursively because we're not dealing with a graph with intermediate edges (by design.)
I'm not sure how the squash operation would play into this yet. If we were to squash in between two out of order operations (e.g. A <- B, squash, B <- C), I think that means we could have some dangling merges that don't ever "complete" unless we keep a copy of the Postgres overrides around forever to identify the disconnect.
Footnotes
-
Aside: I guess that maybe these locks should be taken explicitly if they matter that much? Also, there is a caveat here that the person UUID can't be reused again, otherwise it could appear in the merge history later and cause a cycle — but since it's a time-based UUID, I really hope that's not a problem. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My 2 🪙 on how to think of the ordering problem
So if we think about what ingestion does, then it has a single transaction that merges the person and adds the info to posthog_pendingpersonoverride
table. In order to make this more clear I'll use capital letters to denote distinctIds that are being merged and lower case letters to denote personIDs that distinct_id maps to:
B(b) <- C(c) ==> B(b) & C(b)
A(a) <- B(b) ==> A(a) & B(a) & C(a)
when we think of person overrides we'd get:
b <- c ... applying transitive updates we get .... a <- c
a <- b
Now the other way (note how the second row has B(a) because person b doesn't exist anymore, which means we result in a <- c in the person overrides):
A(a) <- B(b) ==> A(a) & B(a)
B(a) <- C(c) ==> A(a) & B(a) & C(a)
when we think of person overrides we'd get:
a <- b
a <- c
Does this help?
For the squash part we discussed that if we cut off at a random point we'll just end up doing more updates, no problem ... e.g. if we squash, when we have b <- c
but don't yet know that a <- b
and hence a <- c
if we do squash on top of b <-c
we'd overwrite events to have person_id = b
that then combined with the overrides table (a <- b
) tells us that all those events are of person_id = a
. So it works fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this help?
I think that's a good explanation of the correct/expected behavior. It's also a helpful reminder that the person overrides are a derivative of the distinct IDs, as well.
The override writer right now only handles the first case.
For the squash part we discussed that if we cut off at a random point we'll just end up doing more updates, no problem ... e.g. if we squash, when we have b <- c but don't yet know that a <- b and hence a <- c if we do squash on top of b <-c we'd overwrite events to have person_id = b that then combined with the overrides table (a <- b) tells us that all those events are of person_id = a. So it works fine.
That example makes sense -- but in that example, the overrides are visible in order. If they arrive out of order, we'd end up in this scenario:
- pending override for A <- B becomes visible and we update overrides to reflect A <- B
- (squash happens, overrides are deleted)
- pending override for B <- C becomes visible (out of order for some reason) to reflect B <- C (because we no longer have knowledge of A <- B to update the transitive override)
Don't all of the events that were previously associated with C get "lost" in that case, because they no longer reference a user that exists?
I'm not sure there's a good way around that, but here are some options I can think of:
- ensure all inserts to the pending person overrides actually become visible in primary key order: either by locking the table as part of the merge transaction (yuck) or possibly by using serializable isolation level in the override worker and dealing with the potential for needing to retry due to serialization failures (also not great)
- not deleting overrides ever (simple, but unbounded growth is not ideal)
- define some transaction grace period, where we only select rows as inputs to squash that are older than X minutes, and assume that all transactions prior within that window have settled and that all valid insertions are now visible (maybe the best option?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not possible. We use postgres & transactions. There's no way we'd be able to write them out of order. Which ever distinctID merge happens first sets the person overrides in place for that. Then the later happening transaction would operate on the personIDs that are now mapping to those distinctIDs. You can't see a
b <- a
after a c <- b
, because the latter would have combined b&c, so the lookup for b would have returned c & you'd end up writing c <- a
as the later insertion to the pending overrides table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to merge this as-is for now and deal with these issues as a follow-up: I think the potential for pending overrides becoming visible out-of-order is ultimately low probability but high consequence, and I suspect we're very likely doing enough row locking here that the merge transaction that rows that affect the same persons are forced into a serial execution path anyway (but I'm not sure how to prove that -- and even if I could, I think that'd be a tenuous guarantee to rely on.)
I have a possible solution for correcting out-of-order visibility in here for pending overrides, but handling the interactions with squash is still an open question (to me, at least.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For squash. We should use PG as the source of truth. If we stop the overrides writer, we can squash safely. Happy to write this up in more detail if we're not convinced yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, my wires were crossed here the other day and I wasn't thinking about this or communicating clearly: I was thinking about if we could reliably treat the data in the pending table as FIFO queue/log based on the ordering of the primary key, which is a different topic than transaction visibility. I'm not sure why I was mixing that up and combining the two topics. They are sort of related, but not really the same.
The data in the pending table should represent a consistent view of the state of overrides due to the transaction, and we shouldn't be able to end up in an inconsistent state where we see the side effects of a transaction prior to some other transaction that preceded it. We'd wouldn't be able to see A <- B without also seeing B <- C in the same set, regardless of their ordering within that set.
It is theoretically possible that the primary key of the records created within those transactions don't appear in the order that they were committed — but based on the ordering of statements within the merge transaction and the locks that will be involved with those statements, I don't think this is an issue in practice either, B <- C should have a lower primary key than A <- B because of the row locks involved (at least those involved with person update and deletion, maybe others) and those statements preceding the insert to the pending overrides table which generates the sequence value.
Sorry for making that so confusing earlier.
For squash: it seems like it'd be ideal if we could keep the overrides writer running while squashes are occurring? I'm not sure how long squashes will take (though hopefully we'll find out next week) but it seems like that could delay the visible side effects of merges for a little bit? I guess it's not the end of the world if that's that case, overrides will still happen eventually, they'll just take longer to observe. I think there are things we could to do make it go smoother if the squash process is slow enough to be problematic, but probably better to actually focus on getting it running first. 🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
squash: yes we should be able to.
The interesting case is: If the overrides writer updates a row after squash started processing that row ... if we remove only the rows that we processed (e.g. the problem comes, when some row got updated, e.g. we handled B <- A and now we have C <- A (because afterwards overrides writer added C <- B). Now squash can still update all the events to have B instead of A, it would just not remove the C <- A row (might also be safe to remove, but I'm not sure, so would be easier not to). We can always keep extra rows in PG there without any risk.
const { rows } = await this.postgres.query( | ||
tx, | ||
`SELECT * FROM posthog_pendingpersonoverride ORDER BY id`, | ||
undefined, | ||
'processPendingOverrides' | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thought that just came to mind: it might be worth splitting this up into batches to be a bit more defensive in the (hopefully unlikely) scenario where this result set becomes unreasonably large, just to avoid needing to have the entire set in memory as well as avoiding a long-running transaction blocking vacuum and potentially needing to get rolled back?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think we'll want batching if only because I imagine we'll have big surges of pending rows if we ever stop ingestion for a while and then turn it on, catching up quickly. Some protection here would be good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense. Rather than implement the batching in this method, I'm going to make it possible to pass a limit so that the caller of this method has a bit more flexibility to control how batching is performed, if they want it -- they'll have enough data from the return value to be able to determine whether or not their limit was reached.
type MergeOperation = { | ||
team_id: number | ||
old_person_id: string | ||
override_person_id: string | ||
oldest_event: DateTime | ||
} | ||
|
||
public async addPersonOverride( | ||
tx: TransactionClient, | ||
teamId: number, | ||
oldPerson: Person, | ||
overridePerson: Person | ||
): Promise<ProducerRecord[]> { | ||
if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { | ||
throw new Error('cannot merge persons across different teams') | ||
} | ||
function getMergeOperation(teamId: number, oldPerson: Person, overridePerson: Person): MergeOperation { | ||
if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) { | ||
throw new Error('cannot merge persons across different teams') | ||
} | ||
return { | ||
team_id: teamId, | ||
old_person_id: oldPerson.uuid, | ||
override_person_id: overridePerson.uuid, | ||
oldest_event: overridePerson.created_at, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
const { rows } = await this.postgres.query( | ||
tx, | ||
`SELECT * FROM posthog_pendingpersonoverride ORDER BY id`, | ||
undefined, | ||
'processPendingOverrides' | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think we'll want batching if only because I imagine we'll have big surges of pending rows if we ever stop ingestion for a while and then turn it on, catching up quickly. Some protection here would be good.
// n.b.: We publish the messages here (and wait for acks) to ensure | ||
// that all of our override updates are sent to Kafka before | ||
// prior to committing the transaction. If we're unable to publish, | ||
// we should discard updates and try again later when it's available |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably worth calling out that it's safe for us to publish to Kafka and then crash, and republish the same or different rows again later (well, I hope it is).
interface PersonOverridesMode { | ||
getWriter(hub: Hub): PersonOverrideWriter | DeferredPersonOverrideWriter | ||
fetchPostgresPersonIdOverrides(hub: Hub, teamId: number): Promise<[string, string][]> | ||
} | ||
|
||
const PersonOverridesModes: Record<string, PersonOverridesMode | undefined> = { | ||
disabled: undefined, | ||
immediate: { | ||
getWriter: (hub) => new PersonOverrideWriter(hub.db.postgres), | ||
fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), | ||
}, | ||
deferred: { | ||
// XXX: This is kind of a mess -- ideally it'd be preferable to just | ||
// instantiate the writer once and share it | ||
getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres, 456), | ||
fetchPostgresPersonIdOverrides: async (hub, teamId) => { | ||
await new DeferredPersonOverrideWriter(hub.db.postgres, 456).processPendingOverrides(hub.db.kafkaProducer) | ||
return await fetchPostgresPersonIdOverrides(hub, teamId) | ||
}, | ||
}, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you just mean the duplication of DeferredPersonOverrideWriter
, all I can think is that you can make it a class to share that object -- but it doesn't seem like it really matters unless sharing the object is critical because of some state.
throw new Error('could not acquire lock') | ||
} | ||
|
||
// n.b.: Ordering by id ensures we are processing in (roughly) FIFO order |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could have sworn we discussed this and figured it all out but I'll have to look through our chats. 😰
How are you thinking we could check for transitive updates on both sides?
I wonder if this is why we had (for a moment, anyway) discussed how this "pending"
table could instead be the one and only source of truth, rather than rows of a queue that come and go. Like, I think we had discussed a table that was something like (team_id, old_person_id, override_person_id, shortened_path_person_id, oldest_event)
. So shortened_path_person_id
would be updated as needed, and it would of course need indexes in different places if we wanted to do transitive lookups.
Unless you have figured out the solution here maybe we should discuss outside of comments here again and write down our findings.
Problem
This adds the core functionality for dealing with deferred person overrides as described in PostHog/product-internal#522 (but doesn't yet integrate the processing loop into the plugin-server.) The motivation for this change is described at length in the linked PR, but the short explanation is that we expect that moving the maintenance of person overrides largely out of the existing transaction within the event pipeline and into an asynchronous path will allow us perform these updates with less overhead, as we can eliminate the potential for concurrent and potentially conflicting transactions by running a single worker.
Eliminating the potential for concurrent transactions also lets us remove the exclusion constraint on the person overrides table, which will itself let us simplify the person overrides table by removing the need for the person overrides mapping table — but this is left for a later change.
How did you test this code?
Updated the existing tests to cover deferred overrides and added a few unit tests for the deferred bits.