Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plugin-server): Add flat person override table and writer #19220

Merged
merged 21 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
fa3fc28
Add basic tests for ensuring implementations remain in sync.
tkaemming Dec 6, 2023
2ee74a3
Add flat table override definition.
tkaemming Dec 7, 2023
4f04b50
Add automatic migration.
tkaemming Dec 7, 2023
9d75860
Add flat person override writer and add to override writer test suite.
tkaemming Dec 7, 2023
55c0aaf
Improve test structure.
tkaemming Dec 7, 2023
24dfd48
Provide the override writer as a constructor argument to the override…
tkaemming Dec 7, 2023
3834e98
Consolidate tests on the `getPersonOverrides` writer method for easie…
tkaemming Dec 7, 2023
201f74d
Use set instead of sorted arrays.
tkaemming Dec 7, 2023
ce1365b
Add new test variant for flat overrides to the big test suite.
tkaemming Dec 7, 2023
922d11d
Test twiddling
tkaemming Dec 7, 2023
845cf50
Add `POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES` setting.
tkaemming Dec 7, 2023
abe8f8a
Remove comment about foreign key on team_id.
tkaemming Dec 8, 2023
61fb17e
Split up non-deferred from deferred overrides, remove another comment.
tkaemming Dec 8, 2023
72e2c9c
Use `serverConfig` instead of optional `hub` when setting up
tkaemming Dec 15, 2023
1a1d5a0
Add index
tkaemming Dec 15, 2023
796983f
Write comment for pending overrides.
tkaemming Dec 15, 2023
9515c37
Tweak woring
tkaemming Dec 15, 2023
fe8c88a
Add comment to flat person model.
tkaemming Dec 15, 2023
b042f2f
Remove errant punctuation
tkaemming Dec 16, 2023
7b8176b
Add back constraints.
tkaemming Dec 18, 2023
367b735
Merge branch 'master' into poe-deferred-override-worker-flat, update …
tkaemming Dec 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0376_externaldataschema_last_synced_at
posthog: 0377_flatpersonoverride
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ export function getDefaultConfig(): PluginsServerConfig {
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
POE_DEFERRED_WRITES_ENABLED: false,
POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: false,
POE_EMBRACE_JOIN_FOR_TEAMS: '',
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,
RUSTY_HOOK_FOR_TEAMS: '',
Expand Down
14 changes: 12 additions & 2 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { DeferredPersonOverrideWorker } from '../worker/ingestion/person-state'
import {
DeferredPersonOverrideWorker,
FlatPersonOverrideWriter,
PersonOverrideWriter,
} from '../worker/ingestion/person-state'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { GraphileWorker } from './graphile-worker/graphile-worker'
Expand Down Expand Up @@ -437,7 +441,13 @@ export async function startPluginsServer(
const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
const kafkaProducer = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))

