diff --git a/plugin-server/src/kafka/consumer.ts b/plugin-server/src/kafka/consumer.ts
index 62b8e951ebc9f..14a45f946376e 100644
--- a/plugin-server/src/kafka/consumer.ts
+++ b/plugin-server/src/kafka/consumer.ts
@@ -116,7 +116,9 @@ export const instrumentConsumerMetrics = (
                 }
             }
         } else if (error.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
-            status.info('📝️', `librdkafka ${strategyString} rebalance started, partitions revoked`, { assignments })
+            status.info('📝️', `librdkafka ${strategyString} rebalance started, partitions revoked`, {
+                revocations: assignments,
+            })
             for (const [topic, count] of countPartitionsPerTopic(assignments)) {
                 if (cooperativeRebalance) {
                     kafkaRebalancePartitionCount.labels({ topic: topic }).dec(count)
diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts
index 4d12925f0ce6b..e7d52d3a26bbe 100644
--- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts
+++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts
@@ -28,10 +28,12 @@ export async function eachMessageAppsOnEventHandlers(
         const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain)

         await runInstrumentedFunction({
-            event: event,
             func: () => queue.workerMethods.runAppsOnEventPipeline(event),
             statsKey: `kafka_queue.process_async_handlers_on_event`,
             timeoutMessage: 'After 30 seconds still running runAppsOnEventPipeline',
+            timeoutContext: () => ({
+                event: JSON.stringify(event),
+            }),
             teamId: event.teamId,
         })
     } else {
diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts
index fb671f0cd9633..a0c3f1e5bc22b 100644
--- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts
+++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts
@@ -155,10 +155,12 @@ export async function eachMessageWebhooksHandlers(
             convertToProcessedPluginEvent(event)

         await runInstrumentedFunction({
-            event: event,
             func: () => runWebhooks(statsd, actionMatcher, hookCannon, event),
             statsKey: `kafka_queue.process_async_handlers_webhooks`,
             timeoutMessage: 'After 30 seconds still running runWebhooksHandlersEventPipeline',
+            timeoutContext: () => ({
+                event: JSON.stringify(event),
+            }),
             teamId: event.teamId,
         })
     }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/partition-locker.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/partition-locker.ts
index 2779ac884b0c5..62f3200c22cfb 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/partition-locker.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/partition-locker.ts
@@ -20,7 +20,7 @@ const FLAG_EXPIRE_MS = 'PX'
  * To do this we keep a "lock" in place until we have flushed as much data as possible.
  */
 export class PartitionLocker {
-    consumerID = randomUUID()
+    consumerID = process.env.HOSTNAME ?? randomUUID()
     delay = 1000
     ttl = 30000

@@ -62,8 +62,6 @@
             keys.map(async (key) => {
                 const existingClaim = await client.get(key)

-                status.info('🔒', `PartitionLocker claim: ${key}:${existingClaim}`)
-
                 if (existingClaim && existingClaim !== this.consumerID) {
                     // Still claimed by someone else!
                     blockingConsumers.add(existingClaim)
@@ -92,7 +90,7 @@
                 }
             }

-            status.info('🔒', 'PartitionLocker claimed all required keys')
+            status.debug('🔒', 'PartitionLocker claimed all required keys')
         } catch (error) {
             status.error('🧨', 'PartitionLocker errored to claim keys', {
                 error: error.message,
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts
index 1fa34e252e6fa..7571ed0835f53 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/realtime-manager.ts
@@ -54,7 +54,6 @@ export class RealtimeManager extends EventEmitter {
             try {
                 const subMessage = JSON.parse(message) as { team_id: number; session_id: string }
                 this.emitSubscriptionEvent(subMessage.team_id, subMessage.session_id)
-                status.info('🔌', 'RealtimeManager recevied realtime request', subMessage)
             } catch (e) {
                 captureException('Failed to parse message from redis pubsub', e)
             }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts
index 4e4dbc9719f13..9251f67032724 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/services/session-manager.ts
@@ -528,7 +528,7 @@ export class SessionManager {
         })

         this.realtimeTail.on('line', async (data: string) => {
-            status.info('⚡️', '[session-manager][realtime] writing to redis', {
+            status.debug('⚡️', '[session-manager][realtime] writing to redis', {
                 sessionId: this.sessionId,
                 teamId: this.teamId,
             })
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
index 4fc7eca87a34b..939df4cf80f0f 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
@@ -8,12 +8,11 @@ import { sessionRecordingConsumerConfig } from '../../../config/config'
 import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
 import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
 import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
+import { runInstrumentedFunction } from '../../../main/utils'
 import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId } from '../../../types'
 import { BackgroundRefresher } from '../../../utils/background-refresher'
 import { PostgresRouter } from '../../../utils/db/postgres'
-import { timeoutGuard } from '../../../utils/db/utils'
 import { status } from '../../../utils/status'
-import { asyncTimeoutGuard } from '../../../utils/timing'
 import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
 import { ObjectStorage } from '../../services/object_storage'
 import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
@@ -183,11 +182,12 @@ export class SessionRecordingIngesterV2 {
             op: 'checkHighWaterMark',
         })

-        if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) {
+        // Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking)
+        if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, HIGH_WATERMARK_KEY, offset)) {
             eventDroppedCounter
                 .labels({
                     event_type: 'session_recordings_blob_ingestion',
-                    drop_cause: 'high_water_mark',
+                    drop_cause: 'high_water_mark_partition',
                 })
                 .inc()

@@ -195,12 +195,11 @@
             return
         }

-        // Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking)
-        if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, HIGH_WATERMARK_KEY, offset)) {
+        if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) {
             eventDroppedCounter
                 .labels({
                     event_type: 'session_recordings_blob_ingestion',
-                    drop_cause: 'high_water_mark_partition',
+                    drop_cause: 'high_water_mark',
                 })
                 .inc()

@@ -309,9 +308,10 @@
     }

     public async handleEachBatch(messages: Message[]): Promise<void> {
-        await asyncTimeoutGuard(
-            { message: 'Processing batch is taking longer than 60 seconds', timeout: 60 * 1000 },
-            async () => {
+        await runInstrumentedFunction({
+            statsKey: `recordingingester.handleEachBatch`,
+            logExecutionTime: true,
+            func: async () => {
                 const transaction = Sentry.startTransaction({ name: `blobIngestion_handleEachBatch` }, {})

                 histogramKafkaBatchSize.observe(messages.length)
@@ -321,53 +321,63 @@
                     await this.partitionLocker.claim(messages)
                 }

-                for (const message of messages) {
-                    const { partition, offset, timestamp } = message
+                await runInstrumentedFunction({
+                    statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`,
+                    func: async () => {
+                        for (const message of messages) {
+                            const { partition, offset, timestamp } = message

-                    if (timestamp && this.partitionAssignments[partition]) {
-                        const metrics = this.partitionAssignments[partition]
+                            if (timestamp && this.partitionAssignments[partition]) {
+                                const metrics = this.partitionAssignments[partition]

-                        // For some reason timestamp can be null. If it isn't, update our ingestion metrics
-                        metrics.lastMessageTimestamp = timestamp
-                        // If we don't have a last known commit then set it to this offset as we can't commit lower than that
-                        metrics.lastKnownCommit = metrics.lastKnownCommit ?? offset
-                        metrics.lastMessageOffset = offset
+                                // For some reason timestamp can be null. If it isn't, update our ingestion metrics
+                                metrics.lastMessageTimestamp = timestamp
+                                // If we don't have a last known commit then set it to this offset as we can't commit lower than that
+                                metrics.lastKnownCommit = metrics.lastKnownCommit ?? offset
+                                metrics.lastMessageOffset = offset

-                        counterKafkaMessageReceived.inc({ partition })
+                                counterKafkaMessageReceived.inc({ partition })

-                        gaugeLagMilliseconds
-                            .labels({
-                                partition: partition.toString(),
-                            })
-                            .set(now() - timestamp)
+                                gaugeLagMilliseconds
+                                    .labels({
+                                        partition: partition.toString(),
+                                    })
+                                    .set(now() - timestamp)

-                        const offsetsByPartition = await this.offsetsRefresher.get()
-                        const highOffset = offsetsByPartition[partition]
+                                const offsetsByPartition = await this.offsetsRefresher.get()
+                                const highOffset = offsetsByPartition[partition]

-                        if (highOffset) {
-                            // NOTE: This is an important metric used by the autoscaler
-                            gaugeLag.set({ partition }, Math.max(0, highOffset - metrics.lastMessageOffset))
-                        }
-                    }
+                                if (highOffset) {
+                                    // NOTE: This is an important metric used by the autoscaler
+                                    gaugeLag.set({ partition }, Math.max(0, highOffset - metrics.lastMessageOffset))
+                                }
+                            }

-                    const recordingMessage = await this.parseKafkaMessage(message, (token) =>
-                        this.teamsRefresher.get().then((teams) => teams[token] || null)
-                    )
+                            const recordingMessage = await this.parseKafkaMessage(message, (token) =>
+                                this.teamsRefresher.get().then((teams) => teams[token] || null)
+                            )

-                    if (recordingMessage) {
-                        recordingMessages.push(recordingMessage)
-                    }
-                }
+                            if (recordingMessage) {
+                                recordingMessages.push(recordingMessage)
+                            }
+                        }
+                    },
+                })

-                for (const message of recordingMessages) {
-                    const consumeSpan = transaction?.startChild({
-                        op: 'blobConsume',
-                    })
+                await runInstrumentedFunction({
+                    statsKey: `recordingingester.handleEachBatch.consumeSerial`,
+                    func: async () => {
+                        for (const message of recordingMessages) {
+                            const consumeSpan = transaction?.startChild({
+                                op: 'blobConsume',
+                            })

-                    await this.consume(message, consumeSpan)
-                    // TODO: We could do this as batch of offsets for the whole lot...
-                    consumeSpan?.finish()
-                }
+                            await this.consume(message, consumeSpan)
+                            // TODO: We could do this as batch of offsets for the whole lot...
+                            consumeSpan?.finish()
+                        }
+                    },
+                })

                 for (const message of messages) {
                     // Now that we have consumed everything, attempt to commit all messages in this batch
@@ -375,14 +385,22 @@
                     await this.commitOffset(message.topic, partition, offset)
                 }

-                await this.replayEventsIngester.consumeBatch(recordingMessages)
-                const timeout = timeoutGuard(`Flushing sessions timed out`, {}, 120 * 1000)
-                await this.flushAllReadySessions()
-                clearTimeout(timeout)
+                await runInstrumentedFunction({
+                    statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`,
+                    func: async () => {
+                        await this.replayEventsIngester.consumeBatch(recordingMessages)
+                    },
+                })
+                await runInstrumentedFunction({
+                    statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
+                    func: async () => {
+                        await this.flushAllReadySessions()
+                    },
+                })

                 transaction.finish()
-            }
-        )
+            },
+        })
     }

     public async start(): Promise<void> {
@@ -549,12 +567,19 @@
         // - have some sort of timeout so we don't get stuck here forever
         if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
             status.info('🔁', `blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...`)
-            await Promise.allSettled(
-                sessionsToDrop
-                    .map(([_, x]) => x)
-                    .sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity)
-                    .map((x) => x.flush('partition_shutdown'))
-            )
+
+            await runInstrumentedFunction({
+                statsKey: `recordingingester.onRevokePartitions.flushSessions`,
+                logExecutionTime: true,
+                func: async () => {
+                    await Promise.allSettled(
+                        sessionsToDrop
+                            .map(([_, x]) => x)
+                            .sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity)
+                            .map((x) => x.flush('partition_shutdown'))
+                    )
+                },
+            })
         }

         topicPartitions.forEach((topicPartition: TopicPartition) => {
diff --git a/plugin-server/src/main/utils.ts b/plugin-server/src/main/utils.ts
index e5d970395b6dc..8d83bddee92aa 100644
--- a/plugin-server/src/main/utils.ts
+++ b/plugin-server/src/main/utils.ts
@@ -4,38 +4,54 @@ import { exponentialBuckets, Histogram } from 'prom-client'
 import { timeoutGuard } from '../utils/db/utils'
 import { status } from '../utils/status'

-interface FunctionInstrumentation<T, E> {
-    event: E
-    timeoutMessage: string
+interface FunctionInstrumentation<T> {
     statsKey: string
-    func: (event: E) => Promise<T>
-    teamId: number
+    func: () => Promise<T>
+    timeout?: number
+    timeoutMessage?: string
+    timeoutContext?: () => Record<string, any>
+    teamId?: number
+    logExecutionTime?: boolean
 }

-export async function runInstrumentedFunction<T, E>({
+const logTime = (startTime: number, statsKey: string, error?: any) => {
+    status.info('⏱️', `${statsKey} took ${Math.round(performance.now() - startTime)}ms`, {
+        error,
+    })
+}
+
+export async function runInstrumentedFunction<T>({
     timeoutMessage,
-    event,
+    timeout,
+    timeoutContext,
     func,
     statsKey,
     teamId,
-}: FunctionInstrumentation<T, E>): Promise<T> {
-    const timeout = timeoutGuard(timeoutMessage, {
-        event: JSON.stringify(event),
-    })
+    logExecutionTime = false,
+}: FunctionInstrumentation<T>): Promise<T> {
+    const t = timeoutGuard(timeoutMessage ?? `Timeout warning for '${statsKey}'!`, timeoutContext, timeout)
+    const startTime = performance.now()
     const end = instrumentedFunctionDuration.startTimer({
         function: statsKey,
    })
+
     try {
-        const result = await func(event)
+        const result = await func()
         end({ success: 'true' })
+        if (logExecutionTime) {
+            logTime(startTime, statsKey)
+        }
         return result
     } catch (error) {
         end({ success: 'false' })
         status.info('🔔', error)
+        if (logExecutionTime) {
+            logTime(startTime, statsKey, error)
+        }
         Sentry.captureException(error, { tags: { team_id: teamId } })
         throw error
     } finally {
-        clearTimeout(timeout)
+        clearTimeout(t)
     }
 }

diff --git a/plugin-server/src/utils/db/utils.ts b/plugin-server/src/utils/db/utils.ts
index 9e4eb0a3c11b7..a242445047cff 100644
--- a/plugin-server/src/utils/db/utils.ts
+++ b/plugin-server/src/utils/db/utils.ts
@@ -30,12 +30,13 @@ export function sanitizeEventName(eventName: any): string {

 export function timeoutGuard(
     message: string,
-    context?: Record<string, any>,
+    context?: Record<string, any> | (() => Record<string, any>),
     timeout = defaultConfig.TASK_TIMEOUT * 1000
 ): NodeJS.Timeout {
     return setTimeout(() => {
-        console.log(`⌛⌛⌛ ${message}`, context)
-        Sentry.captureMessage(message, context ? { extra: context } : undefined)
+        const ctx = typeof context === 'function' ? context() : context
+        console.log(`⌛⌛⌛ ${message}`, ctx)
+        Sentry.captureMessage(message, ctx ? { extra: ctx } : undefined)
     }, timeout)
 }

diff --git a/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts
index 6e12cb5a8a5ea..0b0b024b3e229 100644
--- a/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts
+++ b/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts
@@ -9,8 +9,10 @@ export async function pluginsProcessEventStep(
     event: PluginEvent
 ): Promise<PluginEvent | null> {
     const processedEvent = await runInstrumentedFunction({
-        event,
-        func: (event) => runProcessEvent(runner.hub, event),
+        timeoutContext: () => ({
+            event: JSON.stringify(event),
+        }),
+        func: () => runProcessEvent(runner.hub, event),
         statsKey: 'kafka_queue.single_event',
         timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!',
         teamId: event.team_id,
diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts
index 463065c744fdb..086ee39ebac8d 100644
--- a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts
+++ b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts
@@ -10,8 +10,10 @@ export async function processOnEventStep(runner: EventPipelineRunner, event: Pos
     const processedPluginEvent = convertToProcessedPluginEvent(event)

     await runInstrumentedFunction({
-        event: processedPluginEvent,
-        func: (event) => runOnEvent(runner.hub, event),
+        timeoutContext: () => ({
+            event: JSON.stringify(processedPluginEvent),
+        }),
+        func: () => runOnEvent(runner.hub, processedPluginEvent),
         statsKey: `kafka_queue.single_on_event`,
         timeoutMessage: `After 30 seconds still running onEvent`,
         teamId: event.teamId,
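
Editor's note (not part of the patch): after this change, runInstrumentedFunction in plugin-server/src/main/utils.ts needs only a statsKey and a zero-argument func; timeout, timeoutMessage, timeoutContext, teamId and logExecutionTime are all optional. A minimal usage sketch follows; the handleThing wrapper, its payload argument, the 10-second timeout and the import path are illustrative assumptions, not code from this PR:

    import { runInstrumentedFunction } from '../../main/utils' // path depends on the caller's location

    // Hypothetical caller showing the options introduced in this diff.
    async function handleThing(payload: Record<string, any>): Promise<void> {
        await runInstrumentedFunction({
            statsKey: 'example.handleThing', // labels the Prometheus duration timer and the default timeout message
            logExecutionTime: true, // logs "⏱️ example.handleThing took <n>ms" when the function finishes
            timeout: 10 * 1000, // optional; timeoutGuard falls back to TASK_TIMEOUT when omitted
            timeoutContext: () => ({
                // only evaluated if the timeout guard fires, so the stringify cost is skipped on the happy path
                payload: JSON.stringify(payload),
            }),
            func: async () => {
                // ... the actual work being instrumented goes here ...
            },
        })
    }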
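The timeoutGuard change in plugin-server/src/utils/db/utils.ts is what makes a function-valued timeoutContext cheap: the context is only materialised inside the setTimeout callback. A condensed sketch of that behaviour, simplified from the diff (the real default timeout is defaultConfig.TASK_TIMEOUT * 1000 and the real guard also reports to Sentry):

    type TimeoutContext = Record<string, any>

    // Simplified shape of the updated guard: context may be a plain object or a thunk.
    function timeoutGuard(
        message: string,
        context?: TimeoutContext | (() => TimeoutContext),
        timeout = 30 * 1000 // placeholder default; the real code uses defaultConfig.TASK_TIMEOUT * 1000
    ): NodeJS.Timeout {
        return setTimeout(() => {
            // The thunk only runs if the guard actually fires, so callers can defer
            // expensive work such as JSON.stringify(event) to the slow path.
            const ctx = typeof context === 'function' ? context() : context
            console.log(`⌛⌛⌛ ${message}`, ctx)
        }, timeout)
    }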