diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts index 50d8c587b772c..136a5db183c27 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts @@ -158,6 +158,11 @@ export class SessionManager { public async add(message: IncomingRecordingMessage): Promise { if (this.destroying) { + if (this.debug) { + status.warn('🚽', '[session-manager] add called but we are in a destroying state', { + ...this.logContext(), + }) + } return } @@ -200,9 +205,18 @@ export class SessionManager { } // NOTE: This is uncompressed size estimate but that's okay as we currently want to over-flush to see if we can shake out a bug - if (this.buffer.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024) { + const shouldAttemptFlush = + this.buffer.sizeEstimate >= this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB * 1024 + if (shouldAttemptFlush) { await this.flush('buffer_size') } + if (this.debug) { + status.info('🚽', `[session-manager] added message`, { + ...this.logContext(), + metadata: message.metadata, + shouldAttemptFlush, + }) + } } public get isEmpty(): boolean { diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 1d0c928b81b0c..c8961d1e1696b 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -263,10 +263,26 @@ export class SessionRecordingIngester { const overflowKey = `${team_id}:${session_id}` const { partition, highOffset } = event.metadata - if (this.debugPartition === partition) { + const isDebug = this.debugPartition === partition + if (isDebug) { status.info('🔁', '[blob_ingester_consumer] - [PARTITION DEBUG] - consuming event', { ...event.metadata }) } + function dropEvent(dropCause: string) { + eventDroppedCounter + .labels({ + event_type: 'session_recordings_blob_ingestion', + drop_cause: dropCause, + }) + .inc() + if (isDebug) { + status.info('🔁', '[blob_ingester_consumer] - [PARTITION DEBUG] - dropping event', { + ...event.metadata, + dropCause, + }) + } + } + // Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking) if ( await this.persistentHighWaterMarker.isBelowHighWaterMark( @@ -275,24 +291,12 @@ export class SessionRecordingIngester { highOffset ) ) { - eventDroppedCounter - .labels({ - event_type: 'session_recordings_blob_ingestion', - drop_cause: 'high_water_mark_partition', - }) - .inc() - + dropEvent('high_water_mark_partition') return } if (await this.sessionHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, highOffset)) { - eventDroppedCounter - .labels({ - event_type: 'session_recordings_blob_ingestion', - drop_cause: 'high_water_mark', - }) - .inc() - + dropEvent('high_water_mark') return }