Skip to content

Commit

Permalink
Add processing lock.
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming committed Dec 1, 2023
1 parent 4b66c1d commit 78bead1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
14 changes: 13 additions & 1 deletion plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ export class PersonOverrideWriter {
}

export class DeferredPersonOverrideWriter {
constructor(private postgres: PostgresRouter) {}
constructor(private postgres: PostgresRouter, private lockId: number) {}

public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise<ProducerRecord[]> {
await this.postgres.query(
Expand Down Expand Up @@ -745,6 +745,18 @@ export class DeferredPersonOverrideWriter {
const writer = new PersonOverrideWriter(this.postgres)

await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => {
const {
rows: [{ acquired }],
} = await this.postgres.query(
tx,
SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`,
undefined,
'processPendingOverrides'
)
if (!acquired) {
throw new Error('could not acquire lock')
}

const { rows } = await this.postgres.query(
tx,
`SELECT * FROM posthog_pendingpersonoverride ORDER BY id`,
Expand Down
6 changes: 4 additions & 2 deletions plugin-server/tests/worker/ingestion/person-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ const PersonOverridesModes: Record<string, PersonOverridesMode | undefined> = {
fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId),
},
deferred: {
getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres),
// XXX: This is kind of a mess -- ideally it'd be preferable to just
// instantiate the writer once and share it
getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres, 456),
fetchPostgresPersonIdOverrides: async (hub, teamId) => {
await new DeferredPersonOverrideWriter(hub.db.postgres).processPendingOverrides(hub.db.kafkaProducer)
await new DeferredPersonOverrideWriter(hub.db.postgres, 456).processPendingOverrides(hub.db.kafkaProducer)
return await fetchPostgresPersonIdOverrides(hub, teamId)
},
},
Expand Down

0 comments on commit 78bead1

Please sign in to comment.