Skip to content

Commit

Permalink
the spikiest of spikes
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra committed Sep 22, 2023
1 parent e953b19 commit 5c11ba6
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
3 changes: 3 additions & 0 deletions plugin-server/src/config/kafka-topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_se
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
// write performance events to ClickHouse
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`

// log entries for ingestion into clickhouse
export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { DateTime } from 'luxon'
import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-rdkafka-acosom'
import { Counter } from 'prom-client'

import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics'
import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, KAFKA_LOG_ENTRIES } from '../../../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config'
import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
Expand Down Expand Up @@ -116,7 +116,7 @@ export class ReplayEventsIngester {
}

try {
const replayRecord = createSessionReplayEvent(
const [replayRecord, consoleLogEntries] = createSessionReplayEvent(
randomUUID(),
event.team_id,
event.distinct_id,
Expand Down Expand Up @@ -160,13 +160,25 @@ export class ReplayEventsIngester {

replayEventsCounter.inc()

const s1 = JSON.stringify(replayRecord)
const consoleLogProduces = consoleLogEntries.map((consoleLogEntry) => {
const s = JSON.stringify(consoleLogEntry)
status.info('🔁', '[replay-events] console_log', { s, s1 })
return produce({
producer: this.producer,
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(s),
key: event.session_id,
})
})
return [
produce({
producer: this.producer,
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
value: Buffer.from(JSON.stringify(replayRecord)),
value: Buffer.from(s1),
key: event.session_id,
}),
...consoleLogProduces,
]
} catch (error) {
status.error('⚠️', '[replay-events] processing_error', {
Expand Down
43 changes: 41 additions & 2 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { DateTime } from 'luxon'

import { activeMilliseconds } from '../../main/ingestion-queues/session-recording/snapshot-segmenter'
import {
ClickHouseTimestamp,
Element,
GroupTypeIndex,
Hub,
Expand Down Expand Up @@ -286,13 +287,39 @@ export interface SummarizedSessionRecordingEvent {
message_count: number
}

type ConsoleLogEntry = {
team_id: number
message: string
log_level: 'info' | 'warn' | 'error'
log_source: 'session_replay'
// the session_id
log_source_id: string
// maybe we omit this?
instance_id: string
timestamp: ClickHouseTimestamp
}

function safeString(payload: string[]) {
let candidate = payload.join(' ')

if (candidate.startsWith('"') || candidate.startsWith("'")) {
candidate = candidate.substring(1)
}

if (candidate.endsWith('"') || candidate.endsWith("'")) {
candidate = candidate.substring(0, candidate.length - 1)
}

return candidate
}

export const createSessionReplayEvent = (
uuid: string,
team_id: number,
distinct_id: string,
session_id: string,
events: RRWebEvent[]
) => {
): [SummarizedSessionRecordingEvent, ConsoleLogEntry[]] => {
const timestamps = events
.filter((e) => !!e?.timestamp)
.map((e) => castTimestampOrNow(DateTime.fromMillis(e.timestamp), TimestampFormat.ClickHouse))
Expand All @@ -315,6 +342,8 @@ export const createSessionReplayEvent = (
let consoleWarnCount = 0
let consoleErrorCount = 0
let url: string | null = null
const consoleLogEntries: ConsoleLogEntry[] = []

events.forEach((event) => {
if (event.type === 3) {
mouseActivity += 1
Expand All @@ -337,6 +366,16 @@ export const createSessionReplayEvent = (
} else if (level === 'error') {
consoleErrorCount += 1
}
consoleLogEntries.push({
team_id,
// when is it not a single item array?
message: safeString(event.data.payload?.payload),
log_level: level,
log_source: 'session_replay',
log_source_id: session_id,
instance_id: event.data.instanceId,
timestamp: castTimestampOrNow(DateTime.fromMillis(event.timestamp), TimestampFormat.ClickHouse),
})
}
})

Expand Down Expand Up @@ -364,7 +403,7 @@ export const createSessionReplayEvent = (
message_count: 1,
}

return data
return [data, consoleLogEntries]
}

export function createPerformanceEvent(uuid: string, team_id: number, distinct_id: string, properties: Properties) {
Expand Down

0 comments on commit 5c11ba6

Please sign in to comment.