From 055776d461f7f00c55a76d6758e472dfdd588f73 Mon Sep 17 00:00:00 2001 From: Harry Waye Date: Thu, 29 Jun 2023 13:55:31 +0100 Subject: [PATCH] chore(exports): try calling heartbeat a bit more often (#16295) * chore(exports): try calling heartbeat a bit more often Looks like we end up rebalancing often. Possibly because we're not sending the heartbeats in time and the session timing out. * wip * fix tests --- .../src/main/ingestion-queues/batch-processing/each-batch.ts | 4 +++- plugin-server/tests/main/ingestion-queues/each-batch.test.ts | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch.ts index 47ade9d273f52..9a9d57782aae8 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch.ts @@ -47,7 +47,9 @@ export async function eachBatch( } const lastBatchMessage = messageBatch[messageBatch.length - 1] - await Promise.all(messageBatch.map((message: KafkaMessage) => eachMessage(message, queue))) + await Promise.all( + messageBatch.map((message: KafkaMessage) => eachMessage(message, queue).finally(() => heartbeat())) + ) // this if should never be false, but who can trust computers these days if (lastBatchMessage) { diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index 32777f2d832f2..dac49117c4815 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -131,7 +131,7 @@ describe('eachBatchX', () => { describe('eachBatch', () => { it('calls eachMessage with the correct arguments', async () => { - const eachMessage = jest.fn() + const eachMessage = jest.fn(() => Promise.resolve()) const batch = createKafkaJSBatch(event) await eachBatch(batch, queue, eachMessage, groupIntoBatches, 'key') @@ -139,7 +139,7 @@ describe('eachBatchX', () => { }) it('tracks metrics based on the key', async () => { - const eachMessage = jest.fn() + const eachMessage = jest.fn(() => Promise.resolve()) await eachBatch(createKafkaJSBatch(event), queue, eachMessage, groupIntoBatches, 'my_key') expect(queue.pluginsServer.statsd.timing).toHaveBeenCalledWith(