Skip to content

Commit

Permalink
chore(plugin-server): add prometheus metrics to match statsd ones, re…
Browse files Browse the repository at this point in the history
…move unused metrics (#19000)
  • Loading branch information
bretthoerner authored Dec 1, 2023
1 parent 79b28d0 commit f470984
Show file tree
Hide file tree
Showing 49 changed files with 440 additions and 427 deletions.
8 changes: 4 additions & 4 deletions plugin-server/src/main/graphile-worker/graphile-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { instrument } from '../../utils/metrics'
import { runRetriableFunction } from '../../utils/retries'
import { status } from '../../utils/status'
import { createPostgresPool } from '../../utils/utils'
import { graphileEnqueueJobCounter } from './metrics'

export interface InstrumentationContext {
key: string
Expand Down Expand Up @@ -80,10 +81,7 @@ export class GraphileWorker {
enqueueFn = () =>
runRetriableFunction({
hub: this.hub,
metricName: 'job_queues_enqueue',
metricTags: {
jobName,
},
metricName: `job_queues_enqueue_${jobName}`,
maxAttempts: 10,
retryBaseMs: 6000,
retryMultiplier: 2,
Expand All @@ -110,8 +108,10 @@ export class GraphileWorker {
try {
await this.addJob(jobName, job)
this.hub.statsd?.increment('enqueue_job.success', { jobName })
graphileEnqueueJobCounter.labels({ status: 'success', job: jobName }).inc()
} catch (error) {
this.hub.statsd?.increment('enqueue_job.fail', { jobName })
graphileEnqueueJobCounter.labels({ status: 'fail', job: jobName }).inc()
throw error
}
}
Expand Down
13 changes: 13 additions & 0 deletions plugin-server/src/main/graphile-worker/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Counter } from 'prom-client'

/**
 * Prometheus counter recording the outcome of enqueueing a job to the
 * graphile worker queue.
 *
 * It is incremented next to the existing statsd `enqueue_job.success` /
 * `enqueue_job.fail` counters, so both backends report the same events.
 */
export const graphileEnqueueJobCounter = new Counter({
name: 'graphile_enqueue_job',
help: 'Result status of enqueueing a job to the graphile worker queue',
// `status` is set to 'success' or 'fail' by the enqueue code path; `job` carries the job name.
labelNames: ['status', 'job'],
})

/**
 * Prometheus counter recording status changes of graphile scheduled tasks.
 *
 * It is incremented next to the statsd `skipped_scheduled_tasks`,
 * `queued_scheduled_task` and `completed_scheduled_task` counters.
 */
export const graphileScheduledTaskCounter = new Counter({
name: 'graphile_scheduled_task',
help: 'Graphile scheduled task status change',
// `status` is 'skipped', 'queued' or 'completed' at the call sites in schedule.ts;
// `task` is the task type (e.g. 'runEveryMinute', 'runEveryHour', 'runEveryDay').
labelNames: ['status', 'task'],
})
4 changes: 4 additions & 0 deletions plugin-server/src/main/graphile-worker/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Hub, PluginConfigId } from '../../types'
import { status } from '../../utils/status'
import { delay } from '../../utils/utils'
import Piscina from '../../worker/piscina'
import { graphileScheduledTaskCounter } from './metrics'

type TaskTypes = 'runEveryMinute' | 'runEveryHour' | 'runEveryDay'

Expand Down Expand Up @@ -46,6 +47,7 @@ export async function runScheduledTasks(
runAt: helpers.job.run_at,
})
server.statsd?.increment('skipped_scheduled_tasks', { taskType })
graphileScheduledTaskCounter.labels({ status: 'skipped', task: taskType }).inc()
return
}

Expand All @@ -57,12 +59,14 @@ export async function runScheduledTasks(
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
})
server.statsd?.increment('queued_scheduled_task', { taskType })
graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
}
} else {
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`)
await piscina.run({ task: taskType, args: { pluginConfigId } })
server.statsd?.increment('completed_scheduled_task', { taskType })
graphileScheduledTaskCounter.labels({ status: 'completed', task: taskType }).inc()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { ingestionPartitionKeyOverflowed } from '../analytics-events-ingestion-c
import { IngestionConsumer } from '../kafka-queue'
import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
import {
ingestEventBatchingBatchCountSummary,
ingestEventBatchingInputLengthSummary,
ingestionOverflowingMessagesTotal,
ingestionParallelism,
ingestionParallelismPotential,
Expand Down Expand Up @@ -117,9 +119,11 @@ export async function eachBatchParallelIngestion(
const splitBatch = splitIngestionBatch(tokenBlockList, messages, overflowMode)
splitBatch.toProcess.sort((a, b) => a.length - b.length)

ingestEventBatchingInputLengthSummary.observe(messages.length)
queue.pluginsServer.statsd?.histogram('ingest_event_batching.input_length', messages.length, {
key: metricKey,
})
ingestEventBatchingBatchCountSummary.observe(splitBatch.toProcess.length)
queue.pluginsServer.statsd?.histogram('ingest_event_batching.batch_count', splitBatch.toProcess.length, {
key: metricKey,
})
Expand Down Expand Up @@ -241,7 +245,6 @@ export async function eachBatchParallelIngestion(
}ms (${loggingKey})`
)
} finally {
queue.pluginsServer.statsd?.timing(`kafka_queue.${loggingKey}`, batchStartTimer)
transaction.finish()
}
}
Expand All @@ -251,18 +254,8 @@ async function ingestEvent(
event: PipelineEvent,
checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again
): Promise<EventPipelineResult> {
const eachEventStartTimer = new Date()

checkAndPause?.()

server.statsd?.increment('kafka_queue_ingest_event_hit', {
pipeline: 'runEventPipeline',
})

const result = await runEventPipeline(server, event)

server.statsd?.timing('kafka_queue.each_event', eachEventStartTimer)

return result
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import * as Sentry from '@sentry/node'
import { EachBatchPayload, KafkaMessage } from 'kafkajs'
import { EachBatchPayload } from 'kafkajs'

import { PluginConfig, PluginMethod, RawClickHouseEvent } from '../../../types'
import { convertToIngestionEvent, convertToPostHogEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import {
processComposeWebhookStep,
processOnEventStep,
} from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep'
import { runInstrumentedFunction } from '../../utils'
import { KafkaJSIngestionConsumer } from '../kafka-queue'
import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
import { eventDroppedCounter } from '../metrics'
import { eachBatchHandlerHelper } from './each-batch-webhooks'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
Expand Down Expand Up @@ -90,75 +88,3 @@ export async function eachBatchAppsOnEventHandlers(
'on_event'
)
}

/**
 * Using the provided groupIntoBatches function, split the incoming batch into micro-batches
 * that are executed **sequentially**, committing offsets after each of them.
 * Events within a single micro-batch are processed in parallel.
 *
 * @param payload - KafkaJS batch payload; only `batch`, `resolveOffset`, `heartbeat`,
 *   `commitOffsetsIfNecessary`, `isRunning` and `isStale` are used.
 * @param queue - Consumer the batch belongs to; supplies the topic, the statsd client and
 *   the concurrency settings used to size micro-batches.
 * @param eachMessage - Handler invoked once per message. A rejection aborts the current
 *   micro-batch and propagates to the caller.
 * @param groupIntoBatches - Splits the messages into micro-batches of at most `batchSize`.
 * @param key - Identifier used in log lines, statsd tags and as the gauge's `groupId` label.
 * @returns Resolves once every micro-batch was processed, or early if the consumer stopped
 *   running or the batch became stale.
 */
export async function eachBatch(
    { batch, resolveOffset, heartbeat, commitOffsetsIfNecessary, isRunning, isStale }: EachBatchPayload,
    queue: KafkaJSIngestionConsumer,
    eachMessage: (message: KafkaMessage, queue: KafkaJSIngestionConsumer) => Promise<void>,
    groupIntoBatches: (messages: KafkaMessage[], batchSize: number) => KafkaMessage[][],
    key: string
): Promise<void> {
    const batchStartTimer = new Date()
    const loggingKey = `each_batch_${key}`

    const transaction = Sentry.startTransaction({ name: `eachBatch(${eachMessage.name})` }, { topic: queue.topic })

    try {
        const messageBatches = groupIntoBatches(
            batch.messages,
            queue.pluginsServer.WORKER_CONCURRENCY * queue.pluginsServer.TASKS_PER_WORKER
        )
        queue.pluginsServer.statsd?.histogram('ingest_event_batching.input_length', batch.messages.length, { key: key })
        queue.pluginsServer.statsd?.histogram('ingest_event_batching.batch_count', messageBatches.length, { key: key })

        for (const messageBatch of messageBatches) {
            const batchSpan = transaction.startChild({ op: 'messageBatch', data: { batchLength: messageBatch.length } })

            // Stop early when the consumer is shutting down or the batch is no longer ours;
            // offsets of already finished micro-batches were committed in earlier iterations.
            if (!isRunning() || isStale()) {
                status.info('🚪', `Bailing out of a batch of ${batch.messages.length} events (${loggingKey})`, {
                    isRunning: isRunning(),
                    isStale: isStale(),
                    msFromBatchStart: new Date().valueOf() - batchStartTimer.valueOf(),
                })
                await heartbeat()
                return
            }

            // Undefined when groupIntoBatches hands back an empty micro-batch.
            const lastBatchMessage: KafkaMessage | undefined = messageBatch[messageBatch.length - 1]

            // Heartbeat as each message settles (success or failure) so long-running
            // micro-batches do not let the consumer session time out.
            await Promise.all(
                messageBatch.map((message: KafkaMessage) => eachMessage(message, queue).finally(() => heartbeat()))
            )

            // this if should never be false, but who can trust computers these days
            if (lastBatchMessage) {
                resolveOffset(lastBatchMessage.offset)
            }
            await commitOffsetsIfNecessary()

            // Record that latest messages timestamp, such that we can then, for
            // instance, alert on if this value is too old.
            // Guarded like the offset resolution above: an empty micro-batch has no
            // timestamp to report and must not throw after offsets were committed.
            if (lastBatchMessage) {
                latestOffsetTimestampGauge
                    .labels({ partition: batch.partition, topic: batch.topic, groupId: key })
                    .set(Number.parseInt(lastBatchMessage.timestamp, 10))
            }

            await heartbeat()

            batchSpan.finish()
        }

        status.debug(
            '🧩',
            `Kafka batch of ${batch.messages.length} events completed in ${
                new Date().valueOf() - batchStartTimer.valueOf()
            }ms (${loggingKey})`
        )
    } finally {
        // Always report the batch duration and close the Sentry transaction, even when
        // a message handler threw or we bailed out early.
        queue.pluginsServer.statsd?.timing(`kafka_queue.${loggingKey}`, batchStartTimer)
        transaction.finish()
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import { PostIngestionEvent, RawClickHouseEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { convertToIngestionEvent, convertToProcessedPluginEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { pipelineStepErrorCounter, pipelineStepMsSummary } from '../../../worker/ingestion/event-pipeline/metrics'
import { processWebhooksStep } from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep'
import { HookCommander } from '../../../worker/ingestion/hooks'
import { runInstrumentedFunction } from '../../utils'
import { eventDroppedCounter, latestOffsetTimestampGauge } from '../metrics'
import { ingestEventBatchingBatchCountSummary, ingestEventBatchingInputLengthSummary } from './metrics'

// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
require('@sentry/tracing')
Expand Down Expand Up @@ -95,6 +97,8 @@ export async function eachBatchHandlerHelper(

statsd?.histogram('ingest_event_batching.input_length', batch.messages.length, { key: key })
statsd?.histogram('ingest_event_batching.batch_count', batchesWithOffsets.length, { key: key })
ingestEventBatchingInputLengthSummary.observe(batch.messages.length)
ingestEventBatchingBatchCountSummary.observe(batchesWithOffsets.length)

for (const { eventBatch, lastOffset, lastTimestamp } of batchesWithOffsets) {
const batchSpan = transaction.startChild({ op: 'messageBatch', data: { batchLength: eventBatch.length } })
Expand Down Expand Up @@ -134,7 +138,6 @@ export async function eachBatchHandlerHelper(
}ms (${loggingKey})`
)
} finally {
statsd?.timing(`kafka_queue.${loggingKey}`, batchStartTimer)
transaction.finish()
}
}
Expand Down Expand Up @@ -178,13 +181,13 @@ async function runWebhooks(
const timer = new Date()

try {
statsd?.increment('kafka_queue.event_pipeline.start', { pipeline: 'webhooks' })
await processWebhooksStep(event, actionMatcher, hookCannon)
statsd?.increment('kafka_queue.webhooks.processed')
statsd?.increment('kafka_queue.event_pipeline.step', { step: processWebhooksStep.name })
statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: processWebhooksStep.name })
pipelineStepMsSummary.labels('processWebhooksStep').observe(Date.now() - timer.getTime())
} catch (error) {
statsd?.increment('kafka_queue.event_pipeline.step.error', { step: processWebhooksStep.name })
pipelineStepErrorCounter.labels('processWebhooksStep').inc()

if (error instanceof DependencyUnavailableError) {
// If this is an error with a dependency that we control, we want to
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Counter, exponentialBuckets, Histogram } from 'prom-client' // but fail to commit offsets, which can cause duplicate events
import { Counter, exponentialBuckets, Histogram, Summary } from 'prom-client' // but fail to commit offsets, which can cause duplicate events

// The following two counters can be used to see how often we start,
// but fail to commit offsets, which can cause duplicate events
Expand Down Expand Up @@ -29,3 +29,15 @@ export const ingestionParallelismPotential = new Histogram({
labelNames: ['overflow_mode'],
buckets: exponentialBuckets(1, 2, 7), // Up to 64
})

/**
 * Prometheus summary of how many messages each incoming Kafka batch contained.
 *
 * Observed alongside the statsd `ingest_event_batching.input_length` histogram.
 * Unlike the statsd metric, which carries a `key` tag, this summary declares no labels.
 */
export const ingestEventBatchingInputLengthSummary = new Summary({
name: 'ingest_event_batching_input_length',
help: 'Length of input batches of events',
// Report the median plus the upper tail of batch sizes.
percentiles: [0.5, 0.9, 0.95, 0.99],
})

/**
 * Prometheus summary of how many sub-batches each incoming Kafka batch was split into.
 *
 * Observed alongside the statsd `ingest_event_batching.batch_count` histogram.
 * Unlike the statsd metric, which carries a `key` tag, this summary declares no labels.
 */
export const ingestEventBatchingBatchCountSummary = new Summary({
name: 'ingest_event_batching_batch_count',
help: 'Number of batches of events',
// Report the median plus the upper tail of sub-batch counts.
percentiles: [0.5, 0.9, 0.95, 0.99],
})
Loading

0 comments on commit f470984

Please sign in to comment.