Skip to content

Commit

Permalink
chore: dual write exceptions in new step (#25343)
Browse files Browse the repository at this point in the history
  • Loading branch information
daibhin authored Oct 4, 2024
1 parent c222752 commit a1e9761
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 4 deletions.
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS,
} from './kafka-topics'

export const DEFAULT_HTTP_SERVER_PORT = 6738
Expand Down Expand Up @@ -110,6 +111,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '',
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON,
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC: KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS,
PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min
KAFKA_HEALTHCHECK_SECONDS: 20,
OBJECT_STORAGE_ENABLED: true,
Expand Down
4 changes: 3 additions & 1 deletion plugin-server/src/config/kafka-topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_sessi
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
// write heatmap events to ClickHouse
export const KAFKA_CLICKHOUSE_HEATMAP_EVENTS = `${prefix}clickhouse_heatmap_events${suffix}`
// write exception events to ClickHouse
export const KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS = `${prefix}exception_symbolification_events${suffix}`

// log entries for ingestion into clickhouse
// log entries for ingestion into ClickHouse
export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`

// CDP topics
Expand Down
5 changes: 3 additions & 2 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ export interface PluginsServerConfig extends CdpConfig {
CLICKHOUSE_SECURE: boolean // whether to secure ClickHouse connection
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS: boolean // whether to disallow external schemas like protobuf for clickhouse kafka engine
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: string // (advanced) a comma separated list of teams to disable clickhouse external schemas for
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events to for clickhouse ingestion
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string // (advanced) topic to send heatmap data to for clickhouse ingestion
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events for clickhouse ingestion
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string // (advanced) topic to send heatmap data for clickhouse ingestion
EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC: string // (advanced) topic to send exception event data for stack trace processing
// Redis url pretty much only used locally / self hosted
REDIS_URL: string
// Redis params for the ingestion services
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { RawClickHouseEvent } from '../../../types'
import { status } from '../../../utils/status'
import { EventPipelineRunner } from './runner'

export function produceExceptionSymbolificationEventStep(
runner: EventPipelineRunner,
event: RawClickHouseEvent
): Promise<[Promise<void>]> {
const ack = runner.hub.kafkaProducer
.produce({
topic: runner.hub.EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC,
key: event.uuid,
value: Buffer.from(JSON.stringify(event)),
waitForAck: true,
})
.catch((error) => {
status.warn('⚠️', 'Failed to produce exception event for symbolification', {
team_id: event.team_id,
uuid: event.uuid,
error,
})
throw error
})

return Promise.resolve([ack])
}
13 changes: 12 additions & 1 deletion plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { pluginsProcessEventStep } from './pluginsProcessEventStep'
import { populateTeamDataStep } from './populateTeamDataStep'
import { prepareEventStep } from './prepareEventStep'
import { processPersonsStep } from './processPersonsStep'
import { produceExceptionSymbolificationEventStep } from './produceExceptionSymbolificationEventStep'

export type EventPipelineResult = {
// Promises that the batch handler should await on before committing offsets,
Expand Down Expand Up @@ -262,8 +263,18 @@ export class EventPipelineRunner {
[this, enrichedIfErrorEvent, person, processPerson],
event.team_id
)

kafkaAcks.push(eventAck)

if (event.event === '$exception') {
const [exceptionAck] = await this.runStep(
produceExceptionSymbolificationEventStep,
[this, rawClickhouseEvent],
event.team_id
)
kafkaAcks.push(exceptionAck)
return this.registerLastStep('produceExceptionSymbolificationEventStep', [rawClickhouseEvent], kafkaAcks)
}

return this.registerLastStep('createEventStep', [rawClickhouseEvent], kafkaAcks)
}

Expand Down
48 changes: 48 additions & 0 deletions plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,54 @@ describe('EventPipelineRunner', () => {
])
})
})

describe('$exception events', () => {
let exceptionEvent: PipelineEvent
beforeEach(() => {
exceptionEvent = {
...pipelineEvent,
event: '$exception',
properties: {
...pipelineEvent.properties,
$heatmap_data: {
url1: ['data'],
url2: ['more data'],
},
},
}

// setup just enough mocks that the right pipeline runs

runner = new TestEventPipelineRunner(hub, exceptionEvent, new EventsProcessor(hub))

jest.mocked(populateTeamDataStep).mockResolvedValue(exceptionEvent as any)

const heatmapPreIngestionEvent = {
...preIngestionEvent,
event: '$exception',
properties: {
...exceptionEvent.properties,
},
}
jest.mocked(prepareEventStep).mockResolvedValue(heatmapPreIngestionEvent)
})

it('runs the expected steps for heatmap_data', async () => {
await runner.runEventPipeline(exceptionEvent)

expect(runner.steps).toEqual([
'populateTeamDataStep',
'pluginsProcessEventStep',
'normalizeEventStep',
'processPersonsStep',
'prepareEventStep',
'extractHeatmapDataStep',
'enrichExceptionEventStep',
'createEventStep',
'produceExceptionSymbolificationEventStep',
])
})
})
})
})

Expand Down
2 changes: 2 additions & 0 deletions posthog/kafka_client/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

KAFKA_CLICKHOUSE_HEATMAP_EVENTS = f"{KAFKA_PREFIX}clickhouse_heatmap_events{SUFFIX}"

KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS = f"{KAFKA_PREFIX}exception_symbolification_events{SUFFIX}"

# from capture to recordings consumer
KAFKA_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}session_recording_events{SUFFIX}"
# from capture to recordings blob ingestion consumer
Expand Down

0 comments on commit a1e9761

Please sign in to comment.