Skip to content

Commit

Permalink
Ensure advisory lock is held before proceeding with processing overri…
Browse files Browse the repository at this point in the history
…des.
  • Loading branch information
tkaemming committed Dec 1, 2023
1 parent 5bd9d73 commit 206e787
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
2 changes: 1 addition & 1 deletion plugin-server/tests/helpers/promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export function createPromise<T = void>(): MockPromise<T> {
return result as MockPromise<T>
}

export class Event {
export class WaitEvent {
private promise: Promise<void>
private resolve: () => void

Expand Down
54 changes: 54 additions & 0 deletions plugin-server/tests/worker/ingestion/person-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
PersonState,
} from '../../../src/worker/ingestion/person-state'
import { delayUntilEventIngested } from '../../helpers/clickhouse'
import { WaitEvent } from '../../helpers/promises'
import { createOrganization, createTeam, fetchPostgresPersons, insertRow } from '../../helpers/sql'

jest.setTimeout(5000) // 5 sec timeout
Expand Down Expand Up @@ -2139,3 +2140,56 @@ describe('PersonState.update()', () => {
})
})
})

describe('DeferredPersonOverrideWriter', () => {
let hub: Hub
let closeHub: () => Promise<void>

const lockId = 456
let writer: DeferredPersonOverrideWriter

beforeAll(async () => {
;[hub, closeHub] = await createHub({})
writer = new DeferredPersonOverrideWriter(hub.db.postgres, lockId)
})

afterAll(async () => {
await closeHub()
})

it('ensures advisory lock is held before processing', async () => {
const { postgres, kafkaProducer } = hub.db

let acquiredLock: boolean
const tryLockComplete = new WaitEvent()
const readyToReleaseLock = new WaitEvent()

const transactionHolder = postgres
.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => {
const { rows } = await postgres.query(
tx,
`SELECT pg_try_advisory_lock(${lockId}) as acquired, pg_backend_pid()`,
undefined,
''
)
;[{ acquired: acquiredLock }] = rows
tryLockComplete.set()
await readyToReleaseLock.wait()
})
.then(() => {
acquiredLock = false
})

try {
await tryLockComplete.wait()
expect(acquiredLock!).toBe(true)
await expect(writer.processPendingOverrides(kafkaProducer)).rejects.toThrow(Error('could not acquire lock'))
} finally {
readyToReleaseLock.set()
await transactionHolder
}

expect(acquiredLock!).toBe(false)
await expect(writer.processPendingOverrides(kafkaProducer)).resolves.not.toThrow()
})
})

0 comments on commit 206e787

Please sign in to comment.