feat(ingestion): add DROP_EVENTS_BY_TOKEN option (#18822)
xvello authored Nov 23, 2023
1 parent d8f479a commit abb8c95
Showing 10 changed files with 221 additions and 65 deletions.
16 changes: 16 additions & 0 deletions plugin-server/src/config/config.ts
@@ -132,6 +132,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CLOUD_DEPLOYMENT: null,
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
POE_EMBRACE_JOIN_FOR_TEAMS: '',
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,

@@ -245,3 +246,18 @@ export function buildIntegerMatcher(config: string | undefined, allowStar: boole
}
}
}

export function buildStringMatcher(config: string | undefined, allowStar: boolean): ValueMatcher<string> {
// Builds a ValueMatcher on a comma-separated list of values.
// Optionally, supports a '*' value to match everything
if (!config || config.trim().length == 0) {
return () => false
} else if (allowStar && config === '*') {
return () => true
} else {
const values = new Set(config.split(','))
return (v: string) => {
return values.has(v)
}
}
}
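
As a rough sketch of how the new matcher behaves (illustrative only; the token values below are invented, not taken from the commit):

// Sketch only: exercising buildStringMatcher with made-up tokens.
const blockList = buildStringMatcher('phc_blocked_a,phc_blocked_b', false)
blockList('phc_blocked_a') // true -> events carrying this token get dropped
blockList('phc_other') // false -> events carrying this token pass through
const matchAll = buildStringMatcher('*', true)
matchAll('any_token') // true -> '*' matches every value when allowStar is true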
@@ -1,6 +1,7 @@
import { Message } from 'node-rdkafka'
import { Counter } from 'prom-client'

import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { isIngestionOverflowEnabled } from '../../utils/env-utils'
@@ -47,8 +48,10 @@ export const startAnalyticsEventsIngestionConsumer = async ({
// enabling re-production of events to the OVERFLOW topic.

const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled

const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
await eachBatchParallelIngestion(messages, queue, overflowMode)
await eachBatchParallelIngestion(tokenBlockList, messages, queue, overflowMode)
}

const queue = new IngestionConsumer(
@@ -1,5 +1,6 @@
import { Message } from 'node-rdkafka'

import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { status } from '../../utils/status'
@@ -24,8 +25,9 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({
We don't want to move events to overflow from here, it's fine for the processing to
take longer, but we want the locality constraints to be respected like normal ingestion.
*/
const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
await eachBatchParallelIngestion(messages, queue, IngestionOverflowMode.Disabled)
await eachBatchParallelIngestion(tokenBlockList, messages, queue, IngestionOverflowMode.Disabled)
}

const queue = new IngestionConsumer(
@@ -1,5 +1,6 @@
import { Message } from 'node-rdkafka'

import { buildStringMatcher } from '../../config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { status } from '../../utils/status'
@@ -29,9 +30,9 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({
// workloads ran on the same process they would share the same consumer
// group id. In these cases, updating to this version will result in the
// re-exporting of events still in Kafka `clickhouse_events_json` topic.

const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
await eachBatchParallelIngestion(messages, queue, IngestionOverflowMode.Consume)
await eachBatchParallelIngestion(tokenBlockList, messages, queue, IngestionOverflowMode.Consume)
}

const queue = new IngestionConsumer(
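
The wiring is identical in each of the three ingestion consumers above: read DROP_EVENTS_BY_TOKEN from the hub once at startup, build the matcher, and close over it in the batch handler. Roughly (an illustrative sketch with an invented token list, not code from the commit):

// Sketch only: deriving the block list from config and threading it into the batch handler.
const config = { ...getDefaultConfig(), DROP_EVENTS_BY_TOKEN: 'phc_blocked_a,phc_blocked_b' }
const tokenBlockList = buildStringMatcher(config.DROP_EVENTS_BY_TOKEN, false)
// Each consumer then passes tokenBlockList as the new first argument of eachBatchParallelIngestion.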
@@ -2,7 +2,7 @@ import * as Sentry from '@sentry/node'
import { Message, MessageHeader } from 'node-rdkafka'

import { KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../config/kafka-topics'
import { Hub, PipelineEvent } from '../../../types'
import { Hub, PipelineEvent, ValueMatcher } from '../../../types'
import { formPipelineEvent } from '../../../utils/event'
import { retryIfRetriable } from '../../../utils/retries'
import { status } from '../../../utils/status'
@@ -11,7 +11,7 @@ import { EventPipelineResult, runEventPipeline } from '../../../worker/ingestion
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer'
import { IngestionConsumer } from '../kafka-queue'
import { latestOffsetTimestampGauge } from '../metrics'
import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
import {
ingestionOverflowingMessagesTotal,
ingestionParallelism,
@@ -95,6 +95,7 @@ async function handleProcessingError(
}

export async function eachBatchParallelIngestion(
tokenBlockList: ValueMatcher<string>,
messages: Message[],
queue: IngestionConsumer,
overflowMode: IngestionOverflowMode
@@ -112,7 +113,7 @@ export async function eachBatchParallelIngestion(
* and a separate array for single messages, but let's look at profiles before optimizing.
*/
const prepareSpan = transaction.startChild({ op: 'prepareBatch' })
const splitBatch = splitIngestionBatch(messages, overflowMode)
const splitBatch = splitIngestionBatch(tokenBlockList, messages, overflowMode)
splitBatch.toProcess.sort((a, b) => a.length - b.length)

queue.pluginsServer.statsd?.histogram('ingest_event_batching.input_length', messages.length, {
@@ -280,6 +281,7 @@ async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]
}

export function splitIngestionBatch(
tokenBlockList: ValueMatcher<string>,
kafkaMessages: Message[],
overflowMode: IngestionOverflowMode
): IngestionSplitBatch {
@@ -300,18 +302,44 @@ export function splitIngestionBatch(
* so we just return batches of one to increase concurrency.
* TODO: add a PipelineEvent[] field to IngestionSplitBatch for batches of 1
*/
output.toProcess = kafkaMessages.map((m) => new Array({ message: m, pluginEvent: formPipelineEvent(m) }))
for (const message of kafkaMessages) {
// Drop based on a token blocklist
const pluginEvent = formPipelineEvent(message)
if (pluginEvent.token && tokenBlockList(pluginEvent.token)) {
eventDroppedCounter
.labels({
event_type: 'analytics',
drop_cause: 'blocked_token',
})
.inc()
continue
}
output.toProcess.push(new Array({ message: message, pluginEvent }))
}
return output
}

const batches: Map<string, { message: Message; pluginEvent: PipelineEvent }[]> = new Map()
for (const message of kafkaMessages) {
if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) {
// Overflow detected by capture, reroute to overflow topic
// Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers
output.toOverflow.push(message)
continue
}
const pluginEvent = formPipelineEvent(message)

// Drop based on a token blocklist
if (pluginEvent.token && tokenBlockList(pluginEvent.token)) {
eventDroppedCounter
.labels({
event_type: 'analytics',
drop_cause: 'blocked_token',
})
.inc()
continue
}

const eventKey = computeKey(pluginEvent)
if (
overflowMode === IngestionOverflowMode.Reroute &&
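
Stripped of batching and overflow handling, the drop logic added to splitIngestionBatch reduces to filtering events by token before they are queued for processing. A simplified, self-contained sketch of that core behaviour (types and names invented for illustration; the real code also increments the eventDroppedCounter metric):

// Sketch only: the essence of the token blocklist filter, with simplified types.
type ValueMatcher<T> = (value: T) => boolean

interface SketchEvent {
    token?: string
    event: string
}

function dropBlockedTokens(events: SketchEvent[], tokenBlockList: ValueMatcher<string>): SketchEvent[] {
    const kept: SketchEvent[] = []
    for (const e of events) {
        if (e.token && tokenBlockList(e.token)) {
            continue // the real implementation increments eventDroppedCounter here before skipping
        }
        kept.push(e)
    }
    return kept
}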
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
@@ -205,6 +205,7 @@ export interface PluginsServerConfig {
CLOUD_DEPLOYMENT: string | null
EXTERNAL_REQUEST_TIMEOUT_MS: number
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
DROP_EVENTS_BY_TOKEN: string
POE_EMBRACE_JOIN_FOR_TEAMS: string
RELOAD_PLUGIN_JITTER_MAX_MS: number

29 changes: 28 additions & 1 deletion plugin-server/tests/config.test.ts
@@ -1,4 +1,4 @@
import { buildIntegerMatcher, getDefaultConfig, overrideWithEnv } from '../src/config/config'
import { buildIntegerMatcher, buildStringMatcher, getDefaultConfig, overrideWithEnv } from '../src/config/config'

describe('config', () => {
test('overrideWithEnv 1', () => {
@@ -91,3 +91,30 @@ describe('buildIntegerMatcher', () => {
expect(matcher(5)).toBe(false)
})
})

describe('buildStringMatcher', () => {
test('empty input', () => {
const matcher = buildStringMatcher('', false)
expect(matcher('b')).toBe(false)
})
test('ignores star when not allowed', () => {
const matcher = buildStringMatcher('*', false)
expect(matcher('b')).toBe(false)
})
test('matches star when allowed', () => {
const matcher = buildStringMatcher('*', true)
expect(matcher('b')).toBe(true)
})
test('can match on a single value', () => {
const matcher = buildStringMatcher('b', true)
expect(matcher('b')).toBe(true)
expect(matcher('a')).toBe(false)
})
test('can match on several values', () => {
const matcher = buildStringMatcher('b,c,d', true)
expect(matcher('b')).toBe(true)
expect(matcher('c')).toBe(true)
expect(matcher('d')).toBe(true)
expect(matcher('e')).toBe(false)
})
})
@@ -1,3 +1,4 @@
import { buildStringMatcher } from '../../../src/config/config'
import { KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../src/config/kafka-topics'
import {
eachBatchParallelIngestion,
@@ -14,7 +15,7 @@ jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
runEventPipeline: jest.fn().mockResolvedValue('default value'),
}))

const captureEndpointEvent = {
const captureEndpointEvent1 = {
uuid: 'uuid1',
distinct_id: 'id',
ip: null,
@@ -23,7 +24,21 @@ const captureEndpointEvent = {
event: 'event',
properties: {},
}),
team_id: 1,
token: 'mytoken',
now: null,
sent_at: null,
}

const captureEndpointEvent2 = {
uuid: 'uuid2',
distinct_id: 'id',
ip: null,
site_url: '',
data: JSON.stringify({
event: 'event',
properties: {},
}),
token: 'othertoken',
now: null,
sent_at: null,
}
@@ -67,24 +82,26 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
{
partition: 0,
topic: KAFKA_EVENTS_PLUGIN_INGESTION,
value: JSON.stringify(captureEndpointEvent),
timestamp: captureEndpointEvent['timestamp'],
offset: captureEndpointEvent['offset'],
value: JSON.stringify(captureEndpointEvent1),
timestamp: captureEndpointEvent1['timestamp'],
offset: captureEndpointEvent1['offset'],
key: null,
token: 'ok',
},
]

const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)

await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute)
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

expect(consume).not.toHaveBeenCalled()
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: JSON.stringify(captureEndpointEvent),
timestamp: captureEndpointEvent['timestamp'],
offset: captureEndpointEvent['offset'],
value: JSON.stringify(captureEndpointEvent1),
timestamp: captureEndpointEvent1['timestamp'],
offset: captureEndpointEvent1['offset'],
key: null,
waitForAck: true,
})
@@ -95,22 +112,23 @@

it('reroutes excess events to OVERFLOW topic', async () => {
const now = Date.now()
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)

await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute)
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

expect(consume).toHaveBeenCalledWith(
captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'],
captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: JSON.stringify(captureEndpointEvent),
timestamp: captureEndpointEvent['timestamp'],
offset: captureEndpointEvent['offset'],
value: JSON.stringify(captureEndpointEvent1),
timestamp: captureEndpointEvent1['timestamp'],
offset: captureEndpointEvent1['offset'],
key: null,
waitForAck: true,
})
@@ -121,19 +139,47 @@

it('does not reroute if not over capacity limit', async () => {
const now = Date.now()
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1, captureEndpointEvent2], now)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)

await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute)
const tokenBlockList = buildStringMatcher('another_token,more_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

expect(consume).toHaveBeenCalledWith(
captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'],
captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
1,
now
)
expect(consume).toHaveBeenCalledWith(
captureEndpointEvent2['token'] + ':' + captureEndpointEvent2['distinct_id'],
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled()
// Event is processed
expect(runEventPipeline).toHaveBeenCalled()
expect(runEventPipeline).toHaveBeenCalledTimes(2)
})

it('does drop events from blocked tokens', async () => {
const now = Date.now()
const batch = createBatchWithMultipleEventsWithKeys(
[captureEndpointEvent1, captureEndpointEvent2, captureEndpointEvent1],
now
)
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)

const tokenBlockList = buildStringMatcher('mytoken,another_token', false)
await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)

// Event captureEndpointEvent1 is dropped, captureEndpointEvent2 goes through
expect(consume).toHaveBeenCalledWith(
captureEndpointEvent2['token'] + ':' + captureEndpointEvent2['distinct_id'],
1,
now
)
expect(captureIngestionWarning).not.toHaveBeenCalled()
expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled()
expect(runEventPipeline).toHaveBeenCalledTimes(1)
})
})
