diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index 47f9d566b8e16..2f1082f2aa5b4 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -281,6 +281,8 @@ export const startBatchConsumer = async ({ continue } + gaugeBatchUtilization.labels({ groupId }).set(messages.length / fetchBatchSize) + status.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length }) if (!messages.length && !callEachBatchWhenEmpty) { status.debug('🔁', 'main_loop_empty_batch', { cause: 'empty' }) @@ -412,3 +414,9 @@ const kafkaAbsolutePartitionCount = new Gauge({ help: 'Number of partitions assigned to this consumer. (Absolute value from the consumer state.)', labelNames: ['topic'], }) + +const gaugeBatchUtilization = new Gauge({ + name: 'consumer_batch_utilization', + help: 'Indicates how big batches are we are processing compared to the max batch size. Useful as a scaling metric', + labelNames: ['groupId'], +}) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 741f6a13978f4..feed88b570d5f 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -119,11 +119,6 @@ export const sessionInfoSummary = new Summary({ percentiles: [0.1, 0.25, 0.5, 0.9, 0.99], }) -const gaugeBatchUtilization = new Gauge({ - name: 'recording_blob_ingestion_batch_utilization', - help: 'Indicates how big batches are we are processing compared to the max batch size. Useful as a scaling metric', -}) - type PartitionMetrics = { lastMessageTimestamp?: number lastMessageOffset?: number @@ -348,8 +343,6 @@ export class SessionRecordingIngester { }) } - gaugeBatchUtilization.set(messages.length / this.config.SESSION_RECORDING_KAFKA_BATCH_SIZE) - await runInstrumentedFunction({ statsKey: `recordingingester.handleEachBatch`, sendTimeoutGuardToSentry: false,