chore: refactor webhooks groupIntoBatches
tiina303 committed Sep 12, 2023
1 parent b2035c6 commit fe55576
Showing 2 changed files with 11 additions and 12 deletions.
10 changes: 6 additions & 4 deletions plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts
@@ -17,10 +17,10 @@ import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
 require('@sentry/tracing')

 // exporting only for testing
-export function groupIntoBatchesWebhooks(
+export function groupIntoBatchesByUsage(
     array: KafkaMessage[],
     batchSize: number,
-    actionMatcher: ActionMatcher
+    shouldProcess: (teamId: number) => boolean
 ): { eventBatch: RawClickHouseEvent[]; lastOffset: string; lastTimestamp: string }[] {
     // Most events will not trigger a webhook call, so we want to filter them out as soon as possible
     // to achieve the highest effective concurrency when executing the actual HTTP calls.
@@ -32,7 +32,7 @@ export function groupIntoBatchesWebhooks(
     let currentCount = 0
     array.forEach((message, index) => {
         const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent
-        if (actionMatcher.hasWebhooks(clickHouseEvent.team_id)) {
+        if (shouldProcess(clickHouseEvent.team_id)) {
             currentBatch.push(clickHouseEvent)
             currentCount++
         } else {
@@ -69,7 +69,9 @@ export async function eachBatchWebhooksHandlers(
     const transaction = Sentry.startTransaction({ name: `eachBatchWebhooks` })

     try {
-        const batchesWithOffsets = groupIntoBatchesWebhooks(batch.messages, concurrency, actionMatcher)
+        const batchesWithOffsets = groupIntoBatchesByUsage(batch.messages, concurrency, (teamId) =>
+            actionMatcher.hasWebhooks(teamId)
+        )

         statsd?.histogram('ingest_event_batching.input_length', batch.messages.length, { key: key })
         statsd?.histogram('ingest_event_batching.batch_count', batchesWithOffsets.length, { key: key })
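
The hunks above show only fragments of each-batch-webhooks.ts, so the shape of the refactor can be hard to see. Below is a minimal self-contained sketch of what groupIntoBatchesByUsage plausibly looks like end to end; the RawClickHouseEvent stand-in, the OffsetBatch alias, and the batch-closing logic between the visible hunks are assumptions for illustration, not the committed code.

import { KafkaMessage } from 'kafkajs'

// Stand-in for the repo's RawClickHouseEvent type (assumption for illustration only).
interface RawClickHouseEvent {
    team_id: number
    [key: string]: unknown
}

type OffsetBatch = { eventBatch: RawClickHouseEvent[]; lastOffset: string; lastTimestamp: string }

export function groupIntoBatchesByUsage(
    array: KafkaMessage[],
    batchSize: number,
    shouldProcess: (teamId: number) => boolean
): OffsetBatch[] {
    // Most events will not trigger a webhook call, so filter them out as early
    // as possible to maximize effective concurrency for the actual HTTP calls.
    const result: OffsetBatch[] = []
    let currentBatch: RawClickHouseEvent[] = []
    let currentCount = 0
    array.forEach((message, index) => {
        const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent
        if (shouldProcess(clickHouseEvent.team_id)) {
            currentBatch.push(clickHouseEvent)
            currentCount++
        }
        // Close the batch when it is full or the input is exhausted. Skipped
        // messages still advance lastOffset/lastTimestamp, which is why the
        // final batch can be empty yet still carry the offset to commit.
        if (currentCount === batchSize || index === array.length - 1) {
            result.push({ eventBatch: currentBatch, lastOffset: message.offset, lastTimestamp: message.timestamp })
            currentBatch = []
            currentCount = 0
        }
    })
    return result
}

The point of the change is dependency inversion: the batching helper now takes a plain (teamId) => boolean predicate instead of a whole ActionMatcher, so any caller with a per-team usage check can reuse it, and tests can stub the predicate directly, as the second file shows.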
13 changes: 5 additions & 8 deletions plugin-server/tests/main/ingestion-queues/each-batch.test.ts
@@ -15,7 +15,7 @@ import {
 } from '../../../src/main/ingestion-queues/batch-processing/each-batch-onevent'
 import {
     eachBatchWebhooksHandlers,
-    groupIntoBatchesWebhooks,
+    groupIntoBatchesByUsage,
 } from '../../../src/main/ingestion-queues/batch-processing/each-batch-webhooks'
 import {
     ClickHouseTimestamp,
@@ -333,11 +333,9 @@ describe('eachBatchX', () => {
                 kafkaTimestamp: '2020-02-23 00:10:00.00' as ClickHouseTimestamp,
             },
         ])
-        const actionManager = new ActionManager(queue.pluginsServer.postgres)
-        const actionMatcher = new ActionMatcher(queue.pluginsServer.postgres, actionManager)
-        // mock hasWebhooks 10 calls, 1,3,10 should return false, others true
-        actionMatcher.hasWebhooks = jest.fn((teamId) => teamId !== 1 && teamId !== 3 && teamId !== 10)
-        const result = groupIntoBatchesWebhooks(batch.batch.messages, 5, actionMatcher)
+        // teamIDs 1,3,10 should return false, others true
+        const toProcess = jest.fn((teamId) => teamId !== 1 && teamId !== 3 && teamId !== 10)
+        const result = groupIntoBatchesByUsage(batch.batch.messages, 5, toProcess)
         expect(result).toEqual([
             {
                 eventBatch: expect.arrayContaining([
@@ -375,8 +373,7 @@
         ])
         // make sure that if the last message would be a new batch and if it's going to be excluded we
         // still get the last batch as empty with the right offset and timestamp
-        actionMatcher.hasWebhooks = jest.fn((teamId) => teamId !== 1 && teamId !== 3 && teamId !== 10)
-        const result2 = groupIntoBatchesWebhooks(batch.batch.messages, 7, actionMatcher)
+        const result2 = groupIntoBatchesByUsage(batch.batch.messages, 7, toProcess)
         expect(result2).toEqual([
             {
                 eventBatch: expect.arrayContaining([
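
Because the predicate is injected, the test no longer constructs ActionManager and ActionMatcher just to stub one method. A minimal sketch of the pattern, assuming kafkajs message shapes and the behavior of the sketch above (these fixtures and expectations are illustrative, not the real suite's):

import { KafkaMessage } from 'kafkajs'

// A plain jest.fn predicate replaces the mocked ActionMatcher.hasWebhooks.
const toProcess = jest.fn((teamId: number) => teamId !== 1)

// Partial messages cast for brevity; a real KafkaMessage carries more fields.
const messages = [
    { value: Buffer.from(JSON.stringify({ team_id: 1 })), offset: '10', timestamp: '1000' },
    { value: Buffer.from(JSON.stringify({ team_id: 2 })), offset: '11', timestamp: '2000' },
] as unknown as KafkaMessage[]

const result = groupIntoBatchesByUsage(messages, 5, toProcess)
// The team 1 event is filtered out, but its offset still advances the batch:
// one batch containing only the team 2 event, ending at offset '11'.
expect(toProcess).toHaveBeenCalledTimes(2)
expect(result).toHaveLength(1)
expect(result[0].lastOffset).toBe('11')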
