Skip to content

Commit

Permalink
feat(err): Emit to clickhouse from cymbal (#26244)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Nov 19, 2024
1 parent 1466949 commit 6a424df
Show file tree
Hide file tree
Showing 18 changed files with 145 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Person, PreIngestionEvent, RawClickHouseEvent } from '../../../types'
import { Person, PreIngestionEvent, RawKafkaEvent } from '../../../types'
import { EventPipelineRunner } from './runner'

export function createEventStep(
runner: EventPipelineRunner,
event: PreIngestionEvent,
person: Person,
processPerson: boolean
): [RawClickHouseEvent, Promise<void>] {
): RawKafkaEvent {
return runner.eventsProcessor.createEvent(event, person, processPerson)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { RawKafkaEvent } from '../../../types'
import { EventPipelineRunner } from './runner'

/**
 * Pipeline step that passes an already-built raw event to the runner's
 * events processor for emission.
 *
 * @param runner - Pipeline runner whose `eventsProcessor.emitEvent` is invoked.
 * @param event - The raw event to emit.
 * @returns A single-element tuple containing the promise returned by
 *   `emitEvent`; the promise is not awaited here.
 */
export function emitEventStep(runner: EventPipelineRunner, event: RawKafkaEvent): [Promise<void>] {
    const { eventsProcessor } = runner
    const ack = eventsProcessor.emitEvent(event)
    return [ack]
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { RawClickHouseEvent } from '../../../types'
import { RawKafkaEvent } from '../../../types'
import { status } from '../../../utils/status'
import { EventPipelineRunner } from './runner'

export function produceExceptionSymbolificationEventStep(
runner: EventPipelineRunner,
event: RawClickHouseEvent
event: RawKafkaEvent
): Promise<[Promise<void>]> {
const ack = runner.hub.kafkaProducer
.produce({
Expand Down
14 changes: 8 additions & 6 deletions plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { status } from '../../../utils/status'
import { EventsProcessor } from '../process-event'
import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils'
import { createEventStep } from './createEventStep'
import { emitEventStep } from './emitEventStep'
import { enrichExceptionEventStep } from './enrichExceptionEventStep'
import { extractHeatmapDataStep } from './extractHeatmapDataStep'
import {
Expand Down Expand Up @@ -259,24 +260,25 @@ export class EventPipelineRunner {
event.team_id
)

const [rawClickhouseEvent, eventAck] = await this.runStep(
const rawEvent = await this.runStep(
createEventStep,
[this, enrichedIfErrorEvent, person, processPerson],
event.team_id
)
kafkaAcks.push(eventAck)

if (event.event === '$exception' && event.team_id == 2) {
const [exceptionAck] = await this.runStep(
produceExceptionSymbolificationEventStep,
[this, rawClickhouseEvent],
[this, rawEvent],
event.team_id
)
kafkaAcks.push(exceptionAck)
return this.registerLastStep('produceExceptionSymbolificationEventStep', [rawClickhouseEvent], kafkaAcks)
return this.registerLastStep('produceExceptionSymbolificationEventStep', [rawEvent], kafkaAcks)
} else {
const [clickhouseAck] = await this.runStep(emitEventStep, [this, rawEvent], event.team_id)
kafkaAcks.push(clickhouseAck)
return this.registerLastStep('emitEventStep', [rawEvent], kafkaAcks)
}

return this.registerLastStep('createEventStep', [rawClickhouseEvent], kafkaAcks)
}

registerLastStep(stepName: string, args: any[], ackPromises?: Array<Promise<void>>): EventPipelineResult {
Expand Down
20 changes: 10 additions & 10 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,7 @@ export class EventsProcessor {
return res
}

createEvent(
preIngestionEvent: PreIngestionEvent,
person: Person,
processPerson: boolean
): [RawKafkaEvent, Promise<void>] {
createEvent(preIngestionEvent: PreIngestionEvent, person: Person, processPerson: boolean): RawKafkaEvent {
const { eventUuid: uuid, event, teamId, projectId, distinctId, properties, timestamp } = preIngestionEvent

let elementsChain = ''
Expand Down Expand Up @@ -264,27 +260,31 @@ export class EventsProcessor {
person_mode: personMode,
}

return rawEvent
}

emitEvent(rawEvent: RawKafkaEvent): Promise<void> {
const ack = this.kafkaProducer
.produce({
topic: this.pluginsServer.CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC,
key: uuid,
key: rawEvent.uuid,
value: Buffer.from(JSON.stringify(rawEvent)),
waitForAck: true,
})
.catch(async (error) => {
// Some messages end up significantly larger than the original
// after plugin processing, person & group enrichment, etc.
if (error instanceof MessageSizeTooLarge) {
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'message_size_too_large', {
eventUuid: uuid,
distinctId: distinctId,
await captureIngestionWarning(this.db.kafkaProducer, rawEvent.team_id, 'message_size_too_large', {
eventUuid: rawEvent.uuid,
distinctId: rawEvent.distinct_id,
})
} else {
throw error
}
})

return [rawEvent, ack]
return ack
}

private async upsertGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,24 @@ Array [
true,
],
],
Array [
"emitEventStep",
Array [
Object {
"created_at": "2024-11-18 14:54:33.606",
"distinct_id": "my_id",
"elements_chain": "",
"event": "$pageview",
"person_created_at": "2024-11-18 14:54:33",
"person_mode": "full",
"person_properties": "{}",
"project_id": 1,
"properties": "{}",
"team_id": 2,
"timestamp": "2020-02-23 02:15:00.000",
"uuid": "uuid1",
},
],
],
]
`;
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ describe('prepareEventStep()', () => {
it('extracts elements_chain from properties', async () => {
const event: PluginEvent = { ...pluginEvent, ip: null, properties: { $elements_chain: 'random string', a: 1 } }
const preppedEvent = await prepareEventStep(runner, event)
const [chEvent, _] = runner.eventsProcessor.createEvent(preppedEvent, person)
const chEvent = runner.eventsProcessor.createEvent(preppedEvent, person)

expect(chEvent.elements_chain).toEqual('random string')
expect(chEvent.properties).toEqual('{"a":1}')
Expand All @@ -137,7 +137,7 @@ describe('prepareEventStep()', () => {
},
}
const preppedEvent = await prepareEventStep(runner, event)
const [chEvent, _] = runner.eventsProcessor.createEvent(preppedEvent, person)
const chEvent = runner.eventsProcessor.createEvent(preppedEvent, person)

expect(chEvent.elements_chain).toEqual('random string')
expect(chEvent.properties).toEqual('{"a":1}')
Expand All @@ -151,7 +151,7 @@ describe('prepareEventStep()', () => {
properties: { a: 1, $elements: [{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: 'text' }] },
}
const preppedEvent = await prepareEventStep(runner, event)
const [chEvent, _] = runner.eventsProcessor.createEvent(preppedEvent, person)
const chEvent = runner.eventsProcessor.createEvent(preppedEvent, person)

expect(chEvent.elements_chain).toEqual('div:nth-child="1"nth-of-type="2"text="text"')
expect(chEvent.properties).toEqual('{"a":1}')
Expand Down
31 changes: 26 additions & 5 deletions plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { ISOTimestamp, Person, PipelineEvent, PreIngestionEvent } from '../../../../src/types'
import { ISOTimestamp, Person, PipelineEvent, PreIngestionEvent, RawKafkaEvent } from '../../../../src/types'
import { createEventsToDropByToken } from '../../../../src/utils/db/hub'
import { createEventStep } from '../../../../src/worker/ingestion/event-pipeline/createEventStep'
import { emitEventStep } from '../../../../src/worker/ingestion/event-pipeline/emitEventStep'
import * as metrics from '../../../../src/worker/ingestion/event-pipeline/metrics'
import { pluginsProcessEventStep } from '../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventStep'
import { populateTeamDataStep } from '../../../../src/worker/ingestion/event-pipeline/populateTeamDataStep'
Expand All @@ -18,6 +19,7 @@ jest.mock('../../../../src/worker/ingestion/event-pipeline/pluginsProcessEventSt
jest.mock('../../../../src/worker/ingestion/event-pipeline/processPersonsStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/prepareEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/createEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/emitEventStep')
jest.mock('../../../../src/worker/ingestion/event-pipeline/runAsyncHandlersStep')

class TestEventPipelineRunner extends EventPipelineRunner {
Expand Down Expand Up @@ -74,6 +76,21 @@ const preIngestionEvent: PreIngestionEvent = {
elementsList: [],
}

// Fixture: the RawKafkaEvent that the mocked createEventStep resolves to in
// these tests, i.e. the event the pipeline then passes on to emitEventStep.
const createdEvent: RawKafkaEvent = {
created_at: '2024-11-18 14:54:33.606',
distinct_id: 'my_id',
elements_chain: '',
event: '$pageview',
person_created_at: '2024-11-18 14:54:33',
person_mode: 'full',
person_properties: '{}',
project_id: 1,
properties: '{}',
team_id: 2,
timestamp: '2020-02-23 02:15:00.000',
uuid: 'uuid1',
}

const person: Person = {
id: 123,
team_id: 2,
Expand Down Expand Up @@ -112,7 +129,8 @@ describe('EventPipelineRunner', () => {
{ person, personUpdateProperties: {}, get: () => Promise.resolve(person) } as any,
])
jest.mocked(prepareEventStep).mockResolvedValue(preIngestionEvent)
jest.mocked(createEventStep).mockResolvedValue([null, Promise.resolve()])
jest.mocked(createEventStep).mockResolvedValue(createdEvent)
jest.mocked(emitEventStep).mockResolvedValue([Promise.resolve()])
jest.mocked(processOnEventStep).mockResolvedValue(null)
})

Expand All @@ -129,6 +147,7 @@ describe('EventPipelineRunner', () => {
'extractHeatmapDataStep',
'enrichExceptionEventStep',
'createEventStep',
'emitEventStep',
])
expect(runner.stepsWithArgs).toMatchSnapshot()
})
Expand Down Expand Up @@ -158,6 +177,7 @@ describe('EventPipelineRunner', () => {
'extractHeatmapDataStep',
'enrichExceptionEventStep',
'createEventStep',
'emitEventStep',
])
})

Expand All @@ -179,11 +199,11 @@ describe('EventPipelineRunner', () => {
const result = await runner.runEventPipeline(pipelineEvent)
expect(result.error).toBeUndefined()

expect(pipelineStepMsSummarySpy).toHaveBeenCalledTimes(8)
expect(pipelineStepMsSummarySpy).toHaveBeenCalledTimes(9)
expect(pipelineLastStepCounterSpy).toHaveBeenCalledTimes(1)
expect(eventProcessedAndIngestedCounterSpy).toHaveBeenCalledTimes(1)
expect(pipelineStepMsSummarySpy).toHaveBeenCalledWith('createEventStep')
expect(pipelineLastStepCounterSpy).toHaveBeenCalledWith('createEventStep')
expect(pipelineStepMsSummarySpy).toHaveBeenCalledWith('emitEventStep')
expect(pipelineLastStepCounterSpy).toHaveBeenCalledWith('emitEventStep')
expect(pipelineStepErrorCounterSpy).not.toHaveBeenCalled()
})

Expand Down Expand Up @@ -380,6 +400,7 @@ describe('EventPipelineRunner', () => {
'extractHeatmapDataStep',
'enrichExceptionEventStep',
'createEventStep',
'emitEventStep',
])
})
})
Expand Down
42 changes: 25 additions & 17 deletions plugin-server/tests/worker/ingestion/process-event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ describe('EventsProcessor#createEvent()', () => {

it('emits event with person columns, re-using event properties', async () => {
const processPerson = true
eventsProcessor.createEvent(preIngestionEvent, person, processPerson)
eventsProcessor.emitEvent(eventsProcessor.createEvent(preIngestionEvent, person, processPerson))

await eventsProcessor.kafkaProducer.flush()

Expand Down Expand Up @@ -147,10 +147,12 @@ describe('EventsProcessor#createEvent()', () => {
)

const processPerson = true
eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
eventsProcessor.emitEvent(
eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
)
)

const events = await delayUntilEventIngested(() => hub.db.fetchEvents())
Expand All @@ -169,10 +171,12 @@ describe('EventsProcessor#createEvent()', () => {

it('when $process_person_profile=false, emits event with without person properties or groups', async () => {
const processPerson = false
eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
eventsProcessor.emitEvent(
eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
)
)

await eventsProcessor.kafkaProducer.flush()
Expand All @@ -199,10 +203,12 @@ describe('EventsProcessor#createEvent()', () => {
it('force_upgrade persons are recorded as such', async () => {
const processPerson = false
person.force_upgrade = true
eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
eventsProcessor.emitEvent(
eventsProcessor.createEvent(
{ ...preIngestionEvent, properties: { $group_0: 'group_key' } },
person,
processPerson
)
)

await eventsProcessor.kafkaProducer.flush()
Expand Down Expand Up @@ -236,10 +242,12 @@ describe('EventsProcessor#createEvent()', () => {
uuid: uuid,
}
const processPerson = true
eventsProcessor.createEvent(
{ ...preIngestionEvent, distinctId: 'no-such-person' },
nonExistingPerson,
processPerson
eventsProcessor.emitEvent(
eventsProcessor.createEvent(
{ ...preIngestionEvent, distinctId: 'no-such-person' },
nonExistingPerson,
processPerson
)
)
await eventsProcessor.kafkaProducer.flush()

Expand Down
19 changes: 16 additions & 3 deletions rust/common/kafka/src/kafka_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,28 @@ pub async fn send_iter_to_kafka<T>(
topic: &str,
iter: impl IntoIterator<Item = T>,
) -> Result<(), KafkaProduceError>
where
T: Serialize,
{
send_keyed_iter_to_kafka(kafka_producer, topic, |_| None, iter).await
}

pub async fn send_keyed_iter_to_kafka<T>(
kafka_producer: &FutureProducer<KafkaContext>,
topic: &str,
key_extractor: impl Fn(&T) -> Option<String>,
iter: impl IntoIterator<Item = T>,
) -> Result<(), KafkaProduceError>
where
T: Serialize,
{
let mut payloads = Vec::new();

for i in iter {
let key = key_extractor(&i);
let payload = serde_json::to_string(&i)
.map_err(|e| KafkaProduceError::SerializationError { error: e })?;
payloads.push(payload);
payloads.push((key, payload));
}

if payloads.is_empty() {
Expand All @@ -107,12 +120,12 @@ where

let mut delivery_futures = Vec::new();

for payload in payloads {
for (key, payload) in payloads {
match kafka_producer.send_result(FutureRecord {
topic,
payload: Some(&payload),
partition: None,
key: None::<&str>,
key: key.as_deref(),
timestamp: None,
headers: None,
}) {
Expand Down
1 change: 1 addition & 0 deletions rust/common/types/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub enum PersonMode {
pub struct ClickHouseEvent {
pub uuid: Uuid,
pub team_id: i32,
pub project_id: i32,
pub event: String,
pub distinct_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down
Loading

0 comments on commit 6a424df

Please sign in to comment.