Skip to content

Commit

Permalink
Add test for out of order processing -- not sure yet if this will be …
Browse files Browse the repository at this point in the history
…needed
  • Loading branch information
tkaemming committed Dec 1, 2023
1 parent 603090f commit 4b66c1d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
11 changes: 6 additions & 5 deletions plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -751,17 +751,18 @@ export class DeferredPersonOverrideWriter {
undefined,
'processPendingOverrides'
)
const results = rows.map(async ({ id, ...mergeOperation }) => {
const messages = await writer.addPersonOverride(tx, mergeOperation)

const messages: ProducerRecord[] = []
for (const { id, ...mergeOperation } of rows) {
messages.push(...(await writer.addPersonOverride(tx, mergeOperation)))
await this.postgres.query(
tx,
SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`,
undefined,
'processPendingOverrides'
)
return messages
})
const messages = (await Promise.all(results)).flat()
}

await kafkaProducer.queueMessages(messages, true)
})
}
Expand Down
44 changes: 44 additions & 0 deletions plugin-server/tests/worker/ingestion/person-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2137,3 +2137,47 @@ describe('PersonState.update()', () => {
})
})
})

describe('DeferredPersonOverrideWriter', () => {
let hub: Hub
let closeHub: () => Promise<void>

let teamId: number
let organizationId: string

beforeAll(async () => {
;[hub, closeHub] = await createHub({})
organizationId = await createOrganization(hub.db.postgres)
})

beforeEach(async () => {
teamId = await createTeam(hub.db.postgres, organizationId)
})

afterAll(async () => {
await closeHub()
})

it('handles out of order merges correctly', async () => {
const a = 'aaaaaaaa-0000-0000-0000-000000000000'
const b = 'bbbbbbbb-0000-0000-0000-000000000000'
const c = 'cccccccc-0000-0000-0000-000000000000'

const writer = new DeferredPersonOverrideWriter(hub.db.postgres)

const mergeDefaults = { team_id: teamId, oldest_event: DateTime.fromMillis(0) }

await hub.db.postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => {
await writer.addPersonOverride(tx, { old_person_id: b, override_person_id: c, ...mergeDefaults })
await writer.addPersonOverride(tx, { old_person_id: a, override_person_id: b, ...mergeDefaults })
})

await writer.processPendingOverrides(hub.db.kafkaProducer)

const overrides = await fetchPostgresPersonIdOverrides(hub, teamId)
expect(overrides).toEqual([
[a, c],
[b, c],
])
})
})

0 comments on commit 4b66c1d

Please sign in to comment.