From b707cf548476edaf368aa8fd3531a24da33a869b Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Thu, 18 Jan 2024 08:38:54 +0000 Subject: [PATCH] chore: add two new metrics for blobby commits (#19818) * chore: add two new metrics for blobby commits * fix --- .../session-recording/services/session-manager.ts | 2 +- .../session-recordings-consumer.ts | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts index c03f6a06dd820..9957bde31d5e2 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts @@ -189,7 +189,7 @@ export class SessionManager { throw error } - // NOTE: This is uncompressed size estimate but thats okay as we currently want to over-flush to see if we can shake out a bug + // NOTE: This is uncompressed size estimate but that's okay as we currently want to over-flush to see if we can shake out a bug if (this.buffer.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) { await this.flush('buffer_size') } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index d0d3d58da7f07..725a11f185cda 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -90,6 +90,17 @@ const counterKafkaMessageReceived = new Counter({ labelNames: ['partition'], }) +const counterCommitSkippedDueToPotentiallyBlockingSession = new Counter({ + name: 'recording_blob_ingestion_commit_skipped_due_to_potentially_blocking_session', + help: 'The number of times we skipped committing due to a potentially blocking session', +}) + +const histogramActiveSessionsWhenCommitIsBlocked = new Histogram({ + name: 'recording_blob_ingestion_active_sessions_when_commit_is_blocked', + help: 'The number of active sessions on a partition when we skip committing due to a potentially blocking session', + buckets: [0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 1000, 10000, Infinity], +}) + type PartitionMetrics = { lastMessageTimestamp?: number lastMessageOffset?: number @@ -648,9 +659,11 @@ export class SessionRecordingIngester { let potentiallyBlockingSession: SessionManager | undefined + let activeSessionsOnThisPartition = 0 for (const sessionManager of blockingSessions) { if (sessionManager.partition === partition) { const lowestOffset = sessionManager.getLowestOffset() + activeSessionsOnThisPartition++ if ( lowestOffset !== null && lowestOffset < (potentiallyBlockingSession?.getLowestOffset() || Infinity) @@ -684,6 +697,8 @@ export class SessionRecordingIngester { highestOffsetToCommit, } ) + counterCommitSkippedDueToPotentiallyBlockingSession.inc() + histogramActiveSessionsWhenCommitIsBlocked.observe(activeSessionsOnThisPartition) return }