From 78bead12152d5fe129bb29a9ba2794e7025e5831 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 30 Nov 2023 19:18:30 -0800 Subject: [PATCH] Add processing lock. --- plugin-server/src/worker/ingestion/person-state.ts | 14 +++++++++++++- .../tests/worker/ingestion/person-state.test.ts | 6 ++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index e8328181fed4e7..50c81a0dfee789 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -717,7 +717,7 @@ export class PersonOverrideWriter { } export class DeferredPersonOverrideWriter { - constructor(private postgres: PostgresRouter) {} + constructor(private postgres: PostgresRouter, private lockId: number) {} public async addPersonOverride(tx: TransactionClient, mergeOperation: MergeOperation): Promise<ProducerRecord[]> { await this.postgres.query( @@ -745,6 +745,18 @@ export class DeferredPersonOverrideWriter { const writer = new PersonOverrideWriter(this.postgres) await this.postgres.transaction(PostgresUse.COMMON_WRITE, 'processPendingOverrides', async (tx) => { + const { + rows: [{ acquired }], + } = await this.postgres.query( + tx, + SQL`SELECT pg_try_advisory_xact_lock(${this.lockId}) as acquired`, + undefined, + 'processPendingOverrides' + ) + if (!acquired) { + throw new Error('could not acquire lock') + } + const { rows } = await this.postgres.query( tx, `SELECT * FROM posthog_pendingpersonoverride ORDER BY id`, diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 5d6390bfeb2ef6..362157f5f0c370 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -69,9 +69,11 @@ const PersonOverridesModes: Record<string, PersonOverridesMode | undefined> = { fetchPostgresPersonIdOverrides: (hub, teamId) => fetchPostgresPersonIdOverrides(hub, teamId), }, deferred: { - getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres), + // XXX: This is kind of a mess -- ideally it'd be preferable to just + // instantiate the writer once and share it + getWriter: (hub) => new DeferredPersonOverrideWriter(hub.db.postgres, 456), fetchPostgresPersonIdOverrides: async (hub, teamId) => { - await new DeferredPersonOverrideWriter(hub.db.postgres).processPendingOverrides(hub.db.kafkaProducer) + await new DeferredPersonOverrideWriter(hub.db.postgres, 456).processPendingOverrides(hub.db.kafkaProducer) return await fetchPostgresPersonIdOverrides(hub, teamId) }, },