Skip to content

Commit

Permalink
chore(plugin-server): remove statsd
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Dec 1, 2023
1 parent 5b7385a commit 9de1886
Show file tree
Hide file tree
Showing 53 changed files with 60 additions and 536 deletions.
3 changes: 0 additions & 3 deletions plugin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ There's a multitude of settings you can use to control the plugin server. Use th
| KAFKA_MAX_MESSAGE_BATCH_SIZE | Kafka producer batch max size in bytes before flushing | `900000` |
| LOG_LEVEL | minimum log level | `'info'` |
| SENTRY_DSN | Sentry ingestion URL | `null` |
| STATSD_HOST | StatsD host - integration disabled if this is not provided | `null` |
| STATSD_PORT | StatsD port | `8125` |
| STATSD_PREFIX | StatsD prefix | `'plugin-server.'` |
| DISABLE_MMDB | whether to disable MMDB IP location capabilities | `false` |
| INTERNAL_MMDB_SERVER_PORT | port of the internal server used for IP location (0 means random) | `0` |
| DISTINCT_ID_LRU_SIZE | size of persons distinct ID LRU cache | `10000` |
Expand Down
1 change: 0 additions & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
"fast-deep-equal": "^3.1.3",
"generic-pool": "^3.7.1",
"graphile-worker": "0.13.0",
"hot-shots": "^9.2.0",
"ioredis": "^4.27.6",
"ipaddr.js": "^2.1.0",
"kafkajs": "^2.2.0",
Expand Down
20 changes: 0 additions & 20 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions plugin-server/src/backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export async function startBackfill() {
defaultConfig.PLUGIN_SERVER_MODE = null // Disable all consuming capabilities
const noCapability = {}
initApp(defaultConfig)
const [hub, closeHub] = await createHub(defaultConfig, null, noCapability)
const [hub, closeHub] = await createHub(defaultConfig, noCapability)
status.info('🏁', 'Bootstraping done, starting to backfill')

await runBackfill(hub)
Expand Down Expand Up @@ -101,7 +101,7 @@ async function retrieveEvents(
WHERE _timestamp >= '${chTimestampLower}'
AND _timestamp < '${chTimestampHigher}'
AND timestamp >= '${chTimestampLowerTS}'
AND timestamp < '${chTimestampHigherTS}'
AND timestamp < '${chTimestampHigherTS}'
AND event IN ('$merge_dangerously', '$create_alias', '$identify')
AND ((event = '$identify' and JSONExtractString(properties, '$anon_distinct_id') != '') OR
(event != '$identify' and JSONExtractString(properties, 'alias') != ''))
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ export function getDefaultConfig(): PluginsServerConfig {
SENTRY_PLUGIN_SERVER_TRACING_SAMPLE_RATE: 0,
SENTRY_PLUGIN_SERVER_PROFILING_SAMPLE_RATE: 0,
HTTP_SERVER_PORT: DEFAULT_HTTP_SERVER_PORT,
STATSD_HOST: null,
STATSD_PORT: 8125,
STATSD_PREFIX: 'plugin-server.',
SCHEDULE_LOCK_TTL: 60,
REDIS_POOL_MIN_SIZE: 1,
REDIS_POOL_MAX_SIZE: 3,
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/main/graphile-worker/graphile-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ export class GraphileWorker {
}

await instrument(
this.hub.statsd,
{
metricName: 'job_queues_enqueue_${jobName}',
key: instrumentationContext?.key ?? '?',
Expand All @@ -106,10 +105,8 @@ export class GraphileWorker {
async _enqueue(jobName: string, job: EnqueuedJob): Promise<void> {
try {
await this.addJob(jobName, job)
this.hub.statsd?.increment('enqueue_job.success', { jobName })
graphileEnqueueJobCounter.labels({ status: 'success', job: jobName }).inc()
} catch (error) {
this.hub.statsd?.increment('enqueue_job.fail', { jobName })
graphileEnqueueJobCounter.labels({ status: 'fail', job: jobName }).inc()
throw error
}
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/main/graphile-worker/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ export async function runScheduledTasks(
taskType: taskType,
runAt: helpers.job.run_at,
})
server.statsd?.increment('skipped_scheduled_tasks', { taskType })
graphileScheduledTaskCounter.labels({ status: 'skipped', task: taskType }).inc()
return
}
Expand All @@ -58,14 +57,12 @@ export async function runScheduledTasks(
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
})
server.statsd?.increment('queued_scheduled_task', { taskType })
graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
}
} else {
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`)
await piscina.run({ task: taskType, args: { pluginConfigId } })
server.statsd?.increment('completed_scheduled_task', { taskType })
graphileScheduledTaskCounter.labels({ status: 'completed', task: taskType }).inc()
}
}
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/main/graphile-worker/worker-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ export function getPluginJobHandlers(hub: Hub, graphileWorker: GraphileWorker, p
pluginJob: async (job) => {
const jobType = (job as EnqueuedPluginJob)?.type
jobsTriggeredCounter.labels(jobType).inc()
hub.statsd?.increment('triggered_job', {
instanceId: hub.instanceId.toString(),
})
try {
await piscina.run({ task: 'runPluginJob', args: { job: job as EnqueuedPluginJob } })
jobsExecutionSuccessCounter.labels(jobType).inc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,7 @@ export async function eachBatchParallelIngestion(
splitBatch.toProcess.sort((a, b) => a.length - b.length)

ingestEventBatchingInputLengthSummary.observe(messages.length)
queue.pluginsServer.statsd?.histogram('ingest_event_batching.input_length', messages.length, {
key: metricKey,
})
ingestEventBatchingBatchCountSummary.observe(splitBatch.toProcess.length)
queue.pluginsServer.statsd?.histogram('ingest_event_batching.batch_count', splitBatch.toProcess.length, {
key: metricKey,
})
prepareSpan.finish()

const processingPromises: Array<Promise<void>> = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ export async function eachBatchAppsOnEventHandlers(
payload,
(teamId) => queue.pluginsServer.pluginConfigsPerTeam.has(teamId),
(event) => eachMessageAppsOnEventHandlers(event, queue),
queue.pluginsServer.statsd,
queue.pluginsServer.WORKER_CONCURRENCY * queue.pluginsServer.TASKS_PER_WORKER,
'on_event'
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as Sentry from '@sentry/node'
import { StatsD } from 'hot-shots'
import { EachBatchPayload, KafkaMessage } from 'kafkajs'
import { Counter } from 'prom-client'
import { ActionMatcher } from 'worker/ingestion/action-matcher'
Expand Down Expand Up @@ -62,14 +61,12 @@ export async function eachBatchWebhooksHandlers(
payload: EachBatchPayload,
actionMatcher: ActionMatcher,
hookCannon: HookCommander,
statsd: StatsD | undefined,
concurrency: number
): Promise<void> {
await eachBatchHandlerHelper(
payload,
(teamId) => actionMatcher.hasWebhooks(teamId),
(event) => eachMessageWebhooksHandlers(event, actionMatcher, hookCannon, statsd),
statsd,
(event) => eachMessageWebhooksHandlers(event, actionMatcher, hookCannon),
concurrency,
'webhooks'
)
Expand All @@ -79,7 +76,6 @@ export async function eachBatchHandlerHelper(
payload: EachBatchPayload,
shouldProcess: (teamId: number) => boolean,
eachMessageHandler: (event: RawClickHouseEvent) => Promise<void>,
statsd: StatsD | undefined,
concurrency: number,
stats_key: string
): Promise<void> {
Expand All @@ -95,8 +91,6 @@ export async function eachBatchHandlerHelper(
try {
const batchesWithOffsets = groupIntoBatchesByUsage(batch.messages, concurrency, shouldProcess)

statsd?.histogram('ingest_event_batching.input_length', batch.messages.length, { key: key })
statsd?.histogram('ingest_event_batching.batch_count', batchesWithOffsets.length, { key: key })
ingestEventBatchingInputLengthSummary.observe(batch.messages.length)
ingestEventBatchingBatchCountSummary.observe(batchesWithOffsets.length)

Expand Down Expand Up @@ -145,8 +139,7 @@ export async function eachBatchHandlerHelper(
export async function eachMessageWebhooksHandlers(
clickHouseEvent: RawClickHouseEvent,
actionMatcher: ActionMatcher,
hookCannon: HookCommander,
statsd: StatsD | undefined
hookCannon: HookCommander
): Promise<void> {
if (!actionMatcher.hasWebhooks(clickHouseEvent.team_id)) {
// exit early if no webhooks nor resthooks
Expand All @@ -162,7 +155,7 @@ export async function eachMessageWebhooksHandlers(
convertToProcessedPluginEvent(event)

await runInstrumentedFunction({
func: () => runWebhooks(statsd, actionMatcher, hookCannon, event),
func: () => runWebhooks(actionMatcher, hookCannon, event),
statsKey: `kafka_queue.process_async_handlers_webhooks`,
timeoutMessage: 'After 30 seconds still running runWebhooksHandlersEventPipeline',
timeoutContext: () => ({
Expand All @@ -172,21 +165,13 @@ export async function eachMessageWebhooksHandlers(
})
}

async function runWebhooks(
statsd: StatsD | undefined,
actionMatcher: ActionMatcher,
hookCannon: HookCommander,
event: PostIngestionEvent
) {
async function runWebhooks(actionMatcher: ActionMatcher, hookCannon: HookCommander, event: PostIngestionEvent) {
const timer = new Date()

try {
await processWebhooksStep(event, actionMatcher, hookCannon)
statsd?.increment('kafka_queue.event_pipeline.step', { step: processWebhooksStep.name })
statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: processWebhooksStep.name })
pipelineStepMsSummary.labels('processWebhooksStep').observe(Date.now() - timer.getTime())
} catch (error) {
statsd?.increment('kafka_queue.event_pipeline.step.error', { step: processWebhooksStep.name })
pipelineStepErrorCounter.labels('processWebhooksStep').inc()

if (error instanceof DependencyUnavailableError) {
Expand Down
7 changes: 1 addition & 6 deletions plugin-server/src/main/ingestion-queues/jobs-consumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { StatsD } from 'hot-shots'
import { EachBatchHandler, Kafka } from 'kafkajs'
import { Counter } from 'prom-client'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'
Expand All @@ -24,12 +23,10 @@ export const startJobsConsumer = async ({
kafka,
producer,
graphileWorker,
statsd,
}: {
kafka: Kafka
producer: KafkaProducerWrapper
graphileWorker: GraphileWorker
statsd?: StatsD
}) => {
/*
Consumes from the jobs buffer topic, and enqueues the jobs for execution
Expand Down Expand Up @@ -84,11 +81,9 @@ export const startJobsConsumer = async ({
try {
await graphileWorker.enqueue(JobName.PLUGIN_JOB, job)
jobsConsumerSuccessCounter.inc()
statsd?.increment('jobs_consumer.enqueued')
} catch (error) {
status.error('⚠️', 'Failed to enqueue anonymous event for processing', { error })
jobsConsumerFailuresCounter.inc()
statsd?.increment('jobs_consumer.enqueue_error')

throw error
}
Expand All @@ -114,7 +109,7 @@ export const startJobsConsumer = async ({
await consumer.subscribe({ topic: KAFKA_JOBS })
await consumer.run({
eachBatch: async (payload) => {
return await instrumentEachBatchKafkaJS(KAFKA_JOBS, eachBatch, payload, statsd)
return await instrumentEachBatchKafkaJS(KAFKA_JOBS, eachBatch, payload)
},
})

Expand Down
6 changes: 1 addition & 5 deletions plugin-server/src/main/ingestion-queues/kafka-metrics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as Sentry from '@sentry/node'
import { StatsD } from 'hot-shots'
import { Consumer } from 'kafkajs'
import { KafkaConsumer } from 'node-rdkafka'

Expand All @@ -9,7 +8,7 @@ import {
kafkaConsumerEventRequestPendingMsSummary,
} from './metrics'

export function addMetricsEventListeners(consumer: Consumer, statsd: StatsD | undefined): void {
export function addMetricsEventListeners(consumer: Consumer): void {
const listenEvents = [
consumer.events.GROUP_JOIN,
consumer.events.CONNECT,
Expand All @@ -22,14 +21,11 @@ export function addMetricsEventListeners(consumer: Consumer, statsd: StatsD | un

listenEvents.forEach((event) => {
consumer.on(event, () => {
statsd?.increment('kafka_queue_consumer_event', { event })
kafkaConsumerEventCounter.labels(event).inc()
})
})

consumer.on(consumer.events.REQUEST, ({ payload }) => {
statsd?.timing('kafka_queue_consumer_event_request_duration', payload.duration, 0.01)
statsd?.timing('kafka_queue_consumer_event_request_pending_duration', payload.pendingDuration, 0.01)
kafkaConsumerEventRequestMsSummary.observe(payload.duration)
kafkaConsumerEventRequestPendingMsSummary.observe(payload.pendingDuration)
})
Expand Down
Loading

0 comments on commit 9de1886

Please sign in to comment.