Skip to content

Commit

Permalink
chore: refactor webhooks groupIntoBatches (#17395)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 authored Sep 13, 2023
1 parent 3b1f919 commit a75f09d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@ import { EachBatchPayload, KafkaMessage } from 'kafkajs'
import { RawClickHouseEvent } from '../../../types'
import { convertToIngestionEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { groupIntoBatches } from '../../../utils/utils'
import { runInstrumentedFunction } from '../../utils'
import { KafkaJSIngestionConsumer } from '../kafka-queue'
import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
import { eachBatchHandlerHelper } from './each-batch-webhooks'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')

export async function eachMessageAppsOnEventHandlers(
message: KafkaMessage,
clickHouseEvent: RawClickHouseEvent,
queue: KafkaJSIngestionConsumer
): Promise<void> {
const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent

const pluginConfigs = queue.pluginsServer.pluginConfigsPerTeam.get(clickHouseEvent.team_id)
if (pluginConfigs) {
// Elements parsing can be extremely slow, so we skip it for some plugins
Expand Down Expand Up @@ -50,7 +48,14 @@ export async function eachBatchAppsOnEventHandlers(
payload: EachBatchPayload,
queue: KafkaJSIngestionConsumer
): Promise<void> {
await eachBatch(payload, queue, eachMessageAppsOnEventHandlers, groupIntoBatches, 'async_handlers_on_event')
await eachBatchHandlerHelper(
payload,
(teamId) => queue.pluginsServer.pluginConfigsPerTeam.has(teamId),
(event) => eachMessageAppsOnEventHandlers(event, queue),
queue.pluginsServer.statsd,
queue.pluginsServer.WORKER_CONCURRENCY * queue.pluginsServer.TASKS_PER_WORKER,
'on_event'
)
}

export async function eachBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
require('@sentry/tracing')

// exporting only for testing
export function groupIntoBatchesWebhooks(
export function groupIntoBatchesByUsage(
array: KafkaMessage[],
batchSize: number,
actionMatcher: ActionMatcher
shouldProcess: (teamId: number) => boolean
): { eventBatch: RawClickHouseEvent[]; lastOffset: string; lastTimestamp: string }[] {
// Most events will not trigger a webhook call, so we want to filter them out as soon as possible
// to achieve the highest effective concurrency when executing the actual HTTP calls.
Expand All @@ -32,7 +32,7 @@ export function groupIntoBatchesWebhooks(
let currentCount = 0
array.forEach((message, index) => {
const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent
if (actionMatcher.hasWebhooks(clickHouseEvent.team_id)) {
if (shouldProcess(clickHouseEvent.team_id)) {
currentBatch.push(clickHouseEvent)
currentCount++
} else {
Expand All @@ -58,18 +58,36 @@ export async function eachBatchWebhooksHandlers(
hookCannon: HookCommander,
statsd: StatsD | undefined,
concurrency: number
): Promise<void> {
await eachBatchHandlerHelper(
payload,
(teamId) => actionMatcher.hasWebhooks(teamId),
(event) => eachMessageWebhooksHandlers(event, actionMatcher, hookCannon, statsd),
statsd,
concurrency,
'webhooks'
)
}

export async function eachBatchHandlerHelper(
payload: EachBatchPayload,
shouldProcess: (teamId: number) => boolean,
eachMessageHandler: (event: RawClickHouseEvent) => Promise<void>,
statsd: StatsD | undefined,
concurrency: number,
stats_key: string
): Promise<void> {
// similar to eachBatch function in each-batch.ts, but without the dependency on the KafkaJSIngestionConsumer
// & handling the different batching return type
const key = 'async_handlers_webhooks'
const key = `async_handlers_${stats_key}`
const batchStartTimer = new Date()
const loggingKey = `each_batch_${key}`
const { batch, resolveOffset, heartbeat, commitOffsetsIfNecessary, isRunning, isStale }: EachBatchPayload = payload

const transaction = Sentry.startTransaction({ name: `eachBatchWebhooks` })
const transaction = Sentry.startTransaction({ name: `eachBatch${stats_key}` })

try {
const batchesWithOffsets = groupIntoBatchesWebhooks(batch.messages, concurrency, actionMatcher)
const batchesWithOffsets = groupIntoBatchesByUsage(batch.messages, concurrency, shouldProcess)

statsd?.histogram('ingest_event_batching.input_length', batch.messages.length, { key: key })
statsd?.histogram('ingest_event_batching.batch_count', batchesWithOffsets.length, { key: key })
Expand All @@ -88,9 +106,7 @@ export async function eachBatchWebhooksHandlers(
}

await Promise.all(
eventBatch.map((event: RawClickHouseEvent) =>
eachMessageWebhooksHandlers(event, actionMatcher, hookCannon, statsd).finally(() => heartbeat())
)
eventBatch.map((event: RawClickHouseEvent) => eachMessageHandler(event).finally(() => heartbeat()))
)

resolveOffset(lastOffset)
Expand Down
8 changes: 0 additions & 8 deletions plugin-server/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,6 @@ export function escapeClickHouseString(string: string): string {
return string.replace(/\\/g, '\\\\').replace(/'/g, "\\'")
}

export function groupIntoBatches<T>(array: T[], batchSize: number): T[][] {
const batches = []
for (let i = 0; i < array.length; i += batchSize) {
batches.push(array.slice(i, i + batchSize))
}
return batches
}

/** Standardize JS code used internally to form without extraneous indentation. Template literal function. */
export function code(strings: TemplateStringsArray): string {
const stringsConcat = strings.join('…')
Expand Down
39 changes: 6 additions & 33 deletions plugin-server/tests/main/ingestion-queues/each-batch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@ import {
eachBatchLegacyIngestion,
splitKafkaJSIngestionBatch,
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion-kafkajs'
import {
eachBatch,
eachBatchAppsOnEventHandlers,
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-onevent'
import { eachBatchAppsOnEventHandlers } from '../../../src/main/ingestion-queues/batch-processing/each-batch-onevent'
import {
eachBatchWebhooksHandlers,
groupIntoBatchesWebhooks,
groupIntoBatchesByUsage,
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-webhooks'
import {
ClickHouseTimestamp,
Expand All @@ -24,7 +21,6 @@ import {
PostIngestionEvent,
RawClickHouseEvent,
} from '../../../src/types'
import { groupIntoBatches } from '../../../src/utils/utils'
import { ActionManager } from '../../../src/worker/ingestion/action-manager'
import { ActionMatcher } from '../../../src/worker/ingestion/action-matcher'
import { HookCommander } from '../../../src/worker/ingestion/hooks'
Expand Down Expand Up @@ -150,26 +146,6 @@ describe('eachBatchX', () => {
}
})

describe('eachBatch', () => {
it('calls eachMessage with the correct arguments', async () => {
const eachMessage = jest.fn(() => Promise.resolve())
const batch = createKafkaJSBatch(event)
await eachBatch(batch, queue, eachMessage, groupIntoBatches, 'key')

expect(eachMessage).toHaveBeenCalledWith({ value: JSON.stringify(event) }, queue)
})

it('tracks metrics based on the key', async () => {
const eachMessage = jest.fn(() => Promise.resolve())
await eachBatch(createKafkaJSBatch(event), queue, eachMessage, groupIntoBatches, 'my_key')

expect(queue.pluginsServer.statsd.timing).toHaveBeenCalledWith(
'kafka_queue.each_batch_my_key',
expect.any(Date)
)
})
})

describe('eachBatchAppsOnEventHandlers', () => {
it('calls runAppsOnEventPipeline when useful', async () => {
queue.pluginsServer.pluginConfigsPerTeam.set(2, [pluginConfig39])
Expand Down Expand Up @@ -333,11 +309,9 @@ describe('eachBatchX', () => {
kafkaTimestamp: '2020-02-23 00:10:00.00' as ClickHouseTimestamp,
},
])
const actionManager = new ActionManager(queue.pluginsServer.postgres)
const actionMatcher = new ActionMatcher(queue.pluginsServer.postgres, actionManager)
// mock hasWebhooks 10 calls, 1,3,10 should return false, others true
actionMatcher.hasWebhooks = jest.fn((teamId) => teamId !== 1 && teamId !== 3 && teamId !== 10)
const result = groupIntoBatchesWebhooks(batch.batch.messages, 5, actionMatcher)
// teamIDs 1,3,10 should return false, others true
const toProcess = jest.fn((teamId) => teamId !== 1 && teamId !== 3 && teamId !== 10)
const result = groupIntoBatchesByUsage(batch.batch.messages, 5, toProcess)
expect(result).toEqual([
{
eventBatch: expect.arrayContaining([
Expand Down Expand Up @@ -375,8 +349,7 @@ describe('eachBatchX', () => {
])
// make sure that if the last message would be a new batch and if it's going to be excluded we
// still get the last batch as empty with the right offsite and timestamp
actionMatcher.hasWebhooks = jest.fn((teamId) => teamId !== 1 && teamId !== 3 && teamId !== 10)
const result2 = groupIntoBatchesWebhooks(batch.batch.messages, 7, actionMatcher)
const result2 = groupIntoBatchesByUsage(batch.batch.messages, 7, toProcess)
expect(result2).toEqual([
{
eventBatch: expect.arrayContaining([
Expand Down

0 comments on commit a75f09d

Please sign in to comment.