Skip to content

Commit

Permalink
Added check for pulled sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Sep 22, 2023
1 parent 0e3faf3 commit f49e34e
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ describe('ingester', () => {
otherIngester.onAssignPartitions([createTP(2), createTP(3)]),
]

// Should immediately be removed from the tracked sessions
expect(
Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`)
).toEqual(['1:session_id_1:1', '1:session_id_2:1'])

// Call the second ingester to receive the messages. The revocation should still be in progress meaning they are "paused" for a bit
// Once the revocation is complete the second ingester should receive the messages but drop most of them as they got flushes by the revoke
await otherIngester.handleEachBatch([
Expand Down

0 comments on commit f49e34e

Please sign in to comment.