From a1e9761b3ad18ea78fe2f7589aba485b59b2a648 Mon Sep 17 00:00:00 2001 From: David Newell Date: Fri, 4 Oct 2024 15:09:04 +0100 Subject: [PATCH] chore: dual write exceptions in new step (#25343) --- plugin-server/src/config/config.ts | 2 + plugin-server/src/config/kafka-topics.ts | 4 +- plugin-server/src/types.ts | 5 +- ...roduceExceptionSymbolificationEventStep.ts | 26 ++++++++++ .../worker/ingestion/event-pipeline/runner.ts | 13 ++++- .../ingestion/event-pipeline/runner.test.ts | 48 +++++++++++++++++++ posthog/kafka_client/topics.py | 2 + 7 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 plugin-server/src/worker/ingestion/event-pipeline/produceExceptionSymbolificationEventStep.ts diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index c87b83fea6a0c..8d8b98ca3f5d9 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -6,6 +6,7 @@ import { KAFKA_EVENTS_JSON, KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, + KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS, } from './kafka-topics' export const DEFAULT_HTTP_SERVER_PORT = 6738 @@ -110,6 +111,7 @@ export function getDefaultConfig(): PluginsServerConfig { CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '', CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON, CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: KAFKA_CLICKHOUSE_HEATMAP_EVENTS, + EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC: KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS, PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min KAFKA_HEALTHCHECK_SECONDS: 20, OBJECT_STORAGE_ENABLED: true, diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts index 5ff55c487524a..d7ea527904477 100644 --- a/plugin-server/src/config/kafka-topics.ts +++ b/plugin-server/src/config/kafka-topics.ts @@ -38,8 +38,10 @@ export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_sessi export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}` // 
write heatmap events to ClickHouse export const KAFKA_CLICKHOUSE_HEATMAP_EVENTS = `${prefix}clickhouse_heatmap_events${suffix}` +// write exception events to be symbolified (consumed by the stack trace processor) +export const KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS = `${prefix}exception_symbolification_events${suffix}` -// log entries for ingestion into clickhouse +// log entries for ingestion into ClickHouse export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}` // CDP topics diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 71bf199ea2d4f..ead2b392739c3 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -151,8 +151,9 @@ export interface PluginsServerConfig extends CdpConfig { CLICKHOUSE_SECURE: boolean // whether to secure ClickHouse connection CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS: boolean // whether to disallow external schemas like protobuf for clickhouse kafka engine CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: string // (advanced) a comma separated list of teams to disable clickhouse external schemas for - CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events to for clickhouse ingestion - CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string // (advanced) topic to send heatmap data to for clickhouse ingestion + CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events for clickhouse ingestion + CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string // (advanced) topic to send heatmap data for clickhouse ingestion + EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC: string // (advanced) topic to send exception event data for stack trace processing // Redis url pretty much only used locally / self hosted REDIS_URL: string // Redis params for the ingestion services diff --git a/plugin-server/src/worker/ingestion/event-pipeline/produceExceptionSymbolificationEventStep.ts new file mode 100644 index 0000000000000..91173639a3c0f --- /dev/null +++ 
b/plugin-server/src/worker/ingestion/event-pipeline/produceExceptionSymbolificationEventStep.ts @@ -0,0 +1,26 @@ +import { RawClickHouseEvent } from '../../../types' +import { status } from '../../../utils/status' +import { EventPipelineRunner } from './runner' + +export function produceExceptionSymbolificationEventStep( + runner: EventPipelineRunner, + event: RawClickHouseEvent +): Promise<[Promise<void>]> { + const ack = runner.hub.kafkaProducer + .produce({ + topic: runner.hub.EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC, + key: event.uuid, + value: Buffer.from(JSON.stringify(event)), + waitForAck: true, + }) + .catch((error) => { + status.warn('⚠️', 'Failed to produce exception event for symbolification', { + team_id: event.team_id, + uuid: event.uuid, + error, + }) + throw error + }) + + return Promise.resolve([ack]) +} diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 4851d866a4015..f165deadbc337 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -26,6 +26,7 @@ import { pluginsProcessEventStep } from './pluginsProcessEventStep' import { populateTeamDataStep } from './populateTeamDataStep' import { prepareEventStep } from './prepareEventStep' import { processPersonsStep } from './processPersonsStep' +import { produceExceptionSymbolificationEventStep } from './produceExceptionSymbolificationEventStep' export type EventPipelineResult = { // Promises that the batch handler should await on before committing offsets, @@ -262,8 +263,18 @@ export class EventPipelineRunner { [this, enrichedIfErrorEvent, person, processPerson], event.team_id ) - kafkaAcks.push(eventAck) + + if (event.event === '$exception') { + const [exceptionAck] = await this.runStep( + produceExceptionSymbolificationEventStep, + [this, rawClickhouseEvent], + event.team_id + ) + kafkaAcks.push(exceptionAck) + return 
this.registerLastStep('produceExceptionSymbolificationEventStep', [rawClickhouseEvent], kafkaAcks) + } + return this.registerLastStep('createEventStep', [rawClickhouseEvent], kafkaAcks) } diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index ef9864ccb6b1a..2aad4445410f8 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -335,6 +335,54 @@ describe('EventPipelineRunner', () => { ]) }) }) + + describe('$exception events', () => { + let exceptionEvent: PipelineEvent + beforeEach(() => { + exceptionEvent = { + ...pipelineEvent, + event: '$exception', + properties: { + ...pipelineEvent.properties, + $heatmap_data: { + url1: ['data'], + url2: ['more data'], + }, + }, + } + + // setup just enough mocks that the right pipeline runs + + runner = new TestEventPipelineRunner(hub, exceptionEvent, new EventsProcessor(hub)) + + jest.mocked(populateTeamDataStep).mockResolvedValue(exceptionEvent as any) + + const exceptionPreIngestionEvent = { + ...preIngestionEvent, + event: '$exception', + properties: { + ...exceptionEvent.properties, + }, + } + jest.mocked(prepareEventStep).mockResolvedValue(exceptionPreIngestionEvent) + }) + + it('runs the expected steps for $exception events', async () => { + await runner.runEventPipeline(exceptionEvent) + + expect(runner.steps).toEqual([ + 'populateTeamDataStep', + 'pluginsProcessEventStep', + 'normalizeEventStep', + 'processPersonsStep', + 'prepareEventStep', + 'extractHeatmapDataStep', + 'enrichExceptionEventStep', + 'createEventStep', + 'produceExceptionSymbolificationEventStep', + ]) + }) + }) }) }) diff --git a/posthog/kafka_client/topics.py b/posthog/kafka_client/topics.py index 20e83f218169f..227ca18d666fc 100644 --- a/posthog/kafka_client/topics.py +++ b/posthog/kafka_client/topics.py @@ -23,6 +23,8 @@ KAFKA_CLICKHOUSE_HEATMAP_EVENTS = 
f"{KAFKA_PREFIX}clickhouse_heatmap_events{SUFFIX}" +KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS = f"{KAFKA_PREFIX}exception_symbolification_events{SUFFIX}" + # from capture to recordings consumer KAFKA_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}session_recording_events{SUFFIX}" # from capture to recordings blob ingestion consumer