diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 18bd06a710c3f..45b6dffcc98ed 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -3,7 +3,7 @@ import crypto from 'crypto' import { Redis } from 'ioredis' import { mkdirSync, rmSync } from 'node:fs' import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartition } from 'node-rdkafka' -import { Counter, Gauge, Histogram } from 'prom-client' +import { Counter, Gauge, Histogram, Summary } from 'prom-client' import { sessionRecordingConsumerConfig } from '../../../config/config' import { @@ -113,6 +113,12 @@ const histogramActiveSessionsWhenCommitIsBlocked = new Histogram({ buckets: [0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 1000, 10000, Infinity], }) +export const sessionInfoSummary = new Summary({ + name: 'recording_blob_ingestion_session_info_bytes', + help: 'Size of aggregated session information being processed', + percentiles: [0.1, 0.25, 0.5, 0.9, 0.99], +}) + type PartitionMetrics = { lastMessageTimestamp?: number lastMessageOffset?: number @@ -313,6 +319,8 @@ export class SessionRecordingIngester { ) } + sessionInfoSummary.observe(event.metadata.rawSize) + await Promise.allSettled([ this.sessions[key]?.add(event), this.overflowDetection?.observe(session_id, event.metadata.rawSize, event.metadata.timestamp),