Skip to content

Commit

Permalink
feat(plugin-server): Preserve distinct ID locality on overflow rerout…
Browse files Browse the repository at this point in the history
…ing (#21358)
  • Loading branch information
tkaemming authored Apr 8, 2024
1 parent 909e7a9 commit 091e725
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 44 deletions.
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export function getDefaultConfig(): PluginsServerConfig {
TASKS_PER_WORKER: 10,
INGESTION_CONCURRENCY: 10,
INGESTION_BATCH_SIZE: 500,
INGESTION_OVERFLOW_ENABLED: false,
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false,
PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log,
LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info,
SENTRY_DSN: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Counter } from 'prom-client'
import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { isIngestionOverflowEnabled } from '../../utils/env-utils'
import { status } from '../../utils/status'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'
Expand All @@ -24,7 +23,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.
Before processing, if isIngestionOverflowEnabled and an event has
Before processing, if overflow rerouting is enabled and an event has
overflowed the capacity for its (team_id, distinct_id) pair, it will not
be processed here but instead re-produced into the
`events_plugin_ingestion_overflow` topic for later processing.
Expand All @@ -47,7 +46,11 @@ export const startAnalyticsEventsIngestionConsumer = async ({
// deployment, we require an env variable to be set to confirm this before
// enabling re-production of events to the OVERFLOW topic.

const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled
const overflowMode = hub.INGESTION_OVERFLOW_ENABLED
? hub.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY
? IngestionOverflowMode.Reroute
: IngestionOverflowMode.RerouteRandomly
: IngestionOverflowMode.Disabled

const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ require('@sentry/tracing')

export enum IngestionOverflowMode {
Disabled,
Reroute,
Reroute, // preserves partition locality
RerouteRandomly, // discards partition locality
ConsumeSplitByDistinctId,
ConsumeSplitEvenly,
}
Expand Down Expand Up @@ -217,7 +218,7 @@ export async function eachBatchParallelIngestion(
op: 'emitToOverflow',
data: { eventCount: splitBatch.toOverflow.length },
})
processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow))
processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow, overflowMode))
overflowSpan.finish()
}

Expand Down Expand Up @@ -257,14 +258,18 @@ export function computeKey(pluginEvent: PipelineEvent): string {
return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}`
}

async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]) {
async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[], overflowMode: IngestionOverflowMode) {
ingestionOverflowingMessagesTotal.inc(kafkaMessages.length)
const useRandomPartitioning = overflowMode === IngestionOverflowMode.RerouteRandomly
await Promise.all(
kafkaMessages.map((message) =>
queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: null, // No locality guarantees in overflow
// ``message.key`` should not be undefined here, but in the
// (extremely) unlikely event that it is, set it to ``null``
// instead as that behavior is safer.
key: useRandomPartitioning ? null : message.key ?? null,
headers: message.headers,
waitForAck: true,
})
Expand All @@ -286,6 +291,9 @@ export function splitIngestionBatch(
toProcess: [],
toOverflow: [],
}
const shouldRerouteToOverflow = [IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly].includes(
overflowMode
)

if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) {
/**
Expand Down Expand Up @@ -314,7 +322,7 @@ export function splitIngestionBatch(

const batches: Map<string, { message: Message; pluginEvent: PipelineEvent }[]> = new Map()
for (const message of kafkaMessages) {
if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) {
if (shouldRerouteToOverflow && message.key == null) {
// Overflow detected by capture, reroute to overflow topic
// Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers
output.toOverflow.push(message)
Expand All @@ -334,12 +342,8 @@ export function splitIngestionBatch(
}

const eventKey = computeKey(pluginEvent)
if (
overflowMode === IngestionOverflowMode.Reroute &&
!ConfiguredLimiter.consume(eventKey, 1, message.timestamp)
) {
if (shouldRerouteToOverflow && !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)) {
// Local overflow detection triggering, reroute to overflow topic too
message.key = null
ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc()
if (LoggingLimiter.consume(eventKey, 1)) {
status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`)
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ export interface PluginsServerConfig {
TASKS_PER_WORKER: number // number of parallel tasks per worker thread
INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch
INGESTION_BATCH_SIZE: number // kafka consumer batch size
INGESTION_OVERFLOW_ENABLED: boolean // whether or not overflow rerouting is enabled (only used by analytics-ingestion)
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: boolean // whether or not Kafka message keys should be preserved or discarded when messages are rerouted to overflow
TASK_TIMEOUT: number // how many seconds until tasks are timed out
DATABASE_URL: string // Postgres database URL
DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database
Expand Down
5 changes: 0 additions & 5 deletions plugin-server/src/utils/env-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Productio

export const isCloud = (): boolean => !!process.env.CLOUD_DEPLOYMENT

export function isIngestionOverflowEnabled(): boolean {
const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED
return stringToBoolean(ingestionOverflowEnabled)
}

export function isOverflowBatchByDistinctId(): boolean {
const overflowBatchByDistinctId = process.env.INGESTION_OVERFLOW_BATCH_BY_DISTINCT_ID
return stringToBoolean(overflowBatchByDistinctId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,28 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
expect(runEventPipeline).not.toHaveBeenCalled()
})

it('reroutes excess events to OVERFLOW topic', async () => {
const now = Date.now()
const event = captureEndpointEvent1
const [message] = createBatchWithMultipleEvents([event], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')

const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, [message], queue, IngestionOverflowMode.Reroute)

expect(consume).toHaveBeenCalledWith(
computeKey(event), // NOTE: can't use ``message.key`` here as it will already have been mutated
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: null,
waitForAck: true,
})

// Event is not processed here
expect(runEventPipeline).not.toHaveBeenCalled()
})
it.each([IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly])(
'reroutes excess events to OVERFLOW topic (mode=%p)',
async (overflowMode) => {
const now = Date.now()
const event = captureEndpointEvent1
const [message] = createBatchWithMultipleEvents([event], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')

const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, [message], queue, overflowMode)

expect(consume).toHaveBeenCalledWith(message.key, 1, now)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: overflowMode === IngestionOverflowMode.Reroute ? message.key : null,
waitForAck: true,
})
}
)

it('does not reroute if not over capacity limit', async () => {
const now = Date.now()
Expand Down

0 comments on commit 091e725

Please sign in to comment.