Skip to content

Commit

Permalink
feat: Heatmaps ingestion (#21629)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Apr 23, 2024
1 parent 5c273be commit 6183f88
Show file tree
Hide file tree
Showing 9 changed files with 5,618 additions and 2 deletions.
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { LogLevel, PluginLogLevel, PluginsServerConfig, stringToPluginServerMode
import { isDevEnv, isTestEnv, stringToBoolean } from '../utils/env-utils'
import { KAFKAJS_LOG_LEVEL_MAPPING } from './constants'
import {
KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
Expand Down Expand Up @@ -104,6 +105,7 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: 1,
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '',
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON,
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
CONVERSION_BUFFER_ENABLED: false,
CONVERSION_BUFFER_ENABLED_TEAMS: '',
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '',
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/config/kafka-topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_se
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
// write performance events to ClickHouse
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
// write heatmap events to ClickHouse
export const KAFKA_CLICKHOUSE_HEATMAP_EVENTS = `${prefix}clickhouse_heatmap_events${suffix}`

// log entries for ingestion into clickhouse
export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
24 changes: 24 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export interface PluginsServerConfig {
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS: boolean // whether to disallow external schemas like protobuf for clickhouse kafka engine
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: string // (advanced) a comma separated list of teams to disable clickhouse external schemas for
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string // (advanced) topic to send events to for clickhouse ingestion
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: string // (advanced) topic to send heatmap data to for clickhouse ingestion
REDIS_URL: string
POSTHOG_REDIS_PASSWORD: string
POSTHOG_REDIS_HOST: string
Expand Down Expand Up @@ -1099,3 +1100,26 @@ export type RRWebEvent = Record<string, any> & {
export interface ValueMatcher<T> {
(value: T): boolean
}

export type RawClickhouseHeatmapEvent = {
/**
* session id lets us offer example recordings on high traffic parts of the page,
* and could let us offer more advanced filtering of heatmap data
* we will break the relationship between particular sessions and clicks in aggregating this data
* it should always be treated as an exemplar and not as concrete values
*/
session_id: string
distinct_id: string
viewport_width: number
viewport_height: number
pointer_target_fixed: boolean
current_url: string
// x is the x with resolution applied, the resolution converts high fidelity mouse positions into an NxN grid
x: number
// y is the y with resolution applied, the resolution converts high fidelity mouse positions into an NxN grid
y: number
scale_factor: 16 // in the future we may support other values
timestamp: string
type: string
team_id: number
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { URL } from 'url'

import { PreIngestionEvent, RawClickhouseHeatmapEvent, TimestampFormat } from '../../../types'
import { castTimestampOrNow } from '../../../utils/utils'
import { captureIngestionWarning } from '../utils'
import { EventPipelineRunner } from './runner'

// This represents the scale factor for the heatmap data. Essentially how much we are reducing the resolution by.
const SCALE_FACTOR = 16

type HeatmapDataItem = {
x: number
y: number
target_fixed: boolean
type: string
}

type HeatmapData = Record<string, HeatmapDataItem[]>

export function extractHeatmapDataStep(
runner: EventPipelineRunner,
event: PreIngestionEvent
): Promise<[PreIngestionEvent, Promise<void>[]]> {
const { eventUuid, teamId } = event

let acks: Promise<void>[] = []

try {
const heatmapEvents = extractScrollDepthHeatmapData(event) ?? []

// eslint-disable-next-line @typescript-eslint/no-floating-promises
acks = heatmapEvents.map((rawEvent) => {
return runner.hub.kafkaProducer.produce({
topic: runner.hub.CLICKHOUSE_HEATMAPS_KAFKA_TOPIC,
key: eventUuid,
value: Buffer.from(JSON.stringify(rawEvent)),
waitForAck: true,
})
})
} catch (e) {
acks.push(
captureIngestionWarning(runner.hub.kafkaProducer, teamId, 'invalid_heatmap_data', {
eventUuid,
})
)
}

// We don't want to ingest this data to the events table
delete event.properties['$heatmap_data']

return Promise.resolve([event, acks])
}

function replacePathInUrl(url: string, newPath: string): string {
const parsedUrl = new URL(url)
parsedUrl.pathname = newPath
return parsedUrl.toString()
}

function extractScrollDepthHeatmapData(event: PreIngestionEvent): RawClickhouseHeatmapEvent[] {
const { teamId, timestamp, properties } = event
const {
$viewport_height,
$viewport_width,
$session_id,
distinct_id,
$prev_pageview_pathname,
$prev_pageview_max_scroll,
$current_url,
$heatmap_data,
} = properties || {}

let heatmapData = $heatmap_data as HeatmapData | null

if ($prev_pageview_pathname && $current_url) {
// We are going to add the scroll depth info derived from the previous pageview to the current pageview's heatmap data
if (!heatmapData) {
heatmapData = {}
}

const previousUrl = replacePathInUrl($current_url, $prev_pageview_pathname)
heatmapData[previousUrl] = heatmapData[previousUrl] || []
heatmapData[previousUrl].push({
x: 0,
y: $prev_pageview_max_scroll,
target_fixed: false,
type: 'scrolldepth',
})
}

let heatmapEvents: RawClickhouseHeatmapEvent[] = []

if (!heatmapData) {
return []
}

Object.entries(heatmapData).forEach(([url, items]) => {
if (Array.isArray(items)) {
heatmapEvents = heatmapEvents.concat(
(items as any[]).map(
(hme: {
x: number
y: number
target_fixed: boolean
type: string
}): RawClickhouseHeatmapEvent => ({
type: hme.type,
x: Math.round(hme.x / SCALE_FACTOR),
y: Math.round(hme.y / SCALE_FACTOR),
pointer_target_fixed: hme.target_fixed,
viewport_height: Math.round($viewport_height / SCALE_FACTOR),
viewport_width: Math.round($viewport_width / SCALE_FACTOR),
current_url: url,
session_id: $session_id,
scale_factor: SCALE_FACTOR,
timestamp: castTimestampOrNow(timestamp ?? null, TimestampFormat.ClickHouse),
team_id: teamId,
distinct_id: distinct_id,
})
)
)
}
})

return heatmapEvents
}
13 changes: 12 additions & 1 deletion plugin-server/src/worker/ingestion/event-pipeline/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { normalizeProcessPerson } from '../../../utils/event'
import { status } from '../../../utils/status'
import { captureIngestionWarning, generateEventDeadLetterQueueMessage } from '../utils'
import { createEventStep } from './createEventStep'
import { extractHeatmapDataStep } from './extractHeatmapDataStep'
import {
eventProcessedAndIngestedCounter,
pipelineLastStepCounter,
Expand Down Expand Up @@ -216,9 +217,19 @@ export class EventPipelineRunner {
event.team_id
)

const [preparedEventWithoutHeatmaps, heatmapKafkaAcks] = await this.runStep(
extractHeatmapDataStep,
[this, preparedEvent],
event.team_id
)

if (heatmapKafkaAcks.length > 0) {
kafkaAcks.push(...heatmapKafkaAcks)
}

const [rawClickhouseEvent, eventAck] = await this.runStep(
createEventStep,
[this, preparedEvent, person, processPerson],
[this, preparedEventWithoutHeatmaps, person, processPerson],
event.team_id
)

Expand Down
Loading

0 comments on commit 6183f88

Please sign in to comment.