personOverridesPeriodicTask = new DeferredPersonOverrideWorker(postgres, kafkaProducer).runTask(5000)
personOverridesPeriodicTask = new DeferredPersonOverrideWorker(
postgres,
kafkaProducer,
serverConfig.POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES
? new FlatPersonOverrideWriter(postgres)
: new PersonOverrideWriter(postgres)
).runTask(5000)
personOverridesPeriodicTask.promise.catch(async () => {
status.error('⚠️', 'Person override worker task crashed! Requesting shutdown...')
await closeJobs()
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ export interface PluginsServerConfig {
DROP_EVENTS_BY_TOKEN: string
POE_EMBRACE_JOIN_FOR_TEAMS: string
POE_DEFERRED_WRITES_ENABLED: boolean
POE_DEFERRED_WRITES_USE_FLAT_OVERRIDES: boolean
RELOAD_PLUGIN_JITTER_MAX_MS: number
RUSTY_HOOK_FOR_TEAMS: string
RUSTY_HOOK_URL: string
Expand Down
141 changes: 135 additions & 6 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -706,13 +706,142 @@ export class PersonOverrideWriter {

return id
}

public async getPersonOverrides(teamId: number): Promise<PersonOverrideDetails[]> {
const { rows } = await this.postgres.query(
PostgresUse.COMMON_WRITE,
SQL`
SELECT
override.team_id,
old_person.uuid as old_person_id,
override_person.uuid as override_person_id,
oldest_event
FROM posthog_personoverride override
LEFT OUTER JOIN posthog_personoverridemapping old_person
ON override.team_id = old_person.team_id AND override.old_person_id = old_person.id
LEFT OUTER JOIN posthog_personoverridemapping override_person
ON override.team_id = override_person.team_id AND override.override_person_id = override_person.id
WHERE override.team_id = ${teamId}
`,
undefined,
'getPersonOverrides'
)
return rows.map((row) => ({
...row,
oldest_event: DateTime.fromISO(row.oldest_event),
}))
Comment on lines +710 to +732
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was pulled in from tests so that I could swap the entire backing implementation at once without having to coordinate it across multiple abstractions. This method is really only intended to be used in tests, though.

(I did rewrite the query here because I found the other one challenging to read, though looking back at it now, I do think that the CTE having the predicate associated with it versus being specified here 3x is a nice idea.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me. I have a minor preference for naming things like fooForTests or something to avoid footguns. It'd be pretty weird for someone to reach for this... but...

}
}

export class FlatPersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}

public async addPersonOverride(
tx: TransactionClient,
overrideDetails: PersonOverrideDetails
): Promise<ProducerRecord[]> {
const mergedAt = DateTime.now()

await this.postgres.query(
tx,
SQL`
INSERT INTO posthog_flatpersonoverride (
team_id,
old_person_id,
override_person_id,
oldest_event,
version
) VALUES (
${overrideDetails.team_id},
${overrideDetails.old_person_id},
${overrideDetails.override_person_id},
${overrideDetails.oldest_event},
0
)
`,
undefined,
'personOverride'
)

const { rows: transitiveUpdates } = await this.postgres.query(
tx,
SQL`
UPDATE
posthog_flatpersonoverride
SET
override_person_id = ${overrideDetails.override_person_id},
version = COALESCE(version, 0)::numeric + 1
WHERE
team_id = ${overrideDetails.team_id} AND override_person_id = ${overrideDetails.old_person_id}
RETURNING
old_person_id,
version,
oldest_event
`,
undefined,
'transitivePersonOverrides'
)

status.debug('🔁', 'person_overrides_updated', { transitiveUpdates })

const personOverrideMessages: ProducerRecord[] = [
{
topic: KAFKA_PERSON_OVERRIDE,
messages: [
{
value: JSON.stringify({
team_id: overrideDetails.team_id,
old_person_id: overrideDetails.old_person_id,
override_person_id: overrideDetails.override_person_id,
oldest_event: castTimestampOrNow(overrideDetails.oldest_event, TimestampFormat.ClickHouse),
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
version: 0,
}),
},
...transitiveUpdates.map(({ old_person_id, version, oldest_event }) => ({
value: JSON.stringify({
team_id: overrideDetails.team_id,
old_person_id: old_person_id,
override_person_id: overrideDetails.override_person_id,
oldest_event: castTimestampOrNow(oldest_event, TimestampFormat.ClickHouse),
merged_at: castTimestampOrNow(mergedAt, TimestampFormat.ClickHouse),
version: version,
}),
})),
Comment on lines +791 to +810
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be made to be less duplicative with the non-flat version, but I don't think it's worth the investment given that the non-flat implementations days should be numbered at this point.

],
},
]

return personOverrideMessages
}

public async getPersonOverrides(teamId: number): Promise<PersonOverrideDetails[]> {
const { rows } = await this.postgres.query(
PostgresUse.COMMON_WRITE,
SQL`
SELECT
team_id,
old_person_id,
override_person_id,
oldest_event
FROM posthog_flatpersonoverride
WHERE team_id = ${teamId}
`,
undefined,
'getPersonOverrides'
)
return rows.map((row) => ({
...row,
team_id: parseInt(row.team_id), // XXX: pg returns bigint as str (reasonably so)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a good idea in general terms, but like the other getPersonOverrides above, this is only intended to be used in tests.

On second thought, there's probably no harm in replacing parseInt with BigInt, though? It seems like it should be safe…?

> BigInt('10') == 10
true

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, seems harmless (either way?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, Jest doesn't like it after all,

diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts
index f49ee1ad33..e33fcad987 100644
--- a/plugin-server/src/worker/ingestion/person-state.ts
+++ b/plugin-server/src/worker/ingestion/person-state.ts
@@ -832,7 +832,7 @@ export class FlatPersonOverrideWriter {
         )
         return rows.map((row) => ({
             ...row,
-            team_id: parseInt(row.team_id), // XXX: pg returns bigint as str (reasonably so)
+            team_id: BigInt(row.team_id), // pg returns bigint as str
             oldest_event: DateTime.fromISO(row.oldest_event),
         }))
     }

causes this error:

  ● person overrides writer: flat › handles direct overrides

    expect(received).toEqual(expected) // deep equality

    - Expected  - 1
    + Received  + 1

      Array [
        Object {
          "old_person_id": "018c6f1a-36df-0000-4ee6-e4ef7823a3f8",
          "oldest_event": "1969-12-31T16:00:00.000-08:00",
          "override_person_id": "018c6f1a-36df-0001-a5b9-21531ca2319e",
    -     "team_id": 474353818,
    +     "team_id": 474353818n,
        },
      ]

      2152 |         })
      2153 |
    > 2154 |         expect(await writer.getPersonOverrides(teamId)).toEqual([{ ...defaults, ...override }])
           |                                                         ^
      2155 |     })
      2156 |
      2157 |     it('handles transitive overrides', async () => {

      at Object.toEqual (tests/worker/ingestion/person-state.test.ts:2154:57)

Not sure why, as the values compare just fine:

> 486884794 == 486884794n
true

Looks like that equality test comes from somewhere in here, so it's not using the equality operator directly: https://github.com/jestjs/jest/blob/e54c0ebb048e10331345dbe99f8ec07654a43f1c/packages/expect-utils/src/jasmineUtils.ts#L63-L212

teamId is used all over in these tests so it seems like a whole different can of worms to try and change it to BigInt consistently, so going to avoid falling down that rabbit hole and leave it as-is, I guess. (posthog_team.id isn't a bigint on the table schema, either, FWIW -- at least as it exists in the repository, I didn't check the production schema.)

Copy link
Contributor

@bretthoerner bretthoerner Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

posthog_team.id isn't a bigint on the table schema, either, FWIW

Oh, weird. To be honesty when I read your comment I was 🤨 that we're planning for >2billion teams already. Not that it isn't nice to be prepared, but this seems pretty low priority to think about, lol.

edit: Actually the Django Person model doesn't even claim to use BigInt if I'm reading correctly. I get the feeling this was a copy-pasta in the old Overrides where it makes more sense for Persons themselves to be BigInt.

Copy link
Contributor Author

@tkaemming tkaemming Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just grabbed it from PersonOverrideMapping myself. I just assumed there was a "always use bigserial for primary keys" to avoid surprises later regardless of how probable they'd be to become real issues.

Knowing that, maybe it makes sense to drop these back down to a plain old 4 byte integer for consistency with the actual team primary key? Though these tables should ideally stay pretty small through overrides, so maybe it doesn't matter much at all? 🤷‍♂️

I do like the idea of 2 billion teams though.

oldest_event: DateTime.fromISO(row.oldest_event),
}))
}
}

const deferredPersonOverridesWrittenCounter = new Counter({
name: 'deferred_person_overrides_written',
help: 'Number of person overrides that have been written as pending',
})

export class DeferredPersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}

Expand Down Expand Up @@ -759,11 +888,11 @@ export class DeferredPersonOverrideWorker {
// it just needs to be consistent across all processes.
public readonly lockId = 567

private writer: PersonOverrideWriter

constructor(private postgres: PostgresRouter, private kafkaProducer: KafkaProducerWrapper) {
this.writer = new PersonOverrideWriter(this.postgres)
}
constructor(
private postgres: PostgresRouter,
private kafkaProducer: KafkaProducerWrapper,
private writer: PersonOverrideWriter | FlatPersonOverrideWriter
) {}

/**
* Process all (or up to the given limit) pending overrides.
Expand Down
Loading
Loading