Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plugin-server): Add deferred person override writer #19007

Merged
merged 27 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6481654
Add `PendingPersonOverride` model.
tkaemming Nov 25, 2023
2c38030
Add automatic migration.
tkaemming Nov 29, 2023
0486f2b
Add unfortunate `MergeOperation` type to keep consistency between non…
tkaemming Nov 30, 2023
be35915
Add `DeferredPersonOverrideWriter`.
tkaemming Nov 30, 2023
69835f0
Start on dependency injection of override writer for better testing.
tkaemming Nov 30, 2023
cf3a096
Support disabled, immediate, and deferred overrides in tests.
tkaemming Nov 30, 2023
3c8fd4f
Minor improvements.
tkaemming Nov 30, 2023
2bc97ba
Syntactical sugar rush
tkaemming Nov 30, 2023
603090f
Fix bug in `addPersonOverrideMapping` that was causing cross-team map…
tkaemming Dec 1, 2023
9144472
Ensure items are processed serially: not sure if this is absolutely n…
tkaemming Dec 1, 2023
60e05e0
Add processing lock.
tkaemming Dec 1, 2023
5bd9d73
Add `Event` class for testing.
tkaemming Dec 1, 2023
206e787
Ensure advisory lock is held before proceeding with processing overri…
tkaemming Dec 1, 2023
19c363a
Add return value, docstring, slightly improve test.
tkaemming Dec 1, 2023
2a963a3
Add a basic test to shoveling data between tables.
tkaemming Dec 1, 2023
ee45b07
Ensure that the transaction is rolled back in case of Kafka error.
tkaemming Dec 1, 2023
5317947
Add comments to a couple of things that could use a bit more explanation
tkaemming Dec 2, 2023
a037805
Remove some duplication from tests.
tkaemming Dec 2, 2023
15b7f49
Consistently use overridesMode (even if not absolutely required)
tkaemming Dec 2, 2023
f37f1c2
Might as well make sure the data makes it to ClickHouse too.
tkaemming Dec 2, 2023
a89c926
Merge branch 'master' into poe-deferred-override-writer
tkaemming Dec 4, 2023
0b14b86
Apply suggestions from code review
tkaemming Dec 4, 2023
cef6d03
Better document ``MergeOperation``/``PersonOverride`` dependency.
tkaemming Dec 4, 2023
7fe6805
Add more detail around retry safety during deferred publish.
tkaemming Dec 4, 2023
ff8aeda
Rename `MergeOperation` to `PersonOverrideDetails` so that it's relat…
tkaemming Dec 4, 2023
01a172f
Add the ability to pass a limit to `processPendingOverrides`.
tkaemming Dec 5, 2023
2bf12eb
Merge branch 'master' into poe-deferred-override-writer
tkaemming Dec 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0367_job_inputs
posthog: 0368_pendingpersonoverride
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Person } from 'types'

import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { PersonState } from '../person-state'
import { PersonOverrideWriter, PersonState } from '../person-state'
import { parseEventTimestamp } from '../timestamps'
import { EventPipelineRunner } from './runner'

Expand All @@ -29,7 +29,7 @@ export async function processPersonsStep(
timestamp,
runner.hub.db,
runner.hub.statsd,
runner.poEEmbraceJoin
runner.poEEmbraceJoin ? new PersonOverrideWriter(runner.hub.db.postgres) : undefined
).update()

return [event, person]
Expand Down
185 changes: 141 additions & 44 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { StatsD } from 'hot-shots'
import { ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'
import { Counter } from 'prom-client'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'

import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics'
import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types'
Expand Down Expand Up @@ -92,7 +93,6 @@ export class PersonState {
private db: DB
private statsd: StatsD | undefined
public updateIsIdentified: boolean // TODO: remove this from the class and being hidden
private poEEmbraceJoin: boolean

constructor(
event: PluginEvent,
Expand All @@ -101,7 +101,7 @@ export class PersonState {
timestamp: DateTime,
db: DB,
statsd: StatsD | undefined = undefined,
poEEmbraceJoin = false,
private personOverrideWriter?: PersonOverrideWriter | DeferredPersonOverrideWriter,
uuid: UUIDT | undefined = undefined,
maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS
) {
Expand All @@ -119,9 +119,6 @@ export class PersonState {
// If set to true, we'll update `is_identified` at the end of `updateProperties`
// :KLUDGE: This is an indirect communication channel between `handleIdentifyOrAlias` and `updateProperties`
this.updateIsIdentified = false

// For persons on events embrace the join gradual roll-out, remove after fully rolled out
this.poEEmbraceJoin = poEEmbraceJoin
}

async update(): Promise<Person> {
Expand Down Expand Up @@ -451,7 +448,7 @@ export class PersonState {
const properties: Properties = { ...otherPerson.properties, ...mergeInto.properties }
this.applyEventPropertyUpdates(properties)

if (this.poEEmbraceJoin) {
if (this.personOverrideWriter) {
// Optimize merging persons to keep using the person id that has longer history,
// which means we'll have less events to update during the squash later
if (otherPerson.created_at < mergeInto.created_at) {
Expand Down Expand Up @@ -486,7 +483,7 @@ export class PersonState {
call: this.event.event, // $identify, $create_alias or $merge_dangerously
oldPersonIdentified: String(otherPerson.is_identified),
newPersonIdentified: String(mergeInto.is_identified),
poEEmbraceJoin: String(this.poEEmbraceJoin),
poEEmbraceJoin: String(!!this.personOverrideWriter),
})
.inc()

Expand Down Expand Up @@ -518,12 +515,10 @@ export class PersonState {
const deletePersonMessages = await this.db.deletePerson(otherPerson, tx)

let personOverrideMessages: ProducerRecord[] = []
if (this.poEEmbraceJoin) {
personOverrideMessages = await new PersonOverrideWriter(this.db.postgres).addPersonOverride(
if (this.personOverrideWriter) {
personOverrideMessages = await this.personOverrideWriter.addPersonOverride(
tx,
this.teamId,
otherPerson,
mergeInto
getMergeOperation(this.teamId, otherPerson, mergeInto)
)
}

Expand All @@ -544,37 +539,54 @@ export class PersonState {
call: this.event.event, // $identify, $create_alias or $merge_dangerously
oldPersonIdentified: String(otherPerson.is_identified),
newPersonIdentified: String(mergeInto.is_identified),
poEEmbraceJoin: String(this.poEEmbraceJoin),
poEEmbraceJoin: String(!!this.personOverrideWriter),
})
.inc()
return result
}
}

class PersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}
type MergeOperation = {
team_id: number
old_person_id: string
override_person_id: string
oldest_event: DateTime
}
tkaemming marked this conversation as resolved.
Show resolved Hide resolved

public async addPersonOverride(
tx: TransactionClient,
teamId: number,
oldPerson: Person,
overridePerson: Person
): Promise<ProducerRecord[]> {
if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) {
throw new Error('cannot merge persons across different teams')
}
function getMergeOperation(teamId: number, oldPerson: Person, overridePerson: Person): MergeOperation {
if (teamId != oldPerson.team_id || teamId != overridePerson.team_id) {
throw new Error('cannot merge persons across different teams')
}
return {
team_id: teamId,
old_person_id: oldPerson.uuid,
override_person_id: overridePerson.uuid,
oldest_event: overridePerson.created_at,
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of awkward/annoying but is necessary to ensure that

  1. the immediate and deferred implementations are substitutable with each other
  2. the non-deferred override writer doesn't require access to full Person instances, just what we're able to read out of the pending overrides table

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.


export class PersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}

public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise<ProducerRecord[]> {
const mergedAt = DateTime.now()
const oldestEvent = overridePerson.created_at
/**
We'll need to do 4 updates:

1. Add the persons involved to the helper table (2 of them)
2. Add an override from oldPerson to override person
3. Update any entries that have oldPerson as the override person to now also point to the new override person. Note that we don't update `oldest_event`, because it's a heuristic (used to optimise squashing) tied to the old_person and nothing changed about the old_person who's events need to get squashed.
*/
const oldPersonId = await this.addPersonOverrideMapping(tx, oldPerson)
const overridePersonId = await this.addPersonOverrideMapping(tx, overridePerson)
const oldPersonMappingId = await this.addPersonOverrideMapping(
tx,
mergeOperation.team_id,
mergeOperation.old_person_id
)
const overridePersonMappingId = await this.addPersonOverrideMapping(
tx,
mergeOperation.team_id,
mergeOperation.override_person_id
)

await this.postgres.query(
tx,
Expand All @@ -586,10 +598,10 @@ class PersonOverrideWriter {
oldest_event,
version
) VALUES (
${teamId},
${oldPersonId},
${overridePersonId},
${oldestEvent},
${mergeOperation.team_id},
${oldPersonMappingId},
${overridePersonMappingId},
${mergeOperation.oldest_event},
0
)
`,
Expand All @@ -606,9 +618,9 @@ class PersonOverrideWriter {
UPDATE
posthog_personoverride
SET
override_person_id = ${overridePersonId}, version = COALESCE(version, 0)::numeric + 1
override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1
WHERE
team_id = ${teamId} AND override_person_id = ${oldPersonId}
team_id = ${mergeOperation.team_id} AND override_person_id = ${oldPersonMappingId}
RETURNING
old_person_id,
version,
Expand Down Expand Up @@ -637,21 +649,21 @@ class PersonOverrideWriter {
messages: [
{
value: JSON.stringify({
team_id: teamId,
team_id: mergeOperation.team_id,
old_person_id: mergeOperation.old_person_id,
override_person_id: mergeOperation.override_person_id,
oldest_event: castTimestampOrNow(mergeOperation.oldest_event, TimestampFormat.ClickHouse),
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
override_person_id: overridePerson.uuid,
old_person_id: oldPerson.uuid,
oldest_event: castTimestampOrNow(oldestEvent, TimestampFormat.ClickHouse),
version: 0,
}),
},
...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({
value: JSON.stringify({
team_id: teamId,
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
override_person_id: overridePerson.uuid,
team_id: mergeOperation.team_id,
old_person_id: old_person_id,
override_person_id: mergeOperation.override_person_id,
oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse),
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
version: version,
}),
})),
Expand All @@ -662,7 +674,7 @@ class PersonOverrideWriter {
return personOverrideMessages
}

private async addPersonOverrideMapping(tx: TransactionClient, person: Person): Promise<number> {
private async addPersonOverrideMapping(tx: TransactionClient, teamId: number, personId: string): Promise<number> {
/**
Update the helper table that serves as a mapping between a serial ID and a Person UUID.

Expand All @@ -684,8 +696,8 @@ class PersonOverrideWriter {
uuid
)
VALUES (
${person.team_id},
'${person.uuid}'
${teamId},
'${personId}'
)
ON CONFLICT("team_id", "uuid") DO NOTHING
RETURNING id
Expand All @@ -694,7 +706,7 @@ class PersonOverrideWriter {
UNION ALL
SELECT id
FROM posthog_personoverridemapping
WHERE uuid = '${person.uuid}'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed this while I was writing some tests that were using constant UUIDs (so only the team ID was changing between tests) and I started seeing strange results after person updates were applied. I think this was also causing this query to do full table scans each time since it would no longer be able to use the (team_id, uuid) index. I don't think this was the cause of the performance problems, though.

WHERE team_id = ${teamId} AND uuid = '${personId}'
`,
undefined,
'personOverrideMapping'
Expand All @@ -704,6 +716,91 @@ class PersonOverrideWriter {
}
}

export class DeferredPersonOverrideWriter {
/**
* @param lockId the lock identifier/key used to ensure that only one
* process is updating the overrides at a time
Comment on lines +712 to +713
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of annoying that you have to use a number for these, not a more descriptive string: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS

*/
constructor(private postgres: PostgresRouter, private lockId: number) {}

/**
* Enqueue an override for deferred processing.
*/
public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise<ProducerRecord[]> {
await this.postgres.query(
tx,
SQL`
INSERT INTO posthog_pendingpersonoverride (
team_id,
old_person_id,
override_person_id,
oldest_event
) VALUES (
${mergeOperation.team_id},
${mergeOperation.old_person_id},
${mergeOperation.override_person_id},
${mergeOperation.oldest_event}
)`,
undefined,
'pendingPersonOverride'
)

return []
}

/**
* Process all pending overrides. An advisory lock is acquired prior to
* processing to ensure that this function has exclusive access to the
* pending overrides during the update process.
*
* @returns the number of overrides processed
*/
public async processPendingOverrides(kafkaProducer: KafkaProducerWrapper): Promise<number> {
const writer = new PersonOverrideWriter(this.postgres)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this is running, I'll make the change to update/simplify the schema (removing the mapping table, etc.) I don't have a clear plan on how that data migration will actually work (recall that we want to preserve the existing overrides for internal use), but I figure that having this off the rest of the ingestion path will make that more manageable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd highly suggest doing the migration while only our team is using PoE and furthermore to be extra blunt here if we had a delay of a couple of days for our data to get into the overrides final table that wouldn't be a problem, I'd just give a quick fyi in some channel about that.


return await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => {
const {
rows: [{ acquired }],
} = await this.postgres.query(
tx,
SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`,
undefined,
'processPendingOverrides'
)
if (!acquired) {
throw new Error('could not acquire lock')
}

// n.b.: Ordering by id ensures we are processing in (roughly) FIFO order
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stresses me out and confuses me a little bit: while the sequence generation should be monotonic, transactions might not become visible in a way that ensures a strict total ordering here (and trying to figure out if the rows are locked in a fashion that would ensure that rows become visible in order gives me a headache and is probably a brittle assumption to rely on anyway.)

These two merges (notated as override id <- old id):

  1. B <- C
  2. A <- B

Should result in these overrides (same notation):

  1. A <- C (updated from B <- C due to transitive update)
  2. A <- B

This will work correctly with the current override writer as long the events are processed in order because we look for events where the old person (in the case of the second event, B) is the same as an override person (which, if processed in order, we'd have a row for B from the first event):

UPDATE
posthog_personoverride
SET
override_person_id = ${overridePersonMappingId}, version = COALESCE(version, 0)::numeric + 1
WHERE
team_id = ${mergeOperation.team_id} AND override_person_id = ${oldPersonMappingId}
RETURNING
old_person_id,
version,
oldest_event

… but I don't think this works if we happen to get events out of order for some reason (A <- B then B <- C) as we'd be looking for values on the "wrong" side of the relationship.

I think if we want to be safe here, we need to check for transitive updates on both sides of the merge operation, and probably do so recursively?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could have sworn we discussed this and figured it all out but I'll have to look through our chats. 😰

How are you thinking we could check for transitive updates on both sides?

I wonder if this is why we had (for a moment, anyway) discussed how this "pending" table could instead be the one and only source of truth, rather than rows of a queue that come and go. Like, I think we had discussed a table that was something like (team_id, old_person_id, override_person_id, shortened_path_person_id, oldest_event). So shortened_path_person_id would be updated as needed, and it would of course need indexes in different places if we wanted to do transitive lookups.

Unless you have figured out the solution here maybe we should discuss outside of comments here again and write down our findings.

Copy link
Contributor Author

@tkaemming tkaemming Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mostly worried before about the potential to create cycles in the merge history by foregoing the exclusion constraint (simplest example being A <- B and B <- A showing up in the pending person overrides), but I don't think that is possible due to the person deletion here: we'd fail to get the lock when updating the person when attempting to merge in the other direction.1 This block also helps (assuming created_at is immutable, and has a strict total ordering, i.e. no values are equal — which this probably isn't, but is probably close enough in practice), since that should keep us from incidentally creating a cycle between multiple persons because any sequence of merges within a set of users will always just end up pointing to the oldest user in the set.

That is the stuff that I recall talking about before, but there may have been more beyond that stuff that has fallen out of my brain already. The order of operations stuff is a bit different.

Thinking out loud a bit here (there are probably a bunch of graph theoretic terms that would make these explanations clearer — or at least more precise — bear with me):

Generalizing this as a graph, I think that as long as there aren't any cycles in it (the merge graph), we should at least in theory be able to add new edges to it (i.e. append person merges) in basically any order.

The person overrides table isn't really a representation of the graph (though pending person overrides would be, if we never deleted from it — it's essentially an adjacency list), and is instead an index of the shortest path between an arbitrary node and whatever node it points to that only has edges pointing at it and none away from it (i.e. only shows up in the merge graph as an override person, never as an old person.)

If we are processing events in order, A <- B <- C will only ever arrive as B <- C first, A <- B second. B <- C can't come after A <- B, because B would have been deleted. This makes it easier to keep our shortest path index, because once B <- C happens, we can just update all records for C to reference B and never consider it again, i.e. all records that had an override of C now can have an override of B. C <- D isn't a valid new entry, though it may have been an old one, updated in the previous step that now reflects that B <- D.

If we are processing events out of order, we lose this advantage. We can see either A <- B or B <- C first. If the operations come in order, we just do what was written above (and what the code already does.) If the operations come out of order, (A <- B, then B <- C) we need to update the B <- C event as part of the insertion operation to reflect the fact that we've already seen A <- B and do this prior to updating any records that already pointed at C to now point to A like before. Since we don't know if operations are arriving out of order, we need to treat them as if they were all the time. We don't need to do anything recursively because we're not dealing with a graph with intermediate edges (by design.)

I'm not sure how the squash operation would play into this yet. If we were to squash in between two out of order operations (e.g. A <- B, squash, B <- C), I think that means we could have some dangling merges that don't ever "complete" unless we keep a copy of the Postgres overrides around forever to identify the disconnect.

Footnotes

  1. Aside: I guess that maybe these locks should be taken explicitly if they matter that much? Also, there is a caveat here that the person UUID can't be reused again, otherwise it could appear in the merge history later and cause a cycle — but since it's a time-based UUID, I really hope that's not a problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My 2 🪙 on how to think of the ordering problem

So if we think about what ingestion does, then it has a single transaction that merges the person and adds the info to posthog_pendingpersonoverride table. In order to make this more clear I'll use capital letters to denote distinctIds that are being merged and lower case letters to denote personIDs that distinct_id maps to:

B(b) <- C(c)   ==> B(b) & C(b)  
A(a) <- B(b)   ==> A(a) & B(a) & C(a)

when we think of person overrides we'd get:

b <- c   ... applying transitive updates we get .... a <- c
a <- b

Now the other way (note how the second row has B(a) because person b doesn't exist anymore, which means we result in a <- c in the person overrides):

A(a) <- B(b)  ==> A(a) & B(a)
B(a) <- C(c)  ==> A(a) & B(a) & C(a)

when we think of person overrides we'd get:

a <- b
a <- c

Does this help?

For the squash part we discussed that if we cut off at a random point we'll just end up doing more updates, no problem ... e.g. if we squash, when we have b <- c but don't yet know that a <- b and hence a <- c if we do squash on top of b <-c we'd overwrite events to have person_id = b that then combined with the overrides table (a <- b) tells us that all those events are of person_id = a. So it works fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this help?

I think that's a good explanation of the correct/expected behavior. It's also a helpful reminder that the person overrides are a derivative of the distinct IDs, as well.

The override writer right now only handles the first case.

For the squash part we discussed that if we cut off at a random point we'll just end up doing more updates, no problem ... e.g. if we squash, when we have b <- c but don't yet know that a <- b and hence a <- c if we do squash on top of b <-c we'd overwrite events to have person_id = b that then combined with the overrides table (a <- b) tells us that all those events are of person_id = a. So it works fine.

That example makes sense -- but in that example, the overrides are visible in order. If they arrive out of order, we'd end up in this scenario:

  1. pending override for A <- B becomes visible and we update overrides to reflect A <- B
  2. (squash happens, overrides are deleted)
  3. pending override for B <- C becomes visible (out of order for some reason) to reflect B <- C (because we no longer have knowledge of A <- B to update the transitive override)

Don't all of the events that were previously associated with C get "lost" in that case, because they no longer reference a user that exists?

I'm not sure there's a good way around that, but here are some options I can think of:

  1. ensure all inserts to the pending person overrides actually become visible in primary key order: either by locking the table as part of the merge transaction (yuck) or possibly by using serializable isolation level in the override worker and dealing with the potential for needing to retry due to serialization failures (also not great)
  2. not deleting overrides ever (simple, but unbounded growth is not ideal)
  3. define some transaction grace period, where we only select rows as inputs to squash that are older than X minutes, and assume that all transactions prior within that window have settled and that all valid insertions are now visible (maybe the best option?)

Copy link
Contributor

@tiina303 tiina303 Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not possible. We use postgres & transactions. There's no way we'd be able to write them out of order. Which ever distinctID merge happens first sets the person overrides in place for that. Then the later happening transaction would operate on the personIDs that are now mapping to those distinctIDs. You can't see a
b <- a after a c <- b, because the latter would have combined b&c, so the lookup for b would have returned c & you'd end up writing c <- a as the later insertion to the pending overrides table.

Copy link
Contributor Author

@tkaemming tkaemming Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to merge this as-is for now and deal with these issues as a follow-up: I think the potential for pending overrides becoming visible out-of-order is ultimately low probability but high consequence, and I suspect we're very likely doing enough row locking here that the merge transaction that rows that affect the same persons are forced into a serial execution path anyway (but I'm not sure how to prove that -- and even if I could, I think that'd be a tenuous guarantee to rely on.)

I have a possible solution for correcting out-of-order visibility in here for pending overrides, but handling the interactions with squash is still an open question (to me, at least.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For squash. We should use PG as the source of truth. If we stop the overrides writer, we can squash safely. Happy to write this up in more detail if we're not convinced yet.

Copy link
Contributor Author

@tkaemming tkaemming Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my wires were crossed here the other day and I wasn't thinking about this or communicating clearly: I was thinking about if we could reliably treat the data in the pending table as FIFO queue/log based on the ordering of the primary key, which is a different topic than transaction visibility. I'm not sure why I was mixing that up and combining the two topics. They are sort of related, but not really the same.

The data in the pending table should represent a consistent view of the state of overrides due to the transaction, and we shouldn't be able to end up in an inconsistent state where we see the side effects of a transaction prior to some other transaction that preceded it. We'd wouldn't be able to see A <- B without also seeing B <- C in the same set, regardless of their ordering within that set.

It is theoretically possible that the primary key of the records created within those transactions don't appear in the order that they were committed — but based on the ordering of statements within the merge transaction and the locks that will be involved with those statements, I don't think this is an issue in practice either, B <- C should have a lower primary key than A <- B because of the row locks involved (at least those involved with person update and deletion, maybe others) and those statements preceding the insert to the pending overrides table which generates the sequence value.

Sorry for making that so confusing earlier.

For squash: it seems like it'd be ideal if we could keep the overrides writer running while squashes are occurring? I'm not sure how long squashes will take (though hopefully we'll find out next week) but it seems like that could delay the visible side effects of merges for a little bit? I guess it's not the end of the world if that's that case, overrides will still happen eventually, they'll just take longer to observe. I think there are things we could to do make it go smoother if the squash process is slow enough to be problematic, but probably better to actually focus on getting it running first. 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

squash: yes we should be able to.
The interesting case is: If the overrides writer updates a row after squash started processing that row ... if we remove only the rows that we processed (e.g. the problem comes, when some row got updated, e.g. we handled B <- A and now we have C <- A (because afterwards overrides writer added C <- B). Now squash can still update all the events to have B instead of A, it would just not remove the C <- A row (might also be safe to remove, but I'm not sure, so would be easier not to). We can always keep extra rows in PG there without any risk.

const { rows } = await this.postgres.query(
tx,
`SELECT * FROM posthog_pendingpersonoverride ORDER BY id`,
undefined,
'processPendingOverrides'
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thought that just came to mind: it might be worth splitting this up into batches to be a bit more defensive in the (hopefully unlikely) scenario where this result set becomes unreasonably large, just to avoid needing to have the entire set in memory as well as avoiding a long-running transaction blocking vacuum and potentially needing to get rolled back?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we'll want batching if only because I imagine we'll have big surges of pending rows if we ever stop ingestion for a while and then turn it on, catching up quickly. Some protection here would be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Rather than implement the batching in this method, I'm going to make it possible to pass a limit so that the caller of this method has a bit more flexibility to control how batching is performed, if they want it -- they'll have enough data from the return value to be able to determine whether or not their limit was reached.


const messages: ProducerRecord[] = []
for (const { id, ...mergeOperation } of rows) {
messages.push(...(await writer.addPersonOverride(tx, mergeOperation)))
await this.postgres.query(
tx,
SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`,
undefined,
'processPendingOverrides'
)
}

// n.b.: We publish the messages here (and wait for acks) to ensure
// that all of our override updates are sent to Kafka before
tkaemming marked this conversation as resolved.
Show resolved Hide resolved
// prior to committing the transaction. If we're unable to publish,
// we should discard updates and try again later when it's available
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth calling out that it's safe for us to publish to Kafka and then crash, and republish the same or different rows again later (well, I hope it is).

await kafkaProducer.queueMessages(messages, true)

return rows.length
})
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what is going to be called in a loop by the plugin server (logging, error handling, metrics, etc will happen up there.)

}

export function ageInMonthsLowCardinality(timestamp: DateTime): number {
const ageInMonths = Math.max(-Math.floor(timestamp.diffNow('months').months), 0)
// for getting low cardinality for statsd metrics tags, which can cause issues in e.g. InfluxDB:
Expand Down
19 changes: 19 additions & 0 deletions plugin-server/tests/helpers/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,22 @@ export function createPromise<T = void>(): MockPromise<T> {

return result as MockPromise<T>
}

export class WaitEvent {
private promise: Promise<void>
private resolve: () => void

constructor() {
this.promise = new Promise((resolve) => {
this.resolve = resolve
})
}

public set() {
this.resolve()
}

public async wait() {
return this.promise
}
}
Comment on lines +17 to +34
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loading
Loading