From 1f6ce77e66dd8771a25315d7aca03add62567a40 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Tue, 5 Dec 2023 11:47:12 -0700 Subject: [PATCH] chore(plugin-server): remove statsd (#19027) --- plugin-server/README.md | 3 - plugin-server/package.json | 1 - plugin-server/pnpm-lock.yaml | 20 ----- plugin-server/src/backfill.ts | 4 +- plugin-server/src/config/config.ts | 3 - .../main/graphile-worker/graphile-worker.ts | 3 - .../src/main/graphile-worker/schedule.ts | 3 - .../src/main/graphile-worker/worker-setup.ts | 3 - .../batch-processing/each-batch-ingestion.ts | 6 -- .../batch-processing/each-batch-onevent.ts | 1 - .../batch-processing/each-batch-webhooks.ts | 23 +---- .../main/ingestion-queues/jobs-consumer.ts | 7 +- .../main/ingestion-queues/kafka-metrics.ts | 6 +- .../src/main/ingestion-queues/kafka-queue.ts | 32 ++----- .../on-event-handler-consumer.ts | 8 +- .../scheduled-tasks-consumer.ts | 8 +- plugin-server/src/main/pluginsServer.ts | 29 ++----- plugin-server/src/types.ts | 7 +- plugin-server/src/utils/db/db.ts | 41 +++------ plugin-server/src/utils/db/hub.ts | 43 +--------- plugin-server/src/utils/db/postgres.ts | 17 ++-- plugin-server/src/utils/metrics.ts | 29 +------ .../src/worker/ingestion/action-matcher.ts | 7 +- .../event-pipeline/pluginsProcessEventStep.ts | 3 - .../event-pipeline/populateTeamDataStep.ts | 8 -- .../event-pipeline/prepareEventStep.ts | 1 - .../event-pipeline/processPersonsStep.ts | 1 - .../worker/ingestion/event-pipeline/runner.ts | 9 -- plugin-server/src/worker/ingestion/hooks.ts | 4 - .../src/worker/ingestion/person-state.ts | 25 ------ .../src/worker/ingestion/process-event.ts | 6 +- .../ingestion/property-definitions-cache.ts | 5 +- .../ingestion/property-definitions-manager.ts | 9 +- .../src/worker/ingestion/team-manager.ts | 5 +- .../src/worker/plugins/loadPluginsFromDB.ts | 6 +- .../src/worker/plugins/loadSchedule.ts | 1 - plugin-server/src/worker/plugins/run.ts | 84 ++----------------- plugin-server/src/worker/plugins/setup.ts | 1 - .../src/worker/vm/extensions/jobs.ts | 1 - .../src/worker/vm/extensions/posthog.ts | 1 - .../src/worker/vm/extensions/storage.ts | 12 --- plugin-server/src/worker/vm/lazy.ts | 21 +---- plugin-server/src/worker/vm/vm.ts | 9 -- plugin-server/tests/main/db.test.ts | 2 - ...nalytics-events-ingestion-consumer.test.ts | 6 -- ...events-ingestion-overflow-consumer.test.ts | 6 -- .../main/ingestion-queues/each-batch.test.ts | 25 +----- .../main/ingestion-queues/kafka-queue.test.ts | 5 -- .../event-pipeline-integration.test.ts | 1 - .../pluginsProcessEventStep.test.ts | 7 -- .../ingestion/event-pipeline/runner.test.ts | 45 ---------- .../tests/worker/ingestion/hooks.test.ts | 1 - .../worker/ingestion/person-state.test.ts | 33 +------- 53 files changed, 66 insertions(+), 581 deletions(-) diff --git a/plugin-server/README.md b/plugin-server/README.md index b61cba750d9f3..16b8fb740a776 100644 --- a/plugin-server/README.md +++ b/plugin-server/README.md @@ -125,9 +125,6 @@ There's a multitude of settings you can use to control the plugin server. Use th | KAFKA_MAX_MESSAGE_BATCH_SIZE | Kafka producer batch max size in bytes before flushing | `900000` | | LOG_LEVEL | minimum log level | `'info'` | | SENTRY_DSN | Sentry ingestion URL | `null` | -| STATSD_HOST | StatsD host - integration disabled if this is not provided | `null` | -| STATSD_PORT | StatsD port | `8125` | -| STATSD_PREFIX | StatsD prefix | `'plugin-server.'` | | DISABLE_MMDB | whether to disable MMDB IP location capabilities | `false` | | INTERNAL_MMDB_SERVER_PORT | port of the internal server used for IP location (0 means random) | `0` | | DISTINCT_ID_LRU_SIZE | size of persons distinct ID LRU cache | `10000` | diff --git a/plugin-server/package.json b/plugin-server/package.json index 9c81a46063c96..7876a953def8d 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -61,7 +61,6 @@ "fast-deep-equal": "^3.1.3", "generic-pool": "^3.7.1", "graphile-worker": "0.13.0", - "hot-shots": "^9.2.0", "ioredis": "^4.27.6", "ipaddr.js": "^2.1.0", "kafkajs": "^2.2.0", diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index 6cfe55d201995..6407f25be359e 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -88,9 +88,6 @@ dependencies: graphile-worker: specifier: 0.13.0 version: 0.13.0 - hot-shots: - specifier: ^9.2.0 - version: 9.3.0 ioredis: specifier: ^4.27.6 version: 4.28.5 @@ -6484,13 +6481,6 @@ packages: minimalistic-crypto-utils: 1.0.1 dev: true - /hot-shots@9.3.0: - resolution: {integrity: sha512-e4tgWptiBvlIMnAX0ORe+dNEt0HznD+T2ckzXDUwCBsU7uWr2mwq5UtoT+Df5r9hD5S/DuP8rTxJUQvqAFSFKA==} - engines: {node: '>=6.0.0'} - optionalDependencies: - unix-dgram: 2.0.6 - dev: false - /hsl-to-rgb-for-reals@1.1.1: resolution: {integrity: sha512-LgOWAkrN0rFaQpfdWBQlv/VhkOxb5AsBjk6NQVx4yEzWS923T07X0M1Y0VNko2H52HeSpZrZNNMJ0aFqsdVzQg==} dev: true @@ -10015,16 +10005,6 @@ packages: engines: {node: '>= 10.0.0'} dev: true - /unix-dgram@2.0.6: - resolution: {integrity: sha512-AURroAsb73BZ6CdAyMrTk/hYKNj3DuYYEuOaB8bYMOHGKupRNScw90Q5C71tWJc3uE7dIeXRyuwN0xLLq3vDTg==} - engines: {node: '>=0.10.48'} - requiresBuild: true - dependencies: - bindings: 1.5.0 - nan: 2.17.0 - dev: false - optional: true - /update-browserslist-db@1.0.11(browserslist@4.21.5): resolution: {integrity: sha512-dCwEFf0/oT85M1fHBg4F0jtLwJrutGoHSQXCh7u4o2t1drG+c0a9Flnqww6XUKSfQMPpJBRjU8d4RXB09qtvaA==} hasBin: true diff --git a/plugin-server/src/backfill.ts b/plugin-server/src/backfill.ts index 467b73eaff478..e04a93e6f3b27 100644 --- a/plugin-server/src/backfill.ts +++ b/plugin-server/src/backfill.ts @@ -22,7 +22,7 @@ export async function startBackfill() { defaultConfig.PLUGIN_SERVER_MODE = null // Disable all consuming capabilities const noCapability = {} initApp(defaultConfig) - const [hub, closeHub] = await createHub(defaultConfig, null, noCapability) + const [hub, closeHub] = await createHub(defaultConfig, noCapability) status.info('🏁', 'Bootstraping done, starting to backfill') await runBackfill(hub) @@ -101,7 +101,7 @@ async function retrieveEvents( WHERE _timestamp >= '${chTimestampLower}' AND _timestamp < '${chTimestampHigher}' AND timestamp >= '${chTimestampLowerTS}' - AND timestamp < '${chTimestampHigherTS}' + AND timestamp < '${chTimestampHigherTS}' AND event IN ('$merge_dangerously', '$create_alias', '$identify') AND ((event = '$identify' and JSONExtractString(properties, '$anon_distinct_id') != '') OR (event != '$identify' and JSONExtractString(properties, 'alias') != '')) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 434cb21eb2685..5f7f064122542 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -80,9 +80,6 @@ export function getDefaultConfig(): PluginsServerConfig { SENTRY_PLUGIN_SERVER_TRACING_SAMPLE_RATE: 0, SENTRY_PLUGIN_SERVER_PROFILING_SAMPLE_RATE: 0, HTTP_SERVER_PORT: DEFAULT_HTTP_SERVER_PORT, - STATSD_HOST: null, - STATSD_PORT: 8125, - STATSD_PREFIX: 'plugin-server.', SCHEDULE_LOCK_TTL: 60, REDIS_POOL_MIN_SIZE: 1, REDIS_POOL_MAX_SIZE: 3, diff --git a/plugin-server/src/main/graphile-worker/graphile-worker.ts b/plugin-server/src/main/graphile-worker/graphile-worker.ts index 02a4d028f75a9..7e4e149177070 100644 --- a/plugin-server/src/main/graphile-worker/graphile-worker.ts +++ b/plugin-server/src/main/graphile-worker/graphile-worker.ts @@ -70,7 +70,6 @@ export class GraphileWorker { const enqueueFn = () => this._enqueue(jobName, job) await instrument( - this.hub.statsd, { metricName: `job_queues_enqueue_${jobName}`, key: instrumentationContext?.key ?? '?', @@ -84,10 +83,8 @@ export class GraphileWorker { async _enqueue(jobName: string, job: EnqueuedJob): Promise { try { await this.addJob(jobName, job) - this.hub.statsd?.increment('enqueue_job.success', { jobName }) graphileEnqueueJobCounter.labels({ status: 'success', job: jobName }).inc() } catch (error) { - this.hub.statsd?.increment('enqueue_job.fail', { jobName }) graphileEnqueueJobCounter.labels({ status: 'fail', job: jobName }).inc() throw error } diff --git a/plugin-server/src/main/graphile-worker/schedule.ts b/plugin-server/src/main/graphile-worker/schedule.ts index 2dfd3ef83c88b..861833e0b5bee 100644 --- a/plugin-server/src/main/graphile-worker/schedule.ts +++ b/plugin-server/src/main/graphile-worker/schedule.ts @@ -46,7 +46,6 @@ export async function runScheduledTasks( taskType: taskType, runAt: helpers.job.run_at, }) - server.statsd?.increment('skipped_scheduled_tasks', { taskType }) graphileScheduledTaskCounter.labels({ status: 'skipped', task: taskType }).inc() return } @@ -58,14 +57,12 @@ export async function runScheduledTasks( topic: KAFKA_SCHEDULED_TASKS, messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], }) - server.statsd?.increment('queued_scheduled_task', { taskType }) graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc() } } else { for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) { status.info('⏲ī¸', `Running ${taskType} for plugin config with ID ${pluginConfigId}`) await piscina.run({ task: taskType, args: { pluginConfigId } }) - server.statsd?.increment('completed_scheduled_task', { taskType }) graphileScheduledTaskCounter.labels({ status: 'completed', task: taskType }).inc() } } diff --git a/plugin-server/src/main/graphile-worker/worker-setup.ts b/plugin-server/src/main/graphile-worker/worker-setup.ts index 8ceeeaa024395..b0650b8f744bc 100644 --- a/plugin-server/src/main/graphile-worker/worker-setup.ts +++ b/plugin-server/src/main/graphile-worker/worker-setup.ts @@ -85,9 +85,6 @@ export function getPluginJobHandlers(hub: Hub, graphileWorker: GraphileWorker, p pluginJob: async (job) => { const jobType = (job as EnqueuedPluginJob)?.type jobsTriggeredCounter.labels(jobType).inc() - hub.statsd?.increment('triggered_job', { - instanceId: hub.instanceId.toString(), - }) try { await piscina.run({ task: 'runPluginJob', args: { job: job as EnqueuedPluginJob } }) jobsExecutionSuccessCounter.labels(jobType).inc() diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index d72013a082d21..47354d9fe5113 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -120,13 +120,7 @@ export async function eachBatchParallelIngestion( splitBatch.toProcess.sort((a, b) => a.length - b.length) ingestEventBatchingInputLengthSummary.observe(messages.length) - queue.pluginsServer.statsd?.histogram('ingest_event_batching.input_length', messages.length, { - key: metricKey, - }) ingestEventBatchingBatchCountSummary.observe(splitBatch.toProcess.length) - queue.pluginsServer.statsd?.histogram('ingest_event_batching.batch_count', splitBatch.toProcess.length, { - key: metricKey, - }) prepareSpan.finish() const processingPromises: Array> = [] diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts index 70333d5b78f2b..8a9d1fac89af2 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-onevent.ts @@ -83,7 +83,6 @@ export async function eachBatchAppsOnEventHandlers( payload, (teamId) => queue.pluginsServer.pluginConfigsPerTeam.has(teamId), (event) => eachMessageAppsOnEventHandlers(event, queue), - queue.pluginsServer.statsd, queue.pluginsServer.WORKER_CONCURRENCY * queue.pluginsServer.TASKS_PER_WORKER, 'on_event' ) diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts index a1654b9e553ff..9c4f10dcbb137 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts @@ -1,5 +1,4 @@ import * as Sentry from '@sentry/node' -import { StatsD } from 'hot-shots' import { EachBatchPayload, KafkaMessage } from 'kafkajs' import { Counter } from 'prom-client' import { ActionMatcher } from 'worker/ingestion/action-matcher' @@ -62,14 +61,12 @@ export async function eachBatchWebhooksHandlers( payload: EachBatchPayload, actionMatcher: ActionMatcher, hookCannon: HookCommander, - statsd: StatsD | undefined, concurrency: number ): Promise { await eachBatchHandlerHelper( payload, (teamId) => actionMatcher.hasWebhooks(teamId), - (event) => eachMessageWebhooksHandlers(event, actionMatcher, hookCannon, statsd), - statsd, + (event) => eachMessageWebhooksHandlers(event, actionMatcher, hookCannon), concurrency, 'webhooks' ) @@ -79,7 +76,6 @@ export async function eachBatchHandlerHelper( payload: EachBatchPayload, shouldProcess: (teamId: number) => boolean, eachMessageHandler: (event: RawClickHouseEvent) => Promise, - statsd: StatsD | undefined, concurrency: number, stats_key: string ): Promise { @@ -95,8 +91,6 @@ export async function eachBatchHandlerHelper( try { const batchesWithOffsets = groupIntoBatchesByUsage(batch.messages, concurrency, shouldProcess) - statsd?.histogram('ingest_event_batching.input_length', batch.messages.length, { key: key }) - statsd?.histogram('ingest_event_batching.batch_count', batchesWithOffsets.length, { key: key }) ingestEventBatchingInputLengthSummary.observe(batch.messages.length) ingestEventBatchingBatchCountSummary.observe(batchesWithOffsets.length) @@ -145,8 +139,7 @@ export async function eachBatchHandlerHelper( export async function eachMessageWebhooksHandlers( clickHouseEvent: RawClickHouseEvent, actionMatcher: ActionMatcher, - hookCannon: HookCommander, - statsd: StatsD | undefined + hookCannon: HookCommander ): Promise { if (!actionMatcher.hasWebhooks(clickHouseEvent.team_id)) { // exit early if no webhooks nor resthooks @@ -162,7 +155,7 @@ export async function eachMessageWebhooksHandlers( convertToProcessedPluginEvent(event) await runInstrumentedFunction({ - func: () => runWebhooks(statsd, actionMatcher, hookCannon, event), + func: () => runWebhooks(actionMatcher, hookCannon, event), statsKey: `kafka_queue.process_async_handlers_webhooks`, timeoutMessage: 'After 30 seconds still running runWebhooksHandlersEventPipeline', timeoutContext: () => ({ @@ -172,21 +165,13 @@ export async function eachMessageWebhooksHandlers( }) } -async function runWebhooks( - statsd: StatsD | undefined, - actionMatcher: ActionMatcher, - hookCannon: HookCommander, - event: PostIngestionEvent -) { +async function runWebhooks(actionMatcher: ActionMatcher, hookCannon: HookCommander, event: PostIngestionEvent) { const timer = new Date() try { await processWebhooksStep(event, actionMatcher, hookCannon) - statsd?.increment('kafka_queue.event_pipeline.step', { step: processWebhooksStep.name }) - statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: processWebhooksStep.name }) pipelineStepMsSummary.labels('processWebhooksStep').observe(Date.now() - timer.getTime()) } catch (error) { - statsd?.increment('kafka_queue.event_pipeline.step.error', { step: processWebhooksStep.name }) pipelineStepErrorCounter.labels('processWebhooksStep').inc() if (error instanceof DependencyUnavailableError) { diff --git a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts index 7064b68e437e2..cb632e0ea1f49 100644 --- a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts @@ -1,4 +1,3 @@ -import { StatsD } from 'hot-shots' import { EachBatchHandler, Kafka } from 'kafkajs' import { Counter } from 'prom-client' import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' @@ -24,12 +23,10 @@ export const startJobsConsumer = async ({ kafka, producer, graphileWorker, - statsd, }: { kafka: Kafka producer: KafkaProducerWrapper graphileWorker: GraphileWorker - statsd?: StatsD }) => { /* Consumes from the jobs buffer topic, and enqueues the jobs for execution @@ -84,11 +81,9 @@ export const startJobsConsumer = async ({ try { await graphileWorker.enqueue(JobName.PLUGIN_JOB, job) jobsConsumerSuccessCounter.inc() - statsd?.increment('jobs_consumer.enqueued') } catch (error) { status.error('⚠ī¸', 'Failed to enqueue anonymous event for processing', { error }) jobsConsumerFailuresCounter.inc() - statsd?.increment('jobs_consumer.enqueue_error') throw error } @@ -114,7 +109,7 @@ export const startJobsConsumer = async ({ await consumer.subscribe({ topic: KAFKA_JOBS }) await consumer.run({ eachBatch: async (payload) => { - return await instrumentEachBatchKafkaJS(KAFKA_JOBS, eachBatch, payload, statsd) + return await instrumentEachBatchKafkaJS(KAFKA_JOBS, eachBatch, payload) }, }) diff --git a/plugin-server/src/main/ingestion-queues/kafka-metrics.ts b/plugin-server/src/main/ingestion-queues/kafka-metrics.ts index 3797b25fd372b..f476a1dc79786 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-metrics.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-metrics.ts @@ -1,5 +1,4 @@ import * as Sentry from '@sentry/node' -import { StatsD } from 'hot-shots' import { Consumer } from 'kafkajs' import { KafkaConsumer } from 'node-rdkafka' @@ -9,7 +8,7 @@ import { kafkaConsumerEventRequestPendingMsSummary, } from './metrics' -export function addMetricsEventListeners(consumer: Consumer, statsd: StatsD | undefined): void { +export function addMetricsEventListeners(consumer: Consumer): void { const listenEvents = [ consumer.events.GROUP_JOIN, consumer.events.CONNECT, @@ -22,14 +21,11 @@ export function addMetricsEventListeners(consumer: Consumer, statsd: StatsD | un listenEvents.forEach((event) => { consumer.on(event, () => { - statsd?.increment('kafka_queue_consumer_event', { event }) kafkaConsumerEventCounter.labels(event).inc() }) }) consumer.on(consumer.events.REQUEST, ({ payload }) => { - statsd?.timing('kafka_queue_consumer_event_request_duration', payload.duration, 0.01) - statsd?.timing('kafka_queue_consumer_event_request_pending_duration', payload.pendingDuration, 0.01) kafkaConsumerEventRequestMsSummary.observe(payload.duration) kafkaConsumerEventRequestPendingMsSummary.observe(payload.pendingDuration) }) diff --git a/plugin-server/src/main/ingestion-queues/kafka-queue.ts b/plugin-server/src/main/ingestion-queues/kafka-queue.ts index 28d567cac9c31..5449daf6e6cf6 100644 --- a/plugin-server/src/main/ingestion-queues/kafka-queue.ts +++ b/plugin-server/src/main/ingestion-queues/kafka-queue.ts @@ -1,5 +1,4 @@ import * as Sentry from '@sentry/node' -import { StatsD } from 'hot-shots' import { Consumer, EachBatchPayload, Kafka } from 'kafkajs' import { Message } from 'node-rdkafka' import { Counter } from 'prom-client' @@ -29,7 +28,6 @@ export class KafkaJSIngestionConsumer { public consumer: Consumer public sessionTimeout: number private kafka: Kafka - private consumerGroupMemberId: string | null private wasConsumerRan: boolean constructor(pluginsServer: Hub, topic: string, consumerGroupId: string, batchHandler: KafkaJSBatchFunction) { @@ -46,7 +44,6 @@ export class KafkaJSIngestionConsumer { ) this.wasConsumerRan = false - this.consumerGroupMemberId = null this.consumerReady = false this.eachBatch = batchHandler @@ -62,12 +59,11 @@ export class KafkaJSIngestionConsumer { ) const startPromise = new Promise(async (resolve, reject) => { - addMetricsEventListeners(this.consumer, this.pluginsServer.statsd) + addMetricsEventListeners(this.consumer) this.consumer.on(this.consumer.events.GROUP_JOIN, ({ payload }) => { status.info('ℹī¸', 'Kafka joined consumer group', JSON.stringify(payload)) this.consumerReady = true - this.consumerGroupMemberId = payload.memberId clearTimeout(timeout) resolve() }) @@ -93,12 +89,7 @@ export class KafkaJSIngestionConsumer { async eachBatchConsumer(payload: EachBatchPayload): Promise { const topic = payload.batch.topic - await instrumentEachBatchKafkaJS( - topic, - (payload) => this.eachBatch(payload, this), - payload, - this.pluginsServer.statsd - ) + await instrumentEachBatchKafkaJS(topic, (payload) => this.eachBatch(payload, this), payload) } async pause(targetTopic: string, partition?: number): Promise { @@ -213,12 +204,7 @@ export class IngestionConsumer { } async eachBatchConsumer(messages: Message[]): Promise { - await instrumentEachBatch( - this.topic, - (messages) => this.eachBatch(messages, this), - messages, - this.pluginsServer.statsd - ) + await instrumentEachBatch(this.topic, (messages) => this.eachBatch(messages, this), messages) } async stop(): Promise { @@ -282,8 +268,7 @@ type EachBatchHandler = (messages: Message[]) => Promise export const instrumentEachBatch = async ( topic: string, eachBatch: EachBatchHandler, - messages: Message[], - statsd?: StatsD + messages: Message[] ): Promise => { try { kafkaConsumerMessagesReadCounter.labels({ topic_name: topic }).inc(messages.length) @@ -291,9 +276,6 @@ export const instrumentEachBatch = async ( kafkaConsumerMessagesProcessedCounter.labels({ topic_name: topic }).inc(messages.length) } catch (error) { const eventCount = messages.length - statsd?.increment('kafka_queue_each_batch_failed_events', eventCount, { - topic: topic, - }) kafkaConsumerEachBatchFailedCounter.labels({ topic_name: topic }).inc(eventCount) status.warn('💀', `Kafka batch of ${eventCount} events for topic ${topic} failed!`) throw error @@ -303,8 +285,7 @@ export const instrumentEachBatch = async ( export const instrumentEachBatchKafkaJS = async ( topic: string, eachBatch: (payload: EachBatchPayload) => Promise, - payload: EachBatchPayload, - statsd?: StatsD + payload: EachBatchPayload ): Promise => { try { kafkaConsumerMessagesReadCounter.labels({ topic_name: topic }).inc(payload.batch.messages.length) @@ -312,9 +293,6 @@ export const instrumentEachBatchKafkaJS = async ( kafkaConsumerMessagesProcessedCounter.labels({ topic_name: topic }).inc(payload.batch.messages.length) } catch (error) { const eventCount = payload.batch.messages.length - statsd?.increment('kafka_queue_each_batch_failed_events', eventCount, { - topic: topic, - }) kafkaConsumerEachBatchFailedCounter.labels({ topic_name: topic }).inc(eventCount) status.warn('💀', `Kafka batch of ${eventCount} events for topic ${topic} failed!`, { stack: error.stack, diff --git a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts index 80f4347bcb383..38653f698eaa6 100644 --- a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts @@ -1,4 +1,3 @@ -import { StatsD } from 'hot-shots' import { Consumer, Kafka } from 'kafkajs' import * as schedule from 'node-schedule' import { AppMetrics } from 'worker/ingestion/app-metrics' @@ -45,7 +44,6 @@ export const startAsyncWebhooksHandlerConsumer = async ({ postgres, teamManager, organizationManager, - statsd, serverConfig, appMetrics, }: { @@ -53,7 +51,6 @@ export const startAsyncWebhooksHandlerConsumer = async ({ postgres: PostgresRouter teamManager: TeamManager organizationManager: OrganizationManager - statsd: StatsD | undefined serverConfig: PluginsServerConfig appMetrics: AppMetrics }) => { @@ -76,13 +73,12 @@ export const startAsyncWebhooksHandlerConsumer = async ({ const actionManager = new ActionManager(postgres) await actionManager.prepare() - const actionMatcher = new ActionMatcher(postgres, actionManager, statsd) + const actionMatcher = new ActionMatcher(postgres, actionManager) const hookCannon = new HookCommander( postgres, teamManager, organizationManager, appMetrics, - statsd, serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS ) const concurrency = serverConfig.TASKS_PER_WORKER || 20 @@ -107,7 +103,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({ await consumer.subscribe({ topic: KAFKA_EVENTS_JSON, fromBeginning: false }) await consumer.run({ - eachBatch: (payload) => eachBatchWebhooksHandlers(payload, actionMatcher, hookCannon, statsd, concurrency), + eachBatch: (payload) => eachBatchWebhooksHandlers(payload, actionMatcher, hookCannon, concurrency), }) const isHealthy = makeHealthCheck(consumer, serverConfig.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS) diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts index 12d096f515b22..3f550009dcc7d 100644 --- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts @@ -1,4 +1,3 @@ -import { StatsD } from 'hot-shots' import { Batch, EachBatchHandler, Kafka } from 'kafkajs' import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' @@ -18,13 +17,11 @@ export const startScheduledTasksConsumer = async ({ producer, piscina, partitionConcurrency = 3, - statsd, }: { kafka: Kafka producer: KafkaProducerWrapper piscina: Piscina partitionConcurrency: number - statsd?: StatsD }) => { /* @@ -77,7 +74,6 @@ export const startScheduledTasksConsumer = async ({ pluginConfigId, durationSeconds: (performance.now() - startTime) / 1000, }) - statsd?.increment('completed_scheduled_task', { taskType }) scheduledTaskCounter.labels({ status: 'completed', task: taskType }).inc() } catch (error) { // TODO: figure out a nice way to test this code path. @@ -93,7 +89,6 @@ export const startScheduledTasksConsumer = async ({ error: error, stack: error.stack, }) - statsd?.increment('retriable_scheduled_task', { taskType }) scheduledTaskCounter.labels({ status: 'error', task: taskType }).inc() throw error } @@ -105,7 +100,6 @@ export const startScheduledTasksConsumer = async ({ stack: error.stack, }) resolveOffset(message.offset) - statsd?.increment('failed_scheduled_tasks', { taskType }) scheduledTaskCounter.labels({ status: 'failed', task: taskType }).inc() } finally { clearInterval(heartbeatInterval) @@ -133,7 +127,7 @@ export const startScheduledTasksConsumer = async ({ await consumer.run({ partitionsConsumedConcurrently: partitionConcurrency, eachBatch: async (payload) => { - return await instrumentEachBatchKafkaJS(KAFKA_SCHEDULED_TASKS, eachBatch, payload, statsd) + return await instrumentEachBatchKafkaJS(KAFKA_SCHEDULED_TASKS, eachBatch, payload) }, }) diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index ccfbb5f380df9..cf51b7713eb8e 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -12,9 +12,8 @@ import v8Profiler from 'v8-profiler-next' import { getPluginServerCapabilities } from '../capabilities' import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' -import { createHub, createKafkaClient, createKafkaProducerWrapper, createStatsdClient } from '../utils/db/hub' +import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' import { PostgresRouter } from '../utils/db/postgres' -import { captureEventLoopMetrics } from '../utils/metrics' import { cancelAllScheduledJobs } from '../utils/node-schedule' import { PubSub } from '../utils/pubsub' import { status } from '../utils/status' @@ -250,7 +249,7 @@ export async function startPluginsServer( // 4. conversion_events_buffer // if (capabilities.processPluginJobs || capabilities.pluginScheduledTasks) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, null, capabilities) + ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) serverInstance = serverInstance ? serverInstance : { hub } graphileWorker = new GraphileWorker(hub) @@ -273,7 +272,6 @@ export async function startPluginsServer( producer: hub.kafkaProducer, kafka: hub.kafka, partitionConcurrency: serverConfig.KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY, - statsd: hub.statsd, }) } @@ -282,13 +280,12 @@ export async function startPluginsServer( kafka: hub.kafka, producer: hub.kafkaProducer, graphileWorker: graphileWorker, - statsd: hub.statsd, }) } } if (capabilities.ingestion) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, null, capabilities) + ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) serverInstance = serverInstance ? serverInstance : { hub } piscina = piscina ?? (await makePiscina(serverConfig, hub)) @@ -304,7 +301,7 @@ export async function startPluginsServer( } if (capabilities.ingestionHistorical) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, null, capabilities) + ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) serverInstance = serverInstance ? serverInstance : { hub } piscina = piscina ?? (await makePiscina(serverConfig, hub)) @@ -319,7 +316,7 @@ export async function startPluginsServer( } if (capabilities.ingestionOverflow) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, null, capabilities) + ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) serverInstance = serverInstance ? serverInstance : { hub } piscina = piscina ?? (await makePiscina(serverConfig, hub)) @@ -332,7 +329,7 @@ export async function startPluginsServer( } if (capabilities.processAsyncOnEventHandlers) { - ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, null, capabilities) + ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) serverInstance = serverInstance ? serverInstance : { hub } piscina = piscina ?? (await makePiscina(serverConfig, hub)) @@ -349,10 +346,9 @@ export async function startPluginsServer( if (capabilities.processAsyncWebhooksHandlers) { // If we have a hub, then reuse some of it's attributes, otherwise // we need to create them. We only initialize the ones we need. - const statsd = hub?.statsd ?? createStatsdClient(serverConfig, null) - const postgres = hub?.postgres ?? new PostgresRouter(serverConfig, statsd) + const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const kafka = hub?.kafka ?? createKafkaClient(serverConfig) - const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig, statsd) + const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig) const organizationManager = hub?.organizationManager ?? new OrganizationManager(postgres, teamManager) const KafkaProducerWrapper = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig)) const appMetrics = @@ -371,7 +367,6 @@ export async function startPluginsServer( organizationManager: organizationManager, serverConfig: serverConfig, appMetrics: appMetrics, - statsd: statsd, }) stopWebhooksHandlerConsumer = webhooksStopConsumer @@ -401,15 +396,10 @@ export async function startPluginsServer( startPreflightSchedules(hub) } - if (hub.statsd) { - stopEventLoopMetrics = captureEventLoopMetrics(hub.statsd, hub.instanceId) - } - serverInstance.piscina = piscina serverInstance.queue = analyticsEventsIngestionConsumer serverInstance.stop = closeJobs - hub.statsd?.timing('total_setup_time', timer) pluginServerStartupTimeMs.inc(Date.now() - timer.valueOf()) status.info('🚀', 'All systems go') @@ -419,8 +409,7 @@ export async function startPluginsServer( if (capabilities.sessionRecordingBlobIngestion) { const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) - const statsd = hub?.statsd ?? createStatsdClient(serverConfig, null) - const postgres = hub?.postgres ?? new PostgresRouter(serverConfig, statsd) + const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) const s3 = hub?.objectStorage ?? getObjectStorage(recordingConsumerConfig) if (!s3) { diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index b29ed86d68ba0..e1374f3d8a376 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -12,7 +12,6 @@ import { Webhook, } from '@posthog/plugin-scaffold' import { Pool as GenericPool } from 'generic-pool' -import { StatsD } from 'hot-shots' import { Redis } from 'ioredis' import { Kafka } from 'kafkajs' import { DateTime } from 'luxon' @@ -151,9 +150,6 @@ export interface PluginsServerConfig { SENTRY_PLUGIN_SERVER_TRACING_SAMPLE_RATE: number // Rate of tracing in plugin server (between 0 and 1) SENTRY_PLUGIN_SERVER_PROFILING_SAMPLE_RATE: number // Rate of profiling in plugin server (between 0 and 1) HTTP_SERVER_PORT: number - STATSD_HOST: string | null - STATSD_PORT: number - STATSD_PREFIX: string SCHEDULE_LOCK_TTL: number // how many seconds to hold the lock for the schedule DISABLE_MMDB: boolean // whether to disable fetching MaxMind database for IP location DISTINCT_ID_LRU_SIZE: number @@ -238,7 +234,7 @@ export interface Hub extends PluginsServerConfig { instanceId: UUID // what tasks this server will tackle - e.g. ingestion, scheduled plugins or others. capabilities: PluginServerCapabilities - // active connections to Postgres, Redis, ClickHouse, Kafka, StatsD + // active connections to Postgres, Redis, ClickHouse, Kafka db: DB postgres: PostgresRouter redisPool: GenericPool @@ -247,7 +243,6 @@ export interface Hub extends PluginsServerConfig { kafkaProducer: KafkaProducerWrapper objectStorage: ObjectStorage // metrics - statsd?: StatsD pluginMetricsJob: Job | undefined // currently enabled plugin status plugins: Map diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index 8e4529f822adb..3c202e0db73aa 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -2,7 +2,6 @@ import ClickHouse from '@posthog/clickhouse' import { CacheOptions, Properties } from '@posthog/plugin-scaffold' import { captureException } from '@sentry/node' import { Pool as GenericPool } from 'generic-pool' -import { StatsD } from 'hot-shots' import Redis from 'ioredis' import { ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' @@ -158,9 +157,6 @@ export class DB { /** ClickHouse used for syncing Postgres and ClickHouse person data. */ clickhouse: ClickHouse - /** StatsD instance used to do instrumentation */ - statsd: StatsD | undefined - /** How many unique group types to allow per team */ MAX_GROUP_TYPES_PER_TEAM = 5 @@ -175,7 +171,6 @@ export class DB { redisPool: GenericPool, kafkaProducer: KafkaProducerWrapper, clickhouse: ClickHouse, - statsd: StatsD | undefined, pluginsDefaultLogLevel: PluginLogLevel, personAndGroupsCacheTtl = 1 ) { @@ -183,7 +178,6 @@ export class DB { this.redisPool = redisPool this.kafkaProducer = kafkaProducer this.clickhouse = clickhouse - this.statsd = statsd this.pluginsDefaultLogLevel = pluginsDefaultLogLevel this.PERSONS_AND_GROUPS_CACHE_TTL = personAndGroupsCacheTtl } @@ -194,7 +188,7 @@ export class DB { query: string, options?: ClickHouse.QueryOptions ): Promise> { - return instrumentQuery(this.statsd, 'query.clickhouse', undefined, async () => { + return instrumentQuery('query.clickhouse', undefined, async () => { const timeout = timeoutGuard('ClickHouse slow query warning after 30 sec', { query }) try { const queryResult = await this.clickhouse.querying(query, options) @@ -217,7 +211,7 @@ export class DB { ): Promise { const { jsonSerialize = true } = options - return instrumentQuery(this.statsd, 'query.redisGet', tag, async () => { + return instrumentQuery('query.redisGet', tag, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('Getting redis key delayed. Waiting over 30 sec to get key.', { key }) try { @@ -252,7 +246,7 @@ export class DB { ): Promise { const { jsonSerialize = true } = options - return instrumentQuery(this.statsd, 'query.redisSet', tag, async () => { + return instrumentQuery('query.redisSet', tag, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('Setting redis key delayed. Waiting over 30 sec to set key', { key }) try { @@ -272,7 +266,7 @@ export class DB { public redisSetMulti(kv: Array<[string, unknown]>, ttlSeconds?: number, options: CacheOptions = {}): Promise { const { jsonSerialize = true } = options - return instrumentQuery(this.statsd, 'query.redisSet', undefined, async () => { + return instrumentQuery('query.redisSet', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('Setting redis key delayed. Waiting over 30 sec to set keys', { keys: kv.map((x) => x[0]), @@ -296,7 +290,7 @@ export class DB { } public redisIncr(key: string): Promise { - return instrumentQuery(this.statsd, 'query.redisIncr', undefined, async () => { + return instrumentQuery('query.redisIncr', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('Incrementing redis key delayed. Waiting over 30 sec to incr key', { key }) try { @@ -309,7 +303,7 @@ export class DB { } public redisExpire(key: string, ttlSeconds: number): Promise { - return instrumentQuery(this.statsd, 'query.redisExpire', undefined, async () => { + return instrumentQuery('query.redisExpire', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('Expiring redis key delayed. Waiting over 30 sec to expire key', { key }) try { @@ -324,7 +318,7 @@ export class DB { public redisLPush(key: string, value: unknown, options: CacheOptions = {}): Promise { const { jsonSerialize = true } = options - return instrumentQuery(this.statsd, 'query.redisLPush', undefined, async () => { + return instrumentQuery('query.redisLPush', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('LPushing redis key delayed. Waiting over 30 sec to lpush key', { key }) try { @@ -338,7 +332,7 @@ export class DB { } public redisLRange(key: string, startIndex: number, endIndex: number): Promise { - return instrumentQuery(this.statsd, 'query.redisLRange', undefined, async () => { + return instrumentQuery('query.redisLRange', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('LRANGE delayed. Waiting over 30 sec to perform LRANGE', { key, @@ -355,7 +349,7 @@ export class DB { } public redisLLen(key: string): Promise { - return instrumentQuery(this.statsd, 'query.redisLLen', undefined, async () => { + return instrumentQuery('query.redisLLen', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('LLEN delayed. Waiting over 30 sec to perform LLEN', { key, @@ -370,7 +364,7 @@ export class DB { } public redisBRPop(key1: string, key2: string): Promise<[string, string]> { - return instrumentQuery(this.statsd, 'query.redisBRPop', undefined, async () => { + return instrumentQuery('query.redisBRPop', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('BRPoping redis key delayed. Waiting over 30 sec to brpop keys', { key1, @@ -386,7 +380,7 @@ export class DB { } public redisLRem(key: string, count: number, elementKey: string): Promise { - return instrumentQuery(this.statsd, 'query.redisLRem', undefined, async () => { + return instrumentQuery('query.redisLRem', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('LREM delayed. Waiting over 30 sec to perform LREM', { key, @@ -403,7 +397,7 @@ export class DB { } public redisLPop(key: string, count: number): Promise { - return instrumentQuery(this.statsd, 'query.redisLPop', undefined, async () => { + return instrumentQuery('query.redisLPop', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('LPOP delayed. Waiting over 30 sec to perform LPOP', { key, @@ -419,7 +413,7 @@ export class DB { } public redisPublish(channel: string, message: string): Promise { - return instrumentQuery(this.statsd, 'query.redisPublish', undefined, async () => { + return instrumentQuery('query.redisPublish', undefined, async () => { const client = await this.redisPool.acquire() const timeout = timeoutGuard('Publish delayed. Waiting over 30 sec to perform Publish', { channel, @@ -502,7 +496,6 @@ export class DB { ) if (cachedGroupData) { - this.statsd?.increment('group_info_cache.hit') groupInfoCacheResultCounter.labels({ result: 'hit' }).inc() groupPropertiesColumns[propertiesColumnName] = JSON.stringify(cachedGroupData.properties) groupCreatedAtColumns[createdAtColumnName] = cachedGroupData.created_at @@ -513,7 +506,6 @@ export class DB { captureException(error, { tags: { team_id: teamId } }) } - this.statsd?.increment('group_info_cache.miss') groupInfoCacheResultCounter.labels({ result: 'miss' }).inc() // If we didn't find cached data, lookup the group from Postgres @@ -541,7 +533,6 @@ export class DB { } } else { // We couldn't find the data from the cache nor Postgres, so record this in a metric and in Sentry - this.statsd?.increment('groups_data_missing_entirely') groupDataMissingCounter.inc() status.debug('🔍', `Could not find group data for group ${groupCacheKey} in cache or storage`) @@ -782,7 +773,6 @@ export class DB { // Without races, the returned person (updatedPerson) should have a version that's only +1 the person in memory const versionDisparity = updatedPerson.version - person.version - 1 if (versionDisparity > 0) { - this.statsd?.increment('person_update_version_mismatch', { versionDisparity: String(versionDisparity) }) personUpdateVersionMismatchCounter.inc() } @@ -1112,11 +1102,6 @@ export class DB { return } - this.statsd?.increment(`logs.entries_created`, { - source, - team_id: pluginConfig.team_id.toString(), - plugin_id: pluginConfig.plugin_id.toString(), - }) pluginLogEntryCounter.labels({ plugin_id: String(pluginConfig.plugin_id), source }).inc() try { diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index b161de1873fc9..9298403b02a6d 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -1,7 +1,5 @@ import ClickHouse from '@posthog/clickhouse' -import * as Sentry from '@sentry/node' import * as fs from 'fs' -import { StatsD } from 'hot-shots' import { Kafka, SASLOptions } from 'kafkajs' import { DateTime } from 'luxon' import { hostname } from 'os' @@ -70,7 +68,6 @@ export function createEventsToDropByToken(eventsToDropByTokenStr?: string): Map< export async function createHub( config: Partial = {}, - threadId: number | null = null, capabilities: PluginServerCapabilities | null = null ): Promise<[Hub, () => Promise]> { status.info('ℹī¸', `Connecting to all services:`) @@ -89,8 +86,6 @@ export async function createHub( serverConfig.CONVERSION_BUFFER_ENABLED_TEAMS.split(',').filter(String).map(Number) ) - const statsd: StatsD | undefined = createStatsdClient(serverConfig, threadId) - status.info('🤔', `Connecting to ClickHouse...`) const clickhouse = new ClickHouse({ // We prefer to run queries on the offline cluster. @@ -117,7 +112,7 @@ export async function createHub( const kafkaProducer = await createKafkaProducerWrapper(serverConfig) status.info('👍', `Kafka ready`) - const postgres = new PostgresRouter(serverConfig, statsd) + const postgres = new PostgresRouter(serverConfig) // TODO: assert tables are reachable (async calls that cannot be in a constructor) status.info('👍', `Postgres Router ready`) @@ -139,11 +134,10 @@ export async function createHub( redisPool, kafkaProducer, clickhouse, - statsd, serverConfig.PLUGINS_DEFAULT_LOG_LEVEL, serverConfig.PERSON_INFO_CACHE_TTL ) - const teamManager = new TeamManager(postgres, serverConfig, statsd) + const teamManager = new TeamManager(postgres, serverConfig) const organizationManager = new OrganizationManager(postgres, teamManager) const pluginsApiKeyManager = new PluginsApiKeyManager(db) const rootAccessManager = new RootAccessManager(db) @@ -176,7 +170,6 @@ export async function createHub( clickhouse, kafka, kafkaProducer, - statsd, enqueuePluginJob, objectStorage: objectStorage, @@ -236,38 +229,6 @@ export type KafkaConfig = { KAFKA_CLIENT_RACK?: string } -export function createStatsdClient(serverConfig: PluginsServerConfig, threadId: number | null) { - let statsd: StatsD | undefined - - if (serverConfig.STATSD_HOST) { - status.info('🤔', `Connecting to StatsD...`) - statsd = new StatsD({ - port: serverConfig.STATSD_PORT, - host: serverConfig.STATSD_HOST, - prefix: serverConfig.STATSD_PREFIX, - telegraf: true, - globalTags: serverConfig.PLUGIN_SERVER_MODE - ? { pluginServerMode: serverConfig.PLUGIN_SERVER_MODE } - : undefined, - errorHandler: (error) => { - status.warn('⚠ī¸', 'StatsD error', error) - Sentry.captureException(error, { - extra: { threadId }, - }) - }, - }) - // don't repeat the same info in each thread - if (threadId === null) { - status.info( - 'đŸĒĩ', - `Sending metrics to StatsD at ${serverConfig.STATSD_HOST}:${serverConfig.STATSD_PORT}, prefix: "${serverConfig.STATSD_PREFIX}"` - ) - } - status.info('👍', `StatsD ready`) - } - return statsd -} - export function createKafkaClient({ KAFKA_HOSTS, KAFKAJS_LOG_LEVEL, diff --git a/plugin-server/src/utils/db/postgres.ts b/plugin-server/src/utils/db/postgres.ts index 7e859a3bc225e..8bc67320fa47c 100644 --- a/plugin-server/src/utils/db/postgres.ts +++ b/plugin-server/src/utils/db/postgres.ts @@ -1,6 +1,5 @@ // Postgres -import { StatsD } from 'hot-shots' import { Client, Pool, PoolClient, QueryConfig, QueryResult, QueryResultRow } from 'pg' import { PluginsServerConfig } from '../../types' @@ -29,9 +28,8 @@ export class TransactionClient { export class PostgresRouter { private pools: Map - private readonly statsd: StatsD | undefined - constructor(serverConfig: PluginsServerConfig, statsd: StatsD | undefined) { + constructor(serverConfig: PluginsServerConfig) { status.info('🤔', `Connecting to common Postgresql...`) const commonClient = createPostgresPool(serverConfig.DATABASE_URL, serverConfig.POSTGRES_CONNECTION_POOL_SIZE) status.info('👍', `Common Postgresql ready`) @@ -42,7 +40,6 @@ export class PostgresRouter { [PostgresUse.COMMON_READ, commonClient], [PostgresUse.PLUGIN_STORAGE_RW, commonClient], ]) - this.statsd = statsd if (serverConfig.DATABASE_READONLY_URL) { status.info('🤔', `Connecting to read-only common Postgresql...`) @@ -70,10 +67,10 @@ export class PostgresRouter { ): Promise> { if (target instanceof TransactionClient) { const wrappedTag = `${PostgresUse[target.target]}:Tx<${tag}>` - return postgresQuery(target.client, queryString, values, wrappedTag, this.statsd) + return postgresQuery(target.client, queryString, values, wrappedTag) } else { const wrappedTag = `${PostgresUse[target]}<${tag}>` - return postgresQuery(this.pools.get(target)!, queryString, values, wrappedTag, this.statsd) + return postgresQuery(this.pools.get(target)!, queryString, values, wrappedTag) } } @@ -105,7 +102,8 @@ export class PostgresRouter { transaction: (client: TransactionClient) => Promise ): Promise { const wrappedTag = `${PostgresUse[usage]}:Tx<${tag}>` - return instrumentQuery(this.statsd, 'query.postgres_transaction', wrappedTag, async () => { + + return instrumentQuery('query.postgres_transaction', wrappedTag, async () => { const timeout = timeoutGuard(`Postgres slow transaction warning after 30 sec!`) const client = await this.pools.get(usage)!.connect() try { @@ -143,10 +141,9 @@ function postgresQuery( client: Client | Pool | PoolClient, queryString: string | QueryConfig, values: I | undefined, - tag: string, - statsd?: StatsD + tag: string ): Promise> { - return instrumentQuery(statsd, 'query.postgres', tag, async () => { + return instrumentQuery('query.postgres', tag, async () => { const queryConfig = typeof queryString === 'string' ? { diff --git a/plugin-server/src/utils/metrics.ts b/plugin-server/src/utils/metrics.ts index 5341b662944eb..358ac970d85ee 100644 --- a/plugin-server/src/utils/metrics.ts +++ b/plugin-server/src/utils/metrics.ts @@ -1,19 +1,13 @@ -import { StatsD, Tags } from 'hot-shots' import { Summary } from 'prom-client' import { runInSpan } from '../sentry' -import { UUID } from './utils' -type StopCallback = () => void - -export function instrumentQuery( - statsd: StatsD | undefined, +export async function instrumentQuery( metricName: string, tag: string | undefined, runQuery: () => Promise ): Promise { return instrument( - statsd, { metricName, key: 'queryTag', @@ -24,7 +18,6 @@ export function instrumentQuery( } export function instrument( - statsd: StatsD | undefined, options: { metricName: string key?: string @@ -33,20 +26,17 @@ export function instrument( }, runQuery: () => Promise ): Promise { - const tags: Tags = options.key ? { [options.key]: options.tag! } : {} return runInSpan( { op: options.metricName, description: options.tag, - data: { ...tags, ...options.data }, + data: { ...options.data }, }, async () => { const timer = new Date() - statsd?.increment(`${options.metricName}.total`, tags) try { return await runQuery() } finally { - statsd?.timing(options.metricName, timer, tags) instrumentedFnSummary .labels(options.metricName, String(options.key ?? 'null'), String(options.tag ?? 'null')) .observe(Date.now() - timer.getTime()) @@ -55,21 +45,6 @@ export function instrument( ) } -export function captureEventLoopMetrics(statsd: StatsD, instanceId: UUID): StopCallback { - const timer = setInterval(() => { - const time = new Date() - setTimeout(() => { - statsd?.timing('event_loop_lag_set_timeout', time, { - instanceId: instanceId.toString(), - }) - }, 0) - }, 2000) - - return () => { - clearInterval(timer) - } -} - const instrumentedFnSummary = new Summary({ name: 'instrumented_fn_duration_ms', help: 'Duration of instrumented functions', diff --git a/plugin-server/src/worker/ingestion/action-matcher.ts b/plugin-server/src/worker/ingestion/action-matcher.ts index fbed343195893..75d3158785801 100644 --- a/plugin-server/src/worker/ingestion/action-matcher.ts +++ b/plugin-server/src/worker/ingestion/action-matcher.ts @@ -2,7 +2,6 @@ import { Properties } from '@posthog/plugin-scaffold' import { captureException } from '@sentry/node' import escapeStringRegexp from 'escape-string-regexp' import equal from 'fast-deep-equal' -import { StatsD } from 'hot-shots' import { Summary } from 'prom-client' import RE2 from 're2' @@ -135,12 +134,10 @@ export function matchString(actual: string, expected: string, matching: StringMa export class ActionMatcher { private postgres: PostgresRouter private actionManager: ActionManager - private statsd: StatsD | undefined - constructor(postgres: PostgresRouter, actionManager: ActionManager, statsd?: StatsD) { + constructor(postgres: PostgresRouter, actionManager: ActionManager) { this.postgres = postgres this.actionManager = actionManager - this.statsd = statsd } public hasWebhooks(teamId: number): boolean { @@ -164,8 +161,6 @@ export class ActionMatcher { matches.push(teamActions[i]) } } - this.statsd?.timing('action_matching_for_event', matchingStart) - this.statsd?.increment('action_matches_found', matches.length) actionMatchMsSummary.observe(new Date().getTime() - matchingStart.getTime()) return matches } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts index bd19e06ede80e..05e3a839e12b5 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/pluginsProcessEventStep.ts @@ -23,9 +23,6 @@ export async function pluginsProcessEventStep( return processedEvent } else { // processEvent might not return an event. This is expected and plugins, e.g. downsample plugin uses it. - runner.hub.statsd?.increment('kafka_queue.dropped_event', { - teamID: String(event.team_id), - }) droppedEventCounter.inc() return null } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts index 248e54b75655d..5ea66c651c32d 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts @@ -29,12 +29,6 @@ export async function populateTeamDataStep( }) .inc() - // statsd copy as prometheus is currently not supported in worker threads. - runner.hub.statsd?.increment('ingestion_event_hasauthinfo', { - team_id_present: event.team_id ? 'true' : 'false', - token_present: event.token ? 'true' : 'false', - }) - // If a team_id is present (event captured from an app), trust it and return the event as is. if (event.team_id) { // Check for an invalid UUID, which should be blocked by capture, when team_id is present @@ -56,7 +50,6 @@ export async function populateTeamDataStep( drop_cause: 'no_token', }) .inc() - runner.hub.statsd?.increment('dropped_event_with_no_team', { token_set: 'false' }) return null } @@ -72,7 +65,6 @@ export async function populateTeamDataStep( drop_cause: 'invalid_token', }) .inc() - runner.hub.statsd?.increment('dropped_event_with_no_team', { token_set: 'true' }) return null } diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index 79619a0304c30..b026423156662 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -10,7 +10,6 @@ export async function prepareEventStep(runner: EventPipelineRunner, event: Plugi const { team_id, uuid } = event const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { - runner.hub.statsd?.increment('process_event_invalid_timestamp', { teamId: String(team_id), type: type }) invalidTimestampCounter.labels(type).inc() tsParsingIngestionWarnings.push(captureIngestionWarning(runner.hub.db, team_id, type, details)) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts index 0ae5390cc2d93..6a4cd6c4874ca 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/processPersonsStep.ts @@ -28,7 +28,6 @@ export async function processPersonsStep( String(event.distinct_id), timestamp, runner.hub.db, - runner.hub.statsd, runner.poEEmbraceJoin ).update() diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts index 362cf7748cc65..451a033440aa2 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runner.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runner.ts @@ -93,7 +93,6 @@ export class EventPipelineRunner { } else { result = this.registerLastStep('populateTeamDataStep', null, [event]) } - this.hub.statsd?.increment('kafka_queue.single_event.processed_and_ingested') eventProcessedAndIngestedCounter.inc() return result } catch (error) { @@ -145,10 +144,6 @@ export class EventPipelineRunner { args: any[], promises?: Array> ): EventPipelineResult { - this.hub.statsd?.increment('kafka_queue.event_pipeline.step.last', { - step: stepName, - team_id: String(teamId), // NOTE: potentially high cardinality - }) pipelineLastStepCounter.labels(stepName).inc() return { promises: promises, lastStep: stepName, args } } @@ -176,8 +171,6 @@ export class EventPipelineRunner { try { const result = await step(...args) pipelineStepMsSummary.labels(step.name).observe(Date.now() - timer.getTime()) - this.hub.statsd?.increment('kafka_queue.event_pipeline.step', { step: step.name }) - this.hub.statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: step.name }) return result } catch (err) { await this.handleError(err, step.name, args, teamId, sentToDql) @@ -206,7 +199,6 @@ export class EventPipelineRunner { extra: { currentArgs, originalEvent: this.originalEvent }, }) - this.hub.statsd?.increment('kafka_queue.event_pipeline.step.error', { step: currentStepName }) pipelineStepErrorCounter.labels(currentStepName).inc() // Should we throw or should we drop and send the event to DLQ. @@ -225,7 +217,6 @@ export class EventPipelineRunner { `plugin_server_ingest_event:${currentStepName}` ) await this.hub.db.kafkaProducer!.queueMessage(message) - this.hub.statsd?.increment('events_added_to_dead_letter_queue') } catch (dlqError) { status.info('🔔', `Errored trying to add event to dead letter queue. Error: ${dlqError}`) Sentry.captureException(dlqError, { diff --git a/plugin-server/src/worker/ingestion/hooks.ts b/plugin-server/src/worker/ingestion/hooks.ts index 25ba7f2e6f2eb..c2a6c0536df46 100644 --- a/plugin-server/src/worker/ingestion/hooks.ts +++ b/plugin-server/src/worker/ingestion/hooks.ts @@ -1,5 +1,4 @@ import { captureException } from '@sentry/node' -import { StatsD } from 'hot-shots' import { Histogram } from 'prom-client' import { format } from 'util' @@ -256,7 +255,6 @@ export class HookCommander { teamManager: TeamManager organizationManager: OrganizationManager appMetrics: AppMetrics - statsd: StatsD | undefined siteUrl: string /** Hook request timeout in ms. */ EXTERNAL_REQUEST_TIMEOUT: number @@ -266,7 +264,6 @@ export class HookCommander { teamManager: TeamManager, organizationManager: OrganizationManager, appMetrics: AppMetrics, - statsd: StatsD | undefined, timeout: number ) { this.postgres = postgres @@ -278,7 +275,6 @@ export class HookCommander { status.warn('⚠ī¸', 'SITE_URL env is not set for webhooks') this.siteUrl = '' } - this.statsd = statsd this.appMetrics = appMetrics this.EXTERNAL_REQUEST_TIMEOUT = timeout } diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index e5763cfe60d50..250f95e4e67b7 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -1,6 +1,5 @@ import { PluginEvent, Properties } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' -import { StatsD } from 'hot-shots' import { ProducerRecord } from 'kafkajs' import { DateTime } from 'luxon' import { Counter } from 'prom-client' @@ -90,7 +89,6 @@ export class PersonState { maxMergeAttempts: number private db: DB - private statsd: StatsD | undefined public updateIsIdentified: boolean // TODO: remove this from the class and being hidden private poEEmbraceJoin: boolean @@ -100,7 +98,6 @@ export class PersonState { distinctId: string, timestamp: DateTime, db: DB, - statsd: StatsD | undefined = undefined, poEEmbraceJoin = false, uuid: UUIDT | undefined = undefined, maxMergeAttempts: number = MAX_FAILED_PERSON_MERGE_ATTEMPTS @@ -114,7 +111,6 @@ export class PersonState { this.maxMergeAttempts = maxMergeAttempts this.db = db - this.statsd = statsd // If set to true, we'll update `is_identified` at the end of `updateProperties` // :KLUDGE: This is an indirect communication channel between `handleIdentifyOrAlias` and `updateProperties` @@ -405,21 +401,8 @@ export class PersonState { otherPersonDistinctId: string }): Promise { const olderCreatedAt = DateTime.min(mergeInto.created_at, otherPerson.created_at) - const newerCreatedAt = DateTime.max(mergeInto.created_at, otherPerson.created_at) - const mergeAllowed = this.isMergeAllowed(otherPerson) - this.statsd?.increment('merge_users', { - call: this.event.event, // $identify, $create_alias or $merge_dangerously - teamId: this.teamId.toString(), - oldPersonIdentified: String(otherPerson.is_identified), - newPersonIdentified: String(mergeInto.is_identified), - // For analyzing impact of merges we need to know how old data would need to get updated - // If we are smart we merge the newer person into the older one, - // so we need to know the newer person's age - newerPersonAgeInMonths: String(ageInMonthsLowCardinality(newerCreatedAt)), - }) - // If merge isn't allowed, we will ignore it, log an ingestion warning and exit if (!mergeAllowed) { // TODO: add event UUID to the ingestion warning @@ -702,14 +685,6 @@ class PersonOverrideWriter { } } -export function ageInMonthsLowCardinality(timestamp: DateTime): number { - const ageInMonths = Math.max(-Math.floor(timestamp.diffNow('months').months), 0) - // for getting low cardinality for statsd metrics tags, which can cause issues in e.g. InfluxDB: - // https://docs.influxdata.com/influxdb/cloud/write-data/best-practices/resolve-high-cardinality/ - const ageLowCardinality = Math.min(ageInMonths, 50) - return ageLowCardinality -} - function SQL(sqlParts: TemplateStringsArray, ...args: any[]): { text: string; values: any[] } { // Generates a node-pq compatible query object given a tagged // template literal. The intention is to remove the need to match up diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 3a69b022771ad..1c1ad2d82ac7a 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -63,8 +63,7 @@ export class EventsProcessor { this.teamManager, this.groupTypeManager, pluginsServer.db, - pluginsServer, - pluginsServer.statsd + pluginsServer ) } @@ -95,9 +94,6 @@ export class EventsProcessor { }) try { result = await this.capture(eventUuid, team, data['event'], distinctId, properties, timestamp) - this.pluginsServer.statsd?.timing('kafka_queue.single_save.standard', singleSaveTimer, { - team_id: teamId.toString(), - }) processEventMsSummary.observe(Date.now() - singleSaveTimer.valueOf()) } finally { clearTimeout(captureTimeout) diff --git a/plugin-server/src/worker/ingestion/property-definitions-cache.ts b/plugin-server/src/worker/ingestion/property-definitions-cache.ts index 2ac60e3ee9d64..db91dd7fc8d2a 100644 --- a/plugin-server/src/worker/ingestion/property-definitions-cache.ts +++ b/plugin-server/src/worker/ingestion/property-definitions-cache.ts @@ -1,4 +1,3 @@ -import { StatsD } from 'hot-shots' import LRU from 'lru-cache' import LRUCache from 'lru-cache' @@ -27,12 +26,10 @@ type PropertyDefinitionsCacheValue = PropertyType | typeof NULL_IN_DATABASE | ty */ export class PropertyDefinitionsCache { readonly propertyDefinitionsCache: Map> - private readonly statsd?: StatsD private readonly lruCacheSize: number - constructor(serverConfig: PluginsServerConfig, statsd?: StatsD) { + constructor(serverConfig: PluginsServerConfig) { this.lruCacheSize = serverConfig.EVENT_PROPERTY_LRU_SIZE - this.statsd = statsd this.propertyDefinitionsCache = new Map() } diff --git a/plugin-server/src/worker/ingestion/property-definitions-manager.ts b/plugin-server/src/worker/ingestion/property-definitions-manager.ts index 430790f27e286..aad9343a16a28 100644 --- a/plugin-server/src/worker/ingestion/property-definitions-manager.ts +++ b/plugin-server/src/worker/ingestion/property-definitions-manager.ts @@ -1,5 +1,4 @@ import { Properties } from '@posthog/plugin-scaffold' -import { StatsD } from 'hot-shots' import LRU from 'lru-cache' import { DateTime } from 'luxon' import { Summary } from 'prom-client' @@ -59,18 +58,15 @@ export class PropertyDefinitionsManager { eventPropertiesCache: LRU> // Map> eventLastSeenCache: LRU // key: JSON.stringify([team_id, event]); value: parseInt(YYYYMMDD) propertyDefinitionsCache: PropertyDefinitionsCache - statsd?: StatsD private readonly lruCacheSize: number constructor( teamManager: TeamManager, groupTypeManager: GroupTypeManager, db: DB, - serverConfig: PluginsServerConfig, - statsd?: StatsD + serverConfig: PluginsServerConfig ) { this.db = db - this.statsd = statsd this.teamManager = teamManager this.groupTypeManager = groupTypeManager this.lruCacheSize = serverConfig.EVENT_PROPERTY_LRU_SIZE @@ -90,7 +86,7 @@ export class PropertyDefinitionsManager { maxAge: ONE_HOUR * 24, // cache up to 24h updateAgeOnGet: true, }) - this.propertyDefinitionsCache = new PropertyDefinitionsCache(serverConfig, statsd) + this.propertyDefinitionsCache = new PropertyDefinitionsCache(serverConfig) } public async updateEventNamesAndProperties(teamId: number, event: string, properties: Properties): Promise { @@ -118,7 +114,6 @@ export class PropertyDefinitionsManager { ]) } finally { clearTimeout(timeout) - this.statsd?.timing('updateEventNamesAndProperties', timer) updateEventNamesAndPropertiesMsSummary.observe(Date.now() - timer.valueOf()) } } diff --git a/plugin-server/src/worker/ingestion/team-manager.ts b/plugin-server/src/worker/ingestion/team-manager.ts index bd1dec1cd9ddf..862c482c00cbe 100644 --- a/plugin-server/src/worker/ingestion/team-manager.ts +++ b/plugin-server/src/worker/ingestion/team-manager.ts @@ -1,5 +1,4 @@ import { Properties } from '@posthog/plugin-scaffold' -import { StatsD } from 'hot-shots' import LRU from 'lru-cache' import { ONE_MINUTE } from '../../config/constants' @@ -13,12 +12,10 @@ export class TeamManager { postgres: PostgresRouter teamCache: LRU tokenToTeamIdCache: LRU - statsd?: StatsD instanceSiteUrl: string - constructor(postgres: PostgresRouter, serverConfig: PluginsServerConfig, statsd?: StatsD) { + constructor(postgres: PostgresRouter, serverConfig: PluginsServerConfig) { this.postgres = postgres - this.statsd = statsd this.teamCache = new LRU({ max: 10_000, diff --git a/plugin-server/src/worker/plugins/loadPluginsFromDB.ts b/plugin-server/src/worker/plugins/loadPluginsFromDB.ts index 3f556b7e6b160..81282e0646794 100644 --- a/plugin-server/src/worker/plugins/loadPluginsFromDB.ts +++ b/plugin-server/src/worker/plugins/loadPluginsFromDB.ts @@ -35,7 +35,6 @@ export async function loadPluginsFromDB( for (const row of pluginRows) { plugins.set(row.id, row) } - hub.statsd?.timing('load_plugins.plugins', startTimer) loadPluginsMsSummary.observe(new Date().getTime() - startTimer.getTime()) const pluginAttachmentTimer = new Date() @@ -53,7 +52,6 @@ export async function loadPluginsFromDB( contents: row.contents, } } - hub.statsd?.timing('load_plugins.plugin_attachments', pluginAttachmentTimer) loadPluginAttachmentsMsSummary.observe(new Date().getTime() - pluginAttachmentTimer.getTime()) const pluginConfigTimer = new Date() @@ -97,10 +95,8 @@ export async function loadPluginsFromDB( } teamConfigs.push(pluginConfig) } - hub.statsd?.timing('load_plugins.plugin_configs', pluginConfigTimer) - loadPluginConfigsMsSummary.observe(new Date().getTime() - pluginConfigTimer.getTime()) - hub.statsd?.timing('load_plugins.total', startTimer) + loadPluginConfigsMsSummary.observe(new Date().getTime() - pluginConfigTimer.getTime()) loadPluginsTotalMsSummary.observe(new Date().getTime() - startTimer.getTime()) return { plugins, pluginConfigs, pluginConfigsPerTeam } diff --git a/plugin-server/src/worker/plugins/loadSchedule.ts b/plugin-server/src/worker/plugins/loadSchedule.ts index e288d15e09893..6c5c4684d7390 100644 --- a/plugin-server/src/worker/plugins/loadSchedule.ts +++ b/plugin-server/src/worker/plugins/loadSchedule.ts @@ -32,6 +32,5 @@ export async function loadSchedule(server: Hub): Promise { } server.pluginSchedule = pluginSchedule - server.statsd?.timing('load_schedule.success', timer) loadScheduleMsSummary.observe(new Date().getTime() - timer.getTime()) } diff --git a/plugin-server/src/worker/plugins/run.ts b/plugin-server/src/worker/plugins/run.ts index d07e33dc67bd9..239b270cc1b37 100644 --- a/plugin-server/src/worker/plugins/run.ts +++ b/plugin-server/src/worker/plugins/run.ts @@ -4,7 +4,6 @@ import { Summary } from 'prom-client' import { Hub, PluginConfig, PluginTaskType, VMMethods } from '../../types' import { processError } from '../../utils/db/error' import { trackedFetch } from '../../utils/fetch' -import { instrument } from '../../utils/metrics' import { status } from '../../utils/status' import { IllegalOperationError } from '../../utils/utils' @@ -26,12 +25,6 @@ async function runSingleTeamPluginOnEvent( }, 10 * 1000) // 10 seconds try { // Runs onEvent for a single plugin without any retries - const metricName = 'plugin.on_event' - const metricTags = { - plugin: pluginConfig.plugin?.name ?? '?', - teamId: event.team_id.toString(), - } - const timer = new Date() try { await onEvent!(event) @@ -45,7 +38,6 @@ async function runSingleTeamPluginOnEvent( successes: 1, }) } catch (error) { - hub.statsd?.increment(`${metricName}.ERROR`, metricTags) pluginActionMsSummary .labels(pluginConfig.plugin?.id.toString() ?? '?', 'onEvent', 'error') .observe(new Date().getTime() - timer.getTime()) @@ -63,7 +55,6 @@ async function runSingleTeamPluginOnEvent( } ) } - hub.statsd?.timing(metricName, timer, metricTags) } finally { clearTimeout(timeout) } @@ -76,17 +67,7 @@ export async function runOnEvent(hub: Hub, event: ProcessedPluginEvent): Promise await Promise.all( pluginMethodsToRun .filter(([, method]) => !!method) - .map(([pluginConfig, onEvent]) => - instrument( - hub.statsd, - { - metricName: 'plugin.runOnEvent', - key: 'plugin', - tag: pluginConfig.plugin?.name || '?', - }, - () => runSingleTeamPluginOnEvent(hub, event, pluginConfig, onEvent) - ) - ) + .map(([pluginConfig, onEvent]) => runSingleTeamPluginOnEvent(hub, event, pluginConfig, onEvent)) ) } @@ -105,11 +86,6 @@ async function runSingleTeamPluginComposeWebhook( }, slowWarningTimeout) try { // Runs composeWebhook for a single plugin without any retries - const metricName = 'plugin.compose_webhook' - const metricTags = { - plugin: pluginConfig.plugin?.name ?? '?', - teamId: event.team_id.toString(), - } const timer = new Date() try { const webhook: Webhook | null = await composeWebhook!(event) @@ -139,7 +115,6 @@ async function runSingleTeamPluginComposeWebhook( successes: 1, }) } else { - hub.statsd?.increment(`${metricName}.ERROR`, metricTags) pluginActionMsSummary .labels(pluginConfig.plugin?.id.toString() ?? '?', 'composeWebhook', 'error') .observe(new Date().getTime() - timer.getTime()) @@ -159,7 +134,6 @@ async function runSingleTeamPluginComposeWebhook( ) } } catch (error) { - hub.statsd?.increment(`${metricName}.ERROR`, metricTags) pluginActionMsSummary .labels(pluginConfig.plugin?.id.toString() ?? '?', 'composeWebhook', 'error') .observe(new Date().getTime() - timer.getTime()) @@ -177,7 +151,6 @@ async function runSingleTeamPluginComposeWebhook( } ) } - hub.statsd?.timing(metricName, timer, metricTags) } finally { clearTimeout(timeout) } @@ -191,15 +164,7 @@ export async function runComposeWebhook(hub: Hub, event: PostHogEvent): Promise< pluginMethodsToRun .filter(([, method]) => !!method) .map(([pluginConfig, composeWebhook]) => - instrument( - hub.statsd, - { - metricName: 'plugin.runComposeWebhook', - key: 'plugin', - tag: pluginConfig.plugin?.name || '?', - }, - () => runSingleTeamPluginComposeWebhook(hub, event, pluginConfig, composeWebhook) - ) + runSingleTeamPluginComposeWebhook(hub, event, pluginConfig, composeWebhook) ) ) } @@ -223,16 +188,7 @@ export async function runProcessEvent(hub: Hub, event: PluginEvent): Promise processEvent(returnedEvent!) - )) || null + returnedEvent = (await processEvent(returnedEvent!)) || null if (returnedEvent && returnedEvent.team_id !== teamId) { returnedEvent.team_id = teamId throw new IllegalOperationError('Plugin tried to change event.team_id') @@ -252,10 +208,6 @@ export async function runProcessEvent(hub: Hub, event: PluginEvent): Promise (payload ? task?.exec(payload) : task?.exec()) - ) + response = await (payload ? task?.exec(payload) : task?.exec()) pluginActionMsSummary .labels(String(pluginConfig?.plugin?.id), 'task', 'success') @@ -356,12 +292,7 @@ export async function runPluginTask( } } catch (error) { await processError(hub, pluginConfig || null, error) - hub.statsd?.increment(`plugin.task.ERROR`, { - taskType: taskType, - taskName: taskName, - pluginConfigId: pluginConfigId.toString(), - teamId: teamId?.toString() ?? '?', - }) + pluginActionMsSummary .labels(String(pluginConfig?.plugin?.id), 'task', 'error') .observe(new Date().getTime() - timer.getTime()) @@ -377,10 +308,7 @@ export async function runPluginTask( ) } } - hub.statsd?.timing(`plugin.task`, timer, { - plugin: pluginConfig?.plugin?.name ?? '?', - teamId: teamId?.toString() ?? '?', - }) + return response } diff --git a/plugin-server/src/worker/plugins/setup.ts b/plugin-server/src/worker/plugins/setup.ts index 35bbc378111a3..b2e4e0bdd0f0c 100644 --- a/plugin-server/src/worker/plugins/setup.ts +++ b/plugin-server/src/worker/plugins/setup.ts @@ -58,7 +58,6 @@ export async function setupPlugins(hub: Hub): Promise { } await Promise.all(pluginVMLoadPromises) - hub.statsd?.timing('setup_plugins.success', timer) setupPluginsMsSummary.observe(new Date().getTime() - timer.getTime()) hub.plugins = plugins diff --git a/plugin-server/src/worker/vm/extensions/jobs.ts b/plugin-server/src/worker/vm/extensions/jobs.ts index 7c7a03eb1266d..cdeaa9c1ff45b 100644 --- a/plugin-server/src/worker/vm/extensions/jobs.ts +++ b/plugin-server/src/worker/vm/extensions/jobs.ts @@ -61,7 +61,6 @@ export function createJobs(server: Hub, pluginConfig: PluginConfig): Jobs { pluginConfigId: pluginConfig.id, pluginConfigTeam: pluginConfig.team_id, } - server.statsd?.increment('job_enqueue_attempt') pluginJobEnqueueCounter.labels(String(pluginConfig.plugin?.id)).inc() await server.enqueuePluginJob(job) } catch (e) { diff --git a/plugin-server/src/worker/vm/extensions/posthog.ts b/plugin-server/src/worker/vm/extensions/posthog.ts index 5c9f5d4b617ac..c7a0a7124c50d 100644 --- a/plugin-server/src/worker/vm/extensions/posthog.ts +++ b/plugin-server/src/worker/vm/extensions/posthog.ts @@ -74,7 +74,6 @@ export function createPosthog(hub: Hub, pluginConfig: PluginConfig): DummyPostHo uuid: new UUIDT().toString(), } await queueEvent(hub, pluginConfig, data) - hub.statsd?.increment('vm_posthog_extension_capture_called') vmPosthogExtensionCaptureCalledCounter.labels(String(pluginConfig.plugin?.id)).inc() }, api: createApi(hub, pluginConfig), diff --git a/plugin-server/src/worker/vm/extensions/storage.ts b/plugin-server/src/worker/vm/extensions/storage.ts index 02516a7840cb5..5f48b64974811 100644 --- a/plugin-server/src/worker/vm/extensions/storage.ts +++ b/plugin-server/src/worker/vm/extensions/storage.ts @@ -19,10 +19,6 @@ const vmExtensionStorageSetMsSummary = new Summary({ export function createStorage(server: Hub, pluginConfig: PluginConfig): StorageExtension { const get = async function (key: string, defaultValue: unknown): Promise { - server.statsd?.increment('vm_extension_storage_get', { - plugin: pluginConfig.plugin?.name ?? '?', - team_id: pluginConfig.team_id.toString(), - }) vmExtensionStorageGetCounter.labels(String(pluginConfig.plugin?.id)).inc() const result = await postgresGet(server.db, pluginConfig.id, key) @@ -46,14 +42,6 @@ export function createStorage(server: Hub, pluginConfig: PluginConfig): StorageE ) } - server.statsd?.increment('vm_extension_storage_set', { - plugin: pluginConfig.plugin?.name ?? '?', - team_id: pluginConfig.team_id.toString(), - }) - server.statsd?.timing('vm_extension_storage_set_timing', timer, { - plugin: pluginConfig.plugin?.name ?? '?', - team_id: pluginConfig.team_id.toString(), - }) vmExtensionStorageSetMsSummary .labels(String(pluginConfig.plugin?.id)) .observe(new Date().getTime() - timer.getTime()) diff --git a/plugin-server/src/worker/vm/lazy.ts b/plugin-server/src/worker/vm/lazy.ts index 459edd0fcc3a9..52286b13ff937 100644 --- a/plugin-server/src/worker/vm/lazy.ts +++ b/plugin-server/src/worker/vm/lazy.ts @@ -192,7 +192,6 @@ export class LazyPluginVM { const vm = (await this.resolveInternalVm)?.vm try { await instrument( - this.hub.statsd, { metricName: 'vm.setup', key: 'plugin', @@ -235,17 +234,7 @@ export class LazyPluginVM { throw Error('Only 1x replication is allowed') } } - await instrument( - this.hub.statsd, - { - metricName: 'plugin.setupPlugin', - key: 'plugin', - tag: this.pluginConfig.plugin?.name || '?', - }, - () => vm?.run(`${this.vmResponseVariable}.methods.setupPlugin?.()`) - ) - this.hub.statsd?.increment('plugin.setup.success', { plugin: this.pluginConfig.plugin?.name ?? '?' }) - this.hub.statsd?.timing('plugin.setup.timing', timer, { plugin: this.pluginConfig.plugin?.name ?? '?' }) + await vm?.run(`${this.vmResponseVariable}.methods.setupPlugin?.()`) pluginSetupMsSummary .labels({ plugin_id: pluginId, status: 'success' }) .observe(new Date().getTime() - timer.getTime()) @@ -257,10 +246,6 @@ export class LazyPluginVM { PluginLogEntryType.Debug ) } catch (error) { - this.hub.statsd?.increment('plugin.setup.fail', { plugin: this.pluginConfig.plugin?.name ?? '?' }) - this.hub.statsd?.timing('plugin.setup.fail_timing', timer, { - plugin: this.pluginConfig.plugin?.name ?? '?', - }) pluginSetupMsSummary .labels({ plugin_id: pluginId, status: 'fail' }) .observe(new Date().getTime() - timer.getTime()) @@ -308,10 +293,6 @@ export class LazyPluginVM { } private async processFatalVmSetupError(error: Error, isSystemError: boolean): Promise { - this.hub.statsd?.increment('plugin.disabled.by_system', { - teamId: this.pluginConfig.team_id.toString(), - plugin: this.pluginConfig.plugin?.name ?? '?', - }) pluginDisabledBySystemCounter.labels(this.pluginConfig.plugin?.id.toString() || 'unknown').inc() await processError(this.hub, this.pluginConfig, error) await disablePlugin(this.hub, this.pluginConfig.id) diff --git a/plugin-server/src/worker/vm/vm.ts b/plugin-server/src/worker/vm/vm.ts index c3b304049046f..8341fb721a53f 100644 --- a/plugin-server/src/worker/vm/vm.ts +++ b/plugin-server/src/worker/vm/vm.ts @@ -39,14 +39,6 @@ export function createPluginConfigVM( ): PluginConfigVMResponse { const timer = new Date() - const statsdTiming = (metric: string) => { - hub.statsd?.timing(metric, timer, { - pluginConfigId: String(pluginConfig.id), - pluginName: String(pluginConfig.plugin?.name), - teamId: String(pluginConfig.team_id), - }) - } - const usedImports: Set = new Set() const transformedCode = transformCode(indexJs, hub, AVAILABLE_IMPORTS, usedImports) @@ -232,7 +224,6 @@ export function createPluginConfigVM( const vmResponse = vm.run(responseVar) const { methods, tasks } = vmResponse - statsdTiming('vm_setup_full') vmSetupMsSummary.labels(String(pluginConfig.plugin?.id)).observe(new Date().getTime() - timer.getTime()) return { diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index 2d0d879dcd62a..a64e8f368f9d3 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -652,7 +652,6 @@ describe('DB', () => { beforeEach(() => { jest.spyOn(db, 'fetchGroup') jest.spyOn(db, 'redisGet') - db.statsd = { increment: jest.fn(), timing: jest.fn() } as any }) describe('one group', () => { @@ -705,7 +704,6 @@ describe('DB', () => { const groupDataMissingCounterSpy = jest.spyOn(dbMetrics.groupDataMissingCounter, 'inc') await db.getGroupsColumns(2, [[0, 'unknown_key']]) - expect(db.statsd?.increment).toHaveBeenLastCalledWith('groups_data_missing_entirely') expect(groupDataMissingCounterSpy).toHaveBeenCalledTimes(1) }) }) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index 2444b4cd624d4..6641aa8b87268 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -62,12 +62,6 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { bufferSleep: jest.fn(), pluginsServer: { INGESTION_CONCURRENCY: 4, - statsd: { - timing: jest.fn(), - increment: jest.fn(), - histogram: jest.fn(), - gauge: jest.fn(), - }, kafkaProducer: { produce: jest.fn(), }, diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts index b4d54bd533906..70c242be766c6 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-overflow-consumer.test.ts @@ -58,12 +58,6 @@ describe('eachBatchParallelIngestion with overflow consume', () => { bufferSleep: jest.fn(), pluginsServer: { INGESTION_CONCURRENCY: 4, - statsd: { - timing: jest.fn(), - increment: jest.fn(), - histogram: jest.fn(), - gauge: jest.fn(), - }, kafkaProducer: { queueMessage: jest.fn(), }, diff --git a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts index 6816f8f1d9278..709262ed79aa6 100644 --- a/plugin-server/tests/main/ingestion-queues/each-batch.test.ts +++ b/plugin-server/tests/main/ingestion-queues/each-batch.test.ts @@ -132,12 +132,6 @@ describe('eachBatchX', () => { TASKS_PER_WORKER: 10, INGESTION_CONCURRENCY: 4, BUFFER_CONVERSION_SECONDS: 60, - statsd: { - timing: jest.fn(), - increment: jest.fn(), - histogram: jest.fn(), - gauge: jest.fn(), - }, kafkaProducer: { queueMessage: jest.fn(), }, @@ -217,19 +211,12 @@ describe('eachBatchX', () => { queue.pluginsServer.organizationManager, new Set(), queue.pluginsServer.appMetrics, - undefined, queue.pluginsServer.EXTERNAL_REQUEST_TIMEOUT_MS ) const matchSpy = jest.spyOn(actionMatcher, 'match') // mock hasWebhooks to return true actionMatcher.hasWebhooks = jest.fn(() => true) - await eachBatchWebhooksHandlers( - createKafkaJSBatch(clickhouseEvent), - actionMatcher, - hookCannon, - queue.pluginsServer.statsd, - 10 - ) + await eachBatchWebhooksHandlers(createKafkaJSBatch(clickhouseEvent), actionMatcher, hookCannon, 10) // NOTE: really it would be nice to verify that fire has been called // on hookCannon, but that would require a little more setup, and it @@ -524,17 +511,7 @@ describe('eachBatchX', () => { const tokenBlockList = buildStringMatcher('another_token,more_token', false) await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Disabled) expect(runEventPipeline).toHaveBeenCalledTimes(14) - expect(queue.pluginsServer.statsd.histogram).toHaveBeenCalledWith( - 'ingest_event_batching.input_length', - 14, - { - key: 'ingestion', - } - ) expect(ingestEventBatchingInputLengthSummarySpy).toHaveBeenCalledWith(14) - expect(queue.pluginsServer.statsd.histogram).toHaveBeenCalledWith('ingest_event_batching.batch_count', 5, { - key: 'ingestion', - }) expect(ingestEventBatchingBatchCountSummarySpy).toHaveBeenCalledWith(5) }) diff --git a/plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts b/plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts index 5cb0bc512621f..4f5f50cb7dd12 100644 --- a/plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts +++ b/plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts @@ -54,11 +54,6 @@ describe.skip('IngestionConsumer', () => { test('consumer consumes from both topics - ingestion and buffer', async () => { expect((await hub.db.fetchEvents()).length).toBe(0) - hub.statsd = { - timing: jest.fn(), - increment: jest.fn(), - gauge: jest.fn(), - } as any const uuid = new UUIDT().toString() diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts index c4de100994479..f2603cee6fac3 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/event-pipeline-integration.test.ts @@ -53,7 +53,6 @@ describe('Event Pipeline integration test', () => { hub.teamManager, hub.organizationManager, hub.appMetrics, - undefined, hub.EXTERNAL_REQUEST_TIMEOUT_MS ) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts index a4f00dc69230f..eb65cd7aa1e03 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/pluginsProcessEventStep.test.ts @@ -24,12 +24,6 @@ describe('pluginsProcessEventStep()', () => { beforeEach(() => { runner = { nextStep: (...args: any[]) => args, - hub: { - statsd: { - increment: jest.fn(), - timing: jest.fn(), - }, - }, } }) @@ -49,7 +43,6 @@ describe('pluginsProcessEventStep()', () => { const response = await pluginsProcessEventStep(runner, pluginEvent) expect(response).toEqual(null) - expect(runner.hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.dropped_event', { teamID: '2' }) expect(droppedEventCounterSpy).toHaveBeenCalledTimes(1) }) }) diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts index efa410314df50..c0c894f4777b9 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/runner.test.ts @@ -89,10 +89,6 @@ describe('EventPipelineRunner', () => { kafkaProducer: { queueMessage: jest.fn() }, fetchPerson: jest.fn(), }, - statsd: { - increment: jest.fn(), - timing: jest.fn(), - }, eventsToDropByToken: createEventsToDropByToken('drop_token:drop_id,drop_token_all:*'), } runner = new TestEventPipelineRunner(hub, pluginEvent) @@ -165,25 +161,11 @@ describe('EventPipelineRunner', () => { const result = await runner.runEventPipeline(pipelineEvent) expect(result.error).toBeUndefined() - expect(hub.statsd.timing).toHaveBeenCalledTimes(5) expect(pipelineStepMsSummarySpy).toHaveBeenCalledTimes(5) - - expect(hub.statsd.increment).toHaveBeenCalledTimes(7) expect(pipelineLastStepCounterSpy).toHaveBeenCalledTimes(1) expect(eventProcessedAndIngestedCounterSpy).toHaveBeenCalledTimes(1) - - expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step', { - step: 'createEventStep', - }) expect(pipelineStepMsSummarySpy).toHaveBeenCalledWith('createEventStep') - - expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step.last', { - step: 'createEventStep', - team_id: '2', - }) expect(pipelineLastStepCounterSpy).toHaveBeenCalledWith('createEventStep') - - expect(hub.statsd.increment).not.toHaveBeenCalledWith('kafka_queue.event_pipeline.step.error') expect(pipelineStepErrorCounterSpy).not.toHaveBeenCalled() }) @@ -205,16 +187,8 @@ describe('EventPipelineRunner', () => { await runner.runEventPipeline(pipelineEvent) - expect(hub.statsd.timing).toHaveBeenCalledTimes(2) expect(pipelineStepMsSummarySpy).toHaveBeenCalledTimes(2) - - expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step.last', { - step: 'pluginsProcessEventStep', - team_id: '2', - }) expect(pipelineLastStepCounterSpy).toHaveBeenCalledWith('pluginsProcessEventStep') - - expect(hub.statsd.increment).not.toHaveBeenCalledWith('kafka_queue.event_pipeline.step.error') expect(pipelineStepErrorCounterSpy).not.toHaveBeenCalled() }) }) @@ -231,27 +205,10 @@ describe('EventPipelineRunner', () => { await runner.runEventPipeline(pipelineEvent) - expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step', { - step: 'populateTeamDataStep', - }) expect(pipelineStepMsSummarySpy).toHaveBeenCalledWith('populateTeamDataStep') - - expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step', { - step: 'pluginsProcessEventStep', - }) expect(pipelineStepMsSummarySpy).toHaveBeenCalledWith('pluginsProcessEventStep') - - expect(hub.statsd.increment).not.toHaveBeenCalledWith('kafka_queue.event_pipeline.step', { - step: 'prepareEventStep', - }) expect(pipelineStepMsSummarySpy).not.toHaveBeenCalledWith('prepareEventStep') - - expect(hub.statsd.increment).not.toHaveBeenCalledWith('kafka_queue.event_pipeline.step.last') expect(pipelineLastStepCounterSpy).not.toHaveBeenCalled() - - expect(hub.statsd.increment).toHaveBeenCalledWith('kafka_queue.event_pipeline.step.error', { - step: 'prepareEventStep', - }) expect(pipelineStepErrorCounterSpy).toHaveBeenCalledWith('prepareEventStep') }) @@ -268,7 +225,6 @@ describe('EventPipelineRunner', () => { error: 'ingestEvent failed. Error: testError', error_location: 'plugin_server_ingest_event:prepareEventStep', }) - expect(hub.statsd.increment).toHaveBeenCalledWith('events_added_to_dead_letter_queue') expect(pipelineStepDLQCounterSpy).toHaveBeenCalledWith('prepareEventStep') }) @@ -279,7 +235,6 @@ describe('EventPipelineRunner', () => { await runner.runEventPipeline(pipelineEvent) expect(hub.db.kafkaProducer.queueMessage).not.toHaveBeenCalled() - expect(hub.statsd.increment).not.toHaveBeenCalledWith('events_added_to_dead_letter_queue') expect(pipelineStepDLQCounterSpy).not.toHaveBeenCalled() }) }) diff --git a/plugin-server/tests/worker/ingestion/hooks.test.ts b/plugin-server/tests/worker/ingestion/hooks.test.ts index 09e2089b0c058..36886c297920d 100644 --- a/plugin-server/tests/worker/ingestion/hooks.test.ts +++ b/plugin-server/tests/worker/ingestion/hooks.test.ts @@ -491,7 +491,6 @@ describe('hooks', () => { {} as any, {} as any, { queueError: () => Promise.resolve(), queueMetric: () => Promise.resolve() } as unknown as AppMetrics, - undefined, 20000 ) }) diff --git a/plugin-server/tests/worker/ingestion/person-state.test.ts b/plugin-server/tests/worker/ingestion/person-state.test.ts index a29435dbcd5c3..9c03e2d058414 100644 --- a/plugin-server/tests/worker/ingestion/person-state.test.ts +++ b/plugin-server/tests/worker/ingestion/person-state.test.ts @@ -7,7 +7,7 @@ import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' import { defaultRetryConfig } from '../../../src/utils/retries' import { UUIDT } from '../../../src/utils/utils' -import { ageInMonthsLowCardinality, PersonState } from '../../../src/worker/ingestion/person-state' +import { PersonState } from '../../../src/worker/ingestion/person-state' import { delayUntilEventIngested } from '../../helpers/clickhouse' import { createOrganization, createTeam, fetchPostgresPersons, insertRow } from '../../helpers/sql' @@ -69,7 +69,6 @@ describe('PersonState.update()', () => { event.distinct_id!, timestamp, customHub ? customHub.db : hub.db, - customHub ? customHub.statsd : hub.statsd, poEEmbraceJoin, uuid, maxMergeAttempts ?? 3 // the default @@ -1088,10 +1087,6 @@ describe('PersonState.update()', () => { }) describe('illegal aliasing', () => { - beforeEach(() => { - hub.statsd = { increment: jest.fn() } as any - }) - const illegalIds = ['', ' ', 'null', 'undefined', '"undefined"', '[object Object]', '"[object Object]"'] it.each(illegalIds)('stops $identify if current distinct_id is illegal: `%s`', async (illegalId: string) => { const person = await personState({ @@ -2069,30 +2064,4 @@ describe('PersonState.update()', () => { }) }) }) - - describe('ageInMonthsLowCardinality', () => { - beforeEach(() => { - jest.setSystemTime(new Date('2022-03-15')) - }) - it('gets the correct age in months', () => { - let date = DateTime.fromISO('2022-01-16') - expect(ageInMonthsLowCardinality(date)).toEqual(2) - date = DateTime.fromISO('2022-01-14') - expect(ageInMonthsLowCardinality(date)).toEqual(3) - date = DateTime.fromISO('2021-11-25') - expect(ageInMonthsLowCardinality(date)).toEqual(4) - }) - it('returns 0 for future dates', () => { - let date = DateTime.fromISO('2022-06-01') - expect(ageInMonthsLowCardinality(date)).toEqual(0) - date = DateTime.fromISO('2023-01-01') - expect(ageInMonthsLowCardinality(date)).toEqual(0) - }) - it('returns a low cardinality value', () => { - let date = DateTime.fromISO('1990-01-01') - expect(ageInMonthsLowCardinality(date)).toEqual(50) - date = DateTime.fromMillis(0) - expect(ageInMonthsLowCardinality(date)).toEqual(50) - }) - }) })