Skip to content

Commit

Permalink
chore(exports): try calling heartbeat a bit more often (#16295)
Browse files Browse the repository at this point in the history
* chore(exports): try calling heartbeat a bit more often

Looks like we end up rebalancing often. Possibly because we're not
sending the heartbeats in time and the session timing out.

* wip

* fix tests
  • Loading branch information
Harry Waye authored Jun 29, 2023
1 parent a168062 commit 055776d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ export async function eachBatch(
}

const lastBatchMessage = messageBatch[messageBatch.length - 1]
await Promise.all(messageBatch.map((message: KafkaMessage) => eachMessage(message, queue)))
await Promise.all(
messageBatch.map((message: KafkaMessage) => eachMessage(message, queue).finally(() => heartbeat()))
)

// this if should never be false, but who can trust computers these days
if (lastBatchMessage) {
Expand Down
4 changes: 2 additions & 2 deletions plugin-server/tests/main/ingestion-queues/each-batch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@ describe('eachBatchX', () => {

describe('eachBatch', () => {
it('calls eachMessage with the correct arguments', async () => {
const eachMessage = jest.fn()
const eachMessage = jest.fn(() => Promise.resolve())
const batch = createKafkaJSBatch(event)
await eachBatch(batch, queue, eachMessage, groupIntoBatches, 'key')

expect(eachMessage).toHaveBeenCalledWith({ value: JSON.stringify(event) }, queue)
})

it('tracks metrics based on the key', async () => {
const eachMessage = jest.fn()
const eachMessage = jest.fn(() => Promise.resolve())
await eachBatch(createKafkaJSBatch(event), queue, eachMessage, groupIntoBatches, 'my_key')

expect(queue.pluginsServer.statsd.timing).toHaveBeenCalledWith(
Expand Down

0 comments on commit 055776d

Please sign in to comment.