Skip to content

Commit

Permalink
feat: Log ingest warning on messages that are too large (#18318)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming authored Nov 3, 2023
1 parent 0d8c8fa commit 9ade506
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const WARNING_TYPE_TO_DESCRIPTION = {
ignored_invalid_timestamp: 'Ignored an invalid timestamp, event was still ingested',
event_timestamp_in_future: 'An event was sent more than 23 hours in the future',
ingestion_capacity_overflow: 'Event ingestion has overflowed capacity',
message_size_too_large: 'Discarded event exceeding 1MB limit',
}

const WARNING_TYPE_RENDERER = {
Expand Down Expand Up @@ -126,6 +127,19 @@ const WARNING_TYPE_RENDERER = {
</>
)
},
message_size_too_large: function Render(warning: IngestionWarning): JSX.Element {
    // Details attached to this warning type: the uuid of the discarded event and the
    // distinct_id it was sent for.
    const { distinctId, eventUuid } = warning.details as {
        eventUuid: string
        distinctId: string
    }
    const personUrl = urls.personByDistinctId(distinctId)
    return (
        <>
            Discarded event for distinct_id{' '}
            <Link to={personUrl}>{distinctId}</Link> that exceeded 1MB in
            size after processing (event uuid: <code>{eventUuid}</code>)
        </>
    )
},
}

export function IngestionWarningsView(): JSX.Element {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@ import { waitForExpect } from '../expectations'
let kafka: Kafka
let organizationId: string

let dlq: KafkaMessage[]
let dlqConsumer: Consumer
let warningMessages: KafkaMessage[]
let warningConsumer: Consumer

beforeAll(async () => {
kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })

// Make sure the dlq topic exists before starting the consumer
// Make sure the ingest warnings topic exists before starting the consumer
const admin = kafka.admin()
await admin.createTopics({ topics: [{ topic: 'events_plugin_ingestion_dlq' }] })
const topic = 'clickhouse_ingestion_warnings' // note: functional tests don't use _test suffix as in config
await admin.createTopics({ topics: [{ topic: topic }] })
await admin.disconnect()

dlq = []
dlqConsumer = kafka.consumer({ groupId: 'events_plugin_ingestion_test' })
await dlqConsumer.subscribe({ topic: 'events_plugin_ingestion_dlq', fromBeginning: true })
await dlqConsumer.run({
warningMessages = []
warningConsumer = kafka.consumer({ groupId: 'events_plugin_ingestion_test' })
await warningConsumer.subscribe({ topic: topic, fromBeginning: true })
await warningConsumer.run({
eachMessage: ({ message }) => {
dlq.push(message)
warningMessages.push(message)
return Promise.resolve()
},
})
Expand All @@ -33,19 +34,19 @@ beforeAll(async () => {
})

afterAll(async () => {
await dlqConsumer.disconnect()
await warningConsumer.disconnect()
})

test.concurrent('consumer handles messages just over 1MB gracefully', async () => {
test.concurrent('consumer produces ingest warnings for messages over 1MB', async () => {
// For this we basically want the plugin-server to try and produce a new
// message larger than 1MB. We do this by creating a person with a lot of
// properties. We will end up denormalizing the person properties onto the
// event, which already has the properties as $set, resulting in a message
// that's larger than 1MB. There may also be other attributes added to the
// event that push it over the limit.
//
// We verify that at least some error has happened by checking that there is
// a message in the DLQ.
// We verify that this is handled by checking that there is a message in the
// appropriate topic.
const token = new UUIDT().toString()
const teamId = await createTeam(organizationId, undefined, token)
const distinctId = new UUIDT().toString()
Expand All @@ -68,12 +69,16 @@ test.concurrent('consumer handles messages just over 1MB gracefully', async () =
properties: personProperties,
})

// Verify we have a message in the DLQ, along a Sentry event id in the
// header `sentry-event-id`.
const message = await waitForExpect(() => {
const [message] = dlq.filter((message) => message.headers?.['event-id']?.toString() === personEventUuid)
// Verify we have a message corresponding to the input event.
await waitForExpect(() => {
const [message] = warningMessages.filter((message: KafkaMessage) => {
if (message.value) {
const payload = JSON.parse(message.value.toString())
const details = JSON.parse(payload.details)
return details.eventUuid === personEventUuid && details.distinctId === distinctId
}
})
expect(message).toBeDefined()
return message
})
expect(message.headers?.['sentry-event-id']).toBeDefined()
})
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Hub, PipelineEvent } from '../../../types'
import { formPipelineEvent } from '../../../utils/event'
import { retryIfRetriable } from '../../../utils/retries'
import { status } from '../../../utils/status'
import { ConfiguredLimiter, LoggingLimiter, WarningLimiter } from '../../../utils/token-bucket'
import { ConfiguredLimiter, LoggingLimiter, OverflowWarningLimiter } from '../../../utils/token-bucket'
import { EventPipelineResult, runEventPipeline } from '../../../worker/ingestion/event-pipeline/runner'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-consumer'
Expand Down Expand Up @@ -139,7 +139,7 @@ export async function eachBatchParallelIngestion(
if (overflowMode == IngestionOverflowMode.Consume && currentBatch.length > 0) {
const team = await queue.pluginsServer.teamManager.getTeamForEvent(currentBatch[0].pluginEvent)
const distinct_id = currentBatch[0].pluginEvent.distinct_id
if (team && WarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) {
if (team && OverflowWarningLimiter.consume(`${team.id}:${distinct_id}`, 1)) {
processingPromises.push(
captureIngestionWarning(queue.pluginsServer.db, team.id, 'ingestion_capacity_overflow', {
overflowDistinctId: distinct_id,
Expand Down
4 changes: 3 additions & 1 deletion plugin-server/src/utils/token-bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ export const ConfiguredLimiter: Limiter = new Limiter(
defaultConfig.EVENT_OVERFLOW_BUCKET_REPLENISH_RATE
)

export const WarningLimiter: Limiter = new Limiter(1, 1.0 / 3600)
// Rate limits `ingestion_capacity_overflow` ingestion warnings; consumed with a
// `${team.id}:${distinct_id}` key when consuming from overflow.
// NOTE(review): the arguments look like (bucket capacity, replenish rate) — presumably
// tokens per second, i.e. roughly one warning per hour per key. Confirm against Limiter.
export const OverflowWarningLimiter: Limiter = new Limiter(1, 1.0 / 3600)

// Rate limits `message_size_too_large` ingestion warnings; consumed with the team id as key.
// NOTE(review): presumably roughly one warning per five minutes per team — confirm units.
export const MessageSizeTooLargeWarningLimiter: Limiter = new Limiter(1, 1.0 / 300)

// NOTE(review): presumably allows roughly one log line per minute per key — confirm units.
export const LoggingLimiter: Limiter = new Limiter(1, 1.0 / 60)
29 changes: 23 additions & 6 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import {
} from '../../types'
import { DB, GroupId } from '../../utils/db/db'
import { elementsToString, extractElements } from '../../utils/db/elements-chain'
import { MessageSizeTooLarge } from '../../utils/db/error'
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { safeClickhouseString, sanitizeEventName, timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
import { MessageSizeTooLargeWarningLimiter } from '../../utils/token-bucket'
import { castTimestampOrNow, UUID } from '../../utils/utils'
import { GroupTypeManager } from './group-type-manager'
import { addGroupProperties } from './groups'
Expand Down Expand Up @@ -225,12 +227,27 @@ export class EventsProcessor {
...groupsColumns,
}

const ack = this.kafkaProducer.produce({
topic: this.pluginsServer.CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC,
key: uuid,
value: Buffer.from(JSON.stringify(rawEvent)),
waitForAck: true,
})
// Produce the serialized event to the ClickHouse JSON events topic. The resulting
// promise is not awaited here; it is returned to the caller alongside the raw event.
const ack = this.kafkaProducer
.produce({
topic: this.pluginsServer.CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC,
key: uuid,
value: Buffer.from(JSON.stringify(rawEvent)),
waitForAck: true,
})
.catch(async (error) => {
// Some messages end up significantly larger than the original
// after plugin processing, person & group enrichment, etc.
if (error instanceof MessageSizeTooLarge) {
// The size error is not rethrown, so `ack` resolves instead of rejecting.
// A `message_size_too_large` ingestion warning is recorded in its place,
// rate limited per team by MessageSizeTooLargeWarningLimiter.
if (MessageSizeTooLargeWarningLimiter.consume(`${teamId}`, 1)) {
await captureIngestionWarning(this.db, teamId, 'message_size_too_large', {
eventUuid: uuid,
distinctId: distinctId,
})
}
} else {
// Any other produce failure still rejects `ack` for whoever awaits it.
throw error
}
})

return [rawEvent, ack]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {
eachBatchParallelIngestion,
IngestionOverflowMode,
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
import { WarningLimiter } from '../../../src/utils/token-bucket'
import { OverflowWarningLimiter } from '../../../src/utils/token-bucket'
import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'

jest.mock('../../../src/utils/status')
Expand Down Expand Up @@ -62,7 +62,7 @@ describe('eachBatchParallelIngestion with overflow consume', () => {

it('raises ingestion warning when consuming from overflow', async () => {
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent])
const consume = jest.spyOn(WarningLimiter, 'consume').mockImplementation(() => true)
const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => true)

queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 })
await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Consume)
Expand All @@ -87,7 +87,7 @@ describe('eachBatchParallelIngestion with overflow consume', () => {

it('does not raise ingestion warning when under threshold', async () => {
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent])
const consume = jest.spyOn(WarningLimiter, 'consume').mockImplementation(() => false)
const consume = jest.spyOn(OverflowWarningLimiter, 'consume').mockImplementation(() => false)

queue.pluginsServer.teamManager.getTeamForEvent.mockResolvedValueOnce({ id: 1 })
await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Consume)
Expand Down

0 comments on commit 9ade506

Please sign in to comment.