diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index bb21ad8f1a894b..e8328181fed4e7 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -751,17 +751,18 @@ export class DeferredPersonOverrideWriter { undefined, 'processPendingOverrides' ) - const results = rows.map(async ({ id, ...mergeOperation }) => { - const messages = await writer.addPersonOverride(tx, mergeOperation) + + const messages: ProducerRecord[] = [] + for (const { id, ...mergeOperation } of rows) { + messages.push(...(await writer.addPersonOverride(tx, mergeOperation))) await this.postgres.query( tx, SQL`DELETE FROM posthog_pendingpersonoverride WHERE id = ${id}`, undefined, 'processPendingOverrides' ) - return messages - }) - const messages = (await Promise.all(results)).flat() + } + await kafkaProducer.queueMessages(messages, true) }) } diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index 49966e8b77ae12..5d6390bfeb2ef6 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -2137,3 +2137,47 @@ describe('PersonState.update()', () => { }) }) }) + +describe('DeferredPersonOverrideWriter', () => { + let hub: Hub + let closeHub: () => Promise + + let teamId: number + let organizationId: string + + beforeAll(async () => { + ;[hub, closeHub] = await createHub({}) + organizationId = await createOrganization(hub.db.postgres) + }) + + beforeEach(async () => { + teamId = await createTeam(hub.db.postgres, organizationId) + }) + + afterAll(async () => { + await closeHub() + }) + + it('handles out of order merges correctly', async () => { + const a = 'aaaaaaaa-0000-0000-0000-000000000000' + const b = 'bbbbbbbb-0000-0000-0000-000000000000' + const c = 'cccccccc-0000-0000-0000-000000000000' + + const writer = new DeferredPersonOverrideWriter(hub.db.postgres) + + const mergeDefaults = { team_id: teamId, oldest_event: DateTime.fromMillis(0) } + + await hub.db.postgres.transaction(PostgresUse.COMMON_WRITE, '', async (tx) => { + await writer.addPersonOverride(tx, { old_person_id: b, override_person_id: c, ...mergeDefaults }) + await writer.addPersonOverride(tx, { old_person_id: a, override_person_id: b, ...mergeDefaults }) + }) + + await writer.processPendingOverrides(hub.db.kafkaProducer) + + const overrides = await fetchPostgresPersonIdOverrides(hub, teamId) + expect(overrides).toEqual([ + [a, c], + [b, c], + ]) + }) +})