Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugin-server): Preserve distinct ID locality on overflow rerouting #21358

Merged
merged 4 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export function getDefaultConfig(): PluginsServerConfig {
TASKS_PER_WORKER: 10,
INGESTION_CONCURRENCY: 10,
INGESTION_BATCH_SIZE: 500,
INGESTION_OVERFLOW_ENABLED: false,
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false,
PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log,
LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info,
SENTRY_DSN: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Counter } from 'prom-client'
import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { isIngestionOverflowEnabled } from '../../utils/env-utils'
import { status } from '../../utils/status'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'
Expand All @@ -24,7 +23,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.

Before processing, if isIngestionOverflowEnabled and an event has
Before processing, if overflow rerouting is enabled and an event has
overflowed the capacity for its (team_id, distinct_id) pair, it will not
be processed here but instead re-produced into the
`events_plugin_ingestion_overflow` topic for later processing.
Expand All @@ -47,7 +46,11 @@ export const startAnalyticsEventsIngestionConsumer = async ({
// deployment, we require an env variable to be set to confirm this before
// enabling re-production of events to the OVERFLOW topic.

const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled
const overflowMode = hub.INGESTION_OVERFLOW_ENABLED
? hub.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY
? IngestionOverflowMode.Reroute
: IngestionOverflowMode.RerouteRandomly
: IngestionOverflowMode.Disabled

const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ require('@sentry/tracing')

export enum IngestionOverflowMode {
Disabled,
Reroute,
Reroute, // preserves partition locality
RerouteRandomly, // discards partition locality
ConsumeSplitByDistinctId,
ConsumeSplitEvenly,
}
Expand Down Expand Up @@ -217,7 +218,7 @@ export async function eachBatchParallelIngestion(
op: 'emitToOverflow',
data: { eventCount: splitBatch.toOverflow.length },
})
processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow))
processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow, overflowMode))
overflowSpan.finish()
}

Expand Down Expand Up @@ -257,14 +258,18 @@ export function computeKey(pluginEvent: PipelineEvent): string {
return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}`
}

async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]) {
async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[], overflowMode: IngestionOverflowMode) {
ingestionOverflowingMessagesTotal.inc(kafkaMessages.length)
const useRandomPartitioning = overflowMode === IngestionOverflowMode.RerouteRandomly
await Promise.all(
kafkaMessages.map((message) =>
queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: null, // No locality guarantees in overflow
// ``message.key`` should not be undefined here, but in the
// (extremely) unlikely event that it is, set it to ``null``
// instead as that behavior is safer.
key: useRandomPartitioning ? null : message.key ?? null,
headers: message.headers,
waitForAck: true,
})
Expand All @@ -286,6 +291,9 @@ export function splitIngestionBatch(
toProcess: [],
toOverflow: [],
}
const shouldRerouteToOverflow = [IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly].includes(
overflowMode
)

if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) {
/**
Expand Down Expand Up @@ -314,7 +322,7 @@ export function splitIngestionBatch(

const batches: Map<string, { message: Message; pluginEvent: PipelineEvent }[]> = new Map()
for (const message of kafkaMessages) {
if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) {
if (shouldRerouteToOverflow && message.key == null) {
// Overflow detected by capture, reroute to overflow topic
// Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers
output.toOverflow.push(message)
Expand All @@ -334,12 +342,8 @@ export function splitIngestionBatch(
}

const eventKey = computeKey(pluginEvent)
if (
overflowMode === IngestionOverflowMode.Reroute &&
!ConfiguredLimiter.consume(eventKey, 1, message.timestamp)
) {
if (shouldRerouteToOverflow && !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)) {
// Local overflow detection triggering, reroute to overflow topic too
message.key = null
ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc()
if (LoggingLimiter.consume(eventKey, 1)) {
status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`)
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ export interface PluginsServerConfig {
TASKS_PER_WORKER: number // number of parallel tasks per worker thread
INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch
INGESTION_BATCH_SIZE: number // kafka consumer batch size
INGESTION_OVERFLOW_ENABLED: boolean // whether or not overflow rerouting is enabled (only used by analytics-ingestion)
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: boolean // whether or not Kafka message keys should be preserved or discarded when messages are rerouted to overflow
TASK_TIMEOUT: number // how many seconds until tasks are timed out
DATABASE_URL: string // Postgres database URL
DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database
Expand Down
5 changes: 0 additions & 5 deletions plugin-server/src/utils/env-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Productio

export const isCloud = (): boolean => !!process.env.CLOUD_DEPLOYMENT

export function isIngestionOverflowEnabled(): boolean {
const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED
return stringToBoolean(ingestionOverflowEnabled)
}

export function isOverflowBatchByDistinctId(): boolean {
const overflowBatchByDistinctId = process.env.INGESTION_OVERFLOW_BATCH_BY_DISTINCT_ID
return stringToBoolean(overflowBatchByDistinctId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,28 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
expect(runEventPipeline).not.toHaveBeenCalled()
})

it('reroutes excess events to OVERFLOW topic', async () => {
const now = Date.now()
const event = captureEndpointEvent1
const [message] = createBatchWithMultipleEvents([event], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')

const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, [message], queue, IngestionOverflowMode.Reroute)

expect(consume).toHaveBeenCalledWith(
computeKey(event), // NOTE: can't use ``message.key`` here as it will already have been mutated
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: null,
waitForAck: true,
})

// Event is not processed here
expect(runEventPipeline).not.toHaveBeenCalled()
})
it.each([IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly])(
'reroutes excess events to OVERFLOW topic (mode=%p)',
async (overflowMode) => {
const now = Date.now()
const event = captureEndpointEvent1
const [message] = createBatchWithMultipleEvents([event], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')

const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, [message], queue, overflowMode)

expect(consume).toHaveBeenCalledWith(message.key, 1, now)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: overflowMode === IngestionOverflowMode.Reroute ? message.key : null,
waitForAck: true,
})
}
)

it('does not reroute if not over capacity limit', async () => {
const now = Date.now()
Expand Down
Loading