diff --git a/plugin-server/tests/helpers/promises.ts b/plugin-server/tests/helpers/promises.ts index 3abd8dc082553..4daea9156aa02 100644 --- a/plugin-server/tests/helpers/promises.ts +++ b/plugin-server/tests/helpers/promises.ts @@ -14,7 +14,7 @@ export function createPromise(): MockPromise { return result as MockPromise } -export class Event { +export class WaitEvent { private promise: Promise private resolve: () => void diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index b535250033a5c..daf7c51fff579 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -14,6 +14,7 @@ import { PersonState, } from '../../../src/worker/ingestion/person-state' import { delayUntilEventIngested } from '../../helpers/clickhouse' +import { WaitEvent } from '../../helpers/promises' import { createOrganization, createTeam, fetchPostgresPersons, insertRow } from '../../helpers/sql' jest.setTimeout(5000) // 5 sec timeout @@ -2139,3 +2140,56 @@ describe('PersonState.update()', () => { }) }) }) + +describe('DeferredPersonOverrideWriter', () => { + let hub: Hub + let closeHub: () => Promise + + const lockId = 456 + let writer: DeferredPersonOverrideWriter + + beforeAll(async () => { + ;[hub, closeHub] = await createHub({}) + writer = new DeferredPersonOverrideWriter(hub.db.postgres, lockId) + }) + + afterAll(async () => { + await closeHub() + }) + + it('ensures advisory lock is held before processing', async () => { + const { postgres, kafkaProducer } = hub.db + + let acquiredLock: boolean + const tryLockComplete = new WaitEvent() + const readyToReleaseLock = new WaitEvent() + + const transactionHolder = postgres + .transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + const { rows } = await postgres.query( + tx, + `SELECT pg_try_advisory_lock(${lockId}) as acquired, pg_backend_pid()`, + undefined, + '' + ) + ;[{ acquired: acquiredLock }] = rows + tryLockComplete.set() + await readyToReleaseLock.wait() + }) + .then(() => { + acquiredLock = false + }) + + try { + await tryLockComplete.wait() + expect(acquiredLock!).toBe(true) + await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow(Error('could not acquire lock')) + } finally { + readyToReleaseLock.set() + await transactionHolder + } + + expect(acquiredLock!).toBe(false) + await expect(writer.processPendingOverrides(kafkaProducer)).resolves.not.toThrow() + }) +})