Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugin-server): Preserve distinct ID locality on overflow rerouting #20945

Merged
merged 6 commits into from
Apr 2, 2024
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export function getDefaultConfig(): PluginsServerConfig {
TASKS_PER_WORKER: 10,
INGESTION_CONCURRENCY: 10,
INGESTION_BATCH_SIZE: 500,
INGESTION_OVERFLOW_ENABLED: false,
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false,
PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log,
LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info,
SENTRY_DSN: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Counter } from 'prom-client'
import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { isIngestionOverflowEnabled } from '../../utils/env-utils'
import { status } from '../../utils/status'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'
Expand All @@ -24,7 +23,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.

Before processing, if isIngestionOverflowEnabled and an event has
Before processing, if overflow rerouting is enabled and an event has
overflowed the capacity for its (team_id, distinct_id) pair, it will not
be processed here but instead re-produced into the
`events_plugin_ingestion_overflow` topic for later processing.
Expand All @@ -47,7 +46,11 @@ export const startAnalyticsEventsIngestionConsumer = async ({
// deployment, we require an env variable to be set to confirm this before
// enabling re-production of events to the OVERFLOW topic.

const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled
const overflowMode = hub.INGESTION_OVERFLOW_ENABLED
? hub.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY
? IngestionOverflowMode.Reroute
: IngestionOverflowMode.RerouteRandomly
: IngestionOverflowMode.Disabled

const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ require('@sentry/tracing')

export enum IngestionOverflowMode {
Disabled,
Reroute,
Reroute, // preserves partition locality
RerouteRandomly, // discards partition locality
ConsumeSplitByDistinctId,
ConsumeSplitEvenly,
}
Expand Down Expand Up @@ -217,7 +218,9 @@ export async function eachBatchParallelIngestion(
op: 'emitToOverflow',
data: { eventCount: splitBatch.toOverflow.length },
})
processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow))
processingPromises.push(
emitToOverflow(queue, splitBatch.toOverflow, overflowMode === IngestionOverflowMode.RerouteRandomly)
)
overflowSpan.finish()
}

Expand Down Expand Up @@ -257,14 +260,14 @@ function computeKey(pluginEvent: PipelineEvent): string {
return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}`
}

async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]) {
async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[], useRandomPartitioner: boolean) {
ingestionOverflowingMessagesTotal.inc(kafkaMessages.length)
await Promise.all(
kafkaMessages.map((message) =>
queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: null, // No locality guarantees in overflow
key: useRandomPartitioner ? undefined : message.key,
Copy link
Contributor

@tiina303 tiina303 Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a change here where we set it to undefined instead of null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, this was the issue.

The produce call here eventually forwards to a HighLevelProducer instance, which does some extra stuff with the key in produce. If the key ends up being undefined it just quietly doesn't produce the message, and the callback is never called.

This is also the behavior that I saw testing this manually in the node REPL:

> const { HighLevelProducer } = require('node-rdkafka')
undefined
> const p = new HighLevelProducer({'bootstrap.servers': 'kafka:9092'})
undefined
> p.connect()
// truncated
> > p.produce('garbage-topic', null, Buffer.from('message'), 'key', undefined, (...a) => { console.log('callback:', a) })
undefined
> callback: [ null, 1 ]
> p.produce('garbage-topic', null, Buffer.from('message'), undefined, undefined, (...a) => { console.log('callback:', a) })
undefined
// nothing ever happens here

What I figure happened is that any message that should have been routed to overflow never resolved it's promise, and the consumers simply stopped making forward progress once they saw a batch containing one of those messages.

The key property is typed as MessageKey, which does include undefined, and the HighLevelProducer.produce signature accepts any, which explains why this wasn't caught by the type checker.

headers: message.headers,
waitForAck: true,
})
Expand All @@ -286,6 +289,9 @@ export function splitIngestionBatch(
toProcess: [],
toOverflow: [],
}
const shouldRerouteToOverflow = [IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly].includes(
overflowMode
)

if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) {
/**
Expand Down Expand Up @@ -314,7 +320,7 @@ export function splitIngestionBatch(

const batches: Map<string, { message: Message; pluginEvent: PipelineEvent }[]> = new Map()
for (const message of kafkaMessages) {
if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) {
if (shouldRerouteToOverflow && message.key == null) {
// Overflow detected by capture, reroute to overflow topic
// Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers
output.toOverflow.push(message)
Expand All @@ -334,12 +340,8 @@ export function splitIngestionBatch(
}

const eventKey = computeKey(pluginEvent)
if (
overflowMode === IngestionOverflowMode.Reroute &&
!ConfiguredLimiter.consume(eventKey, 1, message.timestamp)
) {
if (shouldRerouteToOverflow && !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)) {
// Local overflow detection triggering, reroute to overflow topic too
message.key = null
ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc()
if (LoggingLimiter.consume(eventKey, 1)) {
status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`)
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ export interface PluginsServerConfig {
TASKS_PER_WORKER: number // number of parallel tasks per worker thread
INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch
INGESTION_BATCH_SIZE: number // kafka consumer batch size
INGESTION_OVERFLOW_ENABLED: boolean // whether or not overflow rerouting is enabled (only used by analytics-ingestion)
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: boolean // whether or not Kafka message keys should be preserved or discarded when messages are rerouted to overflow
TASK_TIMEOUT: number // how many seconds until tasks are timed out
DATABASE_URL: string // Postgres database URL
DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database
Expand Down
5 changes: 0 additions & 5 deletions plugin-server/src/utils/env-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Productio

export const isCloud = (): boolean => !!process.env.CLOUD_DEPLOYMENT

export function isIngestionOverflowEnabled(): boolean {
const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED
return stringToBoolean(ingestionOverflowEnabled)
}

export function isOverflowBatchByDistinctId(): boolean {
const overflowBatchByDistinctId = process.env.INGESTION_OVERFLOW_BATCH_BY_DISTINCT_ID
return stringToBoolean(overflowBatchByDistinctId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,32 +107,35 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
expect(runEventPipeline).not.toHaveBeenCalled()
})

it('reroutes excess events to OVERFLOW topic', async () => {
const now = Date.now()
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)

const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

expect(consume).toHaveBeenCalledWith(
captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: JSON.stringify(captureEndpointEvent1),
timestamp: captureEndpointEvent1['timestamp'],
offset: captureEndpointEvent1['offset'],
key: null,
waitForAck: true,
})
it.each([IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly])(
'reroutes excess events to OVERFLOW topic (mode=%p)',
async (overflowMode) => {
const now = Date.now()
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)

const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, overflowMode)

expect(consume).toHaveBeenCalledWith(
captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: JSON.stringify(captureEndpointEvent1),
timestamp: captureEndpointEvent1['timestamp'],
offset: captureEndpointEvent1['offset'],
key: overflowMode === IngestionOverflowMode.Reroute ? batch[0].key : undefined,
waitForAck: true,
})

// Event is not processed here
expect(runEventPipeline).not.toHaveBeenCalled()
})
// Event is not processed here
expect(runEventPipeline).not.toHaveBeenCalled()
}
)

it('does not reroute if not over capacity limit', async () => {
const now = Date.now()
Expand Down
Loading