Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(plugin-server): remove statsd #19027

Merged
merged 2 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions plugin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ There's a multitude of settings you can use to control the plugin server. Use th
| KAFKA_MAX_MESSAGE_BATCH_SIZE | Kafka producer batch max size in bytes before flushing | `900000` |
| LOG_LEVEL | minimum log level | `'info'` |
| SENTRY_DSN | Sentry ingestion URL | `null` |
| STATSD_HOST | StatsD host - integration disabled if this is not provided | `null` |
| STATSD_PORT | StatsD port | `8125` |
| STATSD_PREFIX | StatsD prefix | `'plugin-server.'` |
| DISABLE_MMDB | whether to disable MMDB IP location capabilities | `false` |
| INTERNAL_MMDB_SERVER_PORT | port of the internal server used for IP location (0 means random) | `0` |
| DISTINCT_ID_LRU_SIZE | size of persons distinct ID LRU cache | `10000` |
Expand Down
1 change: 0 additions & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
"fast-deep-equal": "^3.1.3",
"generic-pool": "^3.7.1",
"graphile-worker": "0.13.0",
"hot-shots": "^9.2.0",
"ioredis": "^4.27.6",
"ipaddr.js": "^2.1.0",
"kafkajs": "^2.2.0",
Expand Down
20 changes: 0 additions & 20 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions plugin-server/src/backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export async function startBackfill() {
defaultConfig.PLUGIN_SERVER_MODE = null // Disable all consuming capabilities
const noCapability = {}
initApp(defaultConfig)
const [hub, closeHub] = await createHub(defaultConfig, null, noCapability)
const [hub, closeHub] = await createHub(defaultConfig, noCapability)
status.info('🏁', 'Bootstraping done, starting to backfill')

await runBackfill(hub)
Expand Down Expand Up @@ -101,7 +101,7 @@ async function retrieveEvents(
WHERE _timestamp >= '${chTimestampLower}'
AND _timestamp < '${chTimestampHigher}'
AND timestamp >= '${chTimestampLowerTS}'
AND timestamp < '${chTimestampHigherTS}'
AND timestamp < '${chTimestampHigherTS}'
AND event IN ('$merge_dangerously', '$create_alias', '$identify')
AND ((event = '$identify' and JSONExtractString(properties, '$anon_distinct_id') != '') OR
(event != '$identify' and JSONExtractString(properties, 'alias') != ''))
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ export function getDefaultConfig(): PluginsServerConfig {
SENTRY_PLUGIN_SERVER_TRACING_SAMPLE_RATE: 0,
SENTRY_PLUGIN_SERVER_PROFILING_SAMPLE_RATE: 0,
HTTP_SERVER_PORT: DEFAULT_HTTP_SERVER_PORT,
STATSD_HOST: null,
STATSD_PORT: 8125,
STATSD_PREFIX: 'plugin-server.',
SCHEDULE_LOCK_TTL: 60,
REDIS_POOL_MIN_SIZE: 1,
REDIS_POOL_MAX_SIZE: 3,
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/main/graphile-worker/graphile-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ export class GraphileWorker {
const enqueueFn = () => this._enqueue(jobName, job)

await instrument(
this.hub.statsd,
{
metricName: `job_queues_enqueue_${jobName}`,
key: instrumentationContext?.key ?? '?',
Expand All @@ -84,10 +83,8 @@ export class GraphileWorker {
async _enqueue(jobName: string, job: EnqueuedJob): Promise<void> {
try {
await this.addJob(jobName, job)
this.hub.statsd?.increment('enqueue_job.success', { jobName })
graphileEnqueueJobCounter.labels({ status: 'success', job: jobName }).inc()
} catch (error) {
this.hub.statsd?.increment('enqueue_job.fail', { jobName })
graphileEnqueueJobCounter.labels({ status: 'fail', job: jobName }).inc()
throw error
}
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/main/graphile-worker/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ export async function runScheduledTasks(
taskType: taskType,
runAt: helpers.job.run_at,
})
server.statsd?.increment('skipped_scheduled_tasks', { taskType })
graphileScheduledTaskCounter.labels({ status: 'skipped', task: taskType }).inc()
return
}
Expand All @@ -58,14 +57,12 @@ export async function runScheduledTasks(
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
})
server.statsd?.increment('queued_scheduled_task', { taskType })
graphileScheduledTaskCounter.labels({ status: 'queued', task: taskType }).inc()
}
} else {
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`)
await piscina.run({ task: taskType, args: { pluginConfigId } })
server.statsd?.increment('completed_scheduled_task', { taskType })
graphileScheduledTaskCounter.labels({ status: 'completed', task: taskType }).inc()
}
}
Expand Down
3 changes: 0 additions & 3 deletions plugin-server/src/main/graphile-worker/worker-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ export function getPluginJobHandlers(hub: Hub, graphileWorker: GraphileWorker, p
pluginJob: async (job) => {
const jobType = (job as EnqueuedPluginJob)?.type
jobsTriggeredCounter.labels(jobType).inc()
hub.statsd?.increment('triggered_job', {
instanceId: hub.instanceId.toString(),
})
try {
await piscina.run({ task: 'runPluginJob', args: { job: job as EnqueuedPluginJob } })
jobsExecutionSuccessCounter.labels(jobType).inc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,7 @@ export async function eachBatchParallelIngestion(
splitBatch.toProcess.sort((a, b) => a.length - b.length)

ingestEventBatchingInputLengthSummary.observe(messages.length)
queue.pluginsServer.statsd?.histogram('ingest_event_batching.input_length', messages.length, {
key: metricKey,
})
ingestEventBatchingBatchCountSummary.observe(splitBatch.toProcess.length)
queue.pluginsServer.statsd?.histogram('ingest_event_batching.batch_count', splitBatch.toProcess.length, {
key: metricKey,
})
prepareSpan.finish()

const processingPromises: Array<Promise<void>> = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ export async function eachBatchAppsOnEventHandlers(
payload,
(teamId) => queue.pluginsServer.pluginConfigsPerTeam.has(teamId),
(event) => eachMessageAppsOnEventHandlers(event, queue),
queue.pluginsServer.statsd,
queue.pluginsServer.WORKER_CONCURRENCY * queue.pluginsServer.TASKS_PER_WORKER,
'on_event'
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as Sentry from '@sentry/node'
import { StatsD } from 'hot-shots'
import { EachBatchPayload, KafkaMessage } from 'kafkajs'
import { Counter } from 'prom-client'
import { ActionMatcher } from 'worker/ingestion/action-matcher'
Expand Down Expand Up @@ -62,14 +61,12 @@ export async function eachBatchWebhooksHandlers(
payload: EachBatchPayload,
actionMatcher: ActionMatcher,
hookCannon: HookCommander,
statsd: StatsD | undefined,
concurrency: number
): Promise<void> {
await eachBatchHandlerHelper(
payload,
(teamId) => actionMatcher.hasWebhooks(teamId),
(event) => eachMessageWebhooksHandlers(event, actionMatcher, hookCannon, statsd),
statsd,
(event) => eachMessageWebhooksHandlers(event, actionMatcher, hookCannon),
concurrency,
'webhooks'
)
Expand All @@ -79,7 +76,6 @@ export async function eachBatchHandlerHelper(
payload: EachBatchPayload,
shouldProcess: (teamId: number) => boolean,
eachMessageHandler: (event: RawClickHouseEvent) => Promise<void>,
statsd: StatsD | undefined,
concurrency: number,
stats_key: string
): Promise<void> {
Expand All @@ -95,8 +91,6 @@ export async function eachBatchHandlerHelper(
try {
const batchesWithOffsets = groupIntoBatchesByUsage(batch.messages, concurrency, shouldProcess)

statsd?.histogram('ingest_event_batching.input_length', batch.messages.length, { key: key })
statsd?.histogram('ingest_event_batching.batch_count', batchesWithOffsets.length, { key: key })
ingestEventBatchingInputLengthSummary.observe(batch.messages.length)
ingestEventBatchingBatchCountSummary.observe(batchesWithOffsets.length)

Expand Down Expand Up @@ -145,8 +139,7 @@ export async function eachBatchHandlerHelper(
export async function eachMessageWebhooksHandlers(
clickHouseEvent: RawClickHouseEvent,
actionMatcher: ActionMatcher,
hookCannon: HookCommander,
statsd: StatsD | undefined
hookCannon: HookCommander
): Promise<void> {
if (!actionMatcher.hasWebhooks(clickHouseEvent.team_id)) {
// exit early if no webhooks nor resthooks
Expand All @@ -162,7 +155,7 @@ export async function eachMessageWebhooksHandlers(
convertToProcessedPluginEvent(event)

await runInstrumentedFunction({
func: () => runWebhooks(statsd, actionMatcher, hookCannon, event),
func: () => runWebhooks(actionMatcher, hookCannon, event),
statsKey: `kafka_queue.process_async_handlers_webhooks`,
timeoutMessage: 'After 30 seconds still running runWebhooksHandlersEventPipeline',
timeoutContext: () => ({
Expand All @@ -172,21 +165,13 @@ export async function eachMessageWebhooksHandlers(
})
}

async function runWebhooks(
statsd: StatsD | undefined,
actionMatcher: ActionMatcher,
hookCannon: HookCommander,
event: PostIngestionEvent
) {
async function runWebhooks(actionMatcher: ActionMatcher, hookCannon: HookCommander, event: PostIngestionEvent) {
const timer = new Date()

try {
await processWebhooksStep(event, actionMatcher, hookCannon)
statsd?.increment('kafka_queue.event_pipeline.step', { step: processWebhooksStep.name })
statsd?.timing('kafka_queue.event_pipeline.step.timing', timer, { step: processWebhooksStep.name })
pipelineStepMsSummary.labels('processWebhooksStep').observe(Date.now() - timer.getTime())
} catch (error) {
statsd?.increment('kafka_queue.event_pipeline.step.error', { step: processWebhooksStep.name })
pipelineStepErrorCounter.labels('processWebhooksStep').inc()

if (error instanceof DependencyUnavailableError) {
Expand Down
7 changes: 1 addition & 6 deletions plugin-server/src/main/ingestion-queues/jobs-consumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { StatsD } from 'hot-shots'
import { EachBatchHandler, Kafka } from 'kafkajs'
import { Counter } from 'prom-client'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'
Expand All @@ -24,12 +23,10 @@ export const startJobsConsumer = async ({
kafka,
producer,
graphileWorker,
statsd,
}: {
kafka: Kafka
producer: KafkaProducerWrapper
graphileWorker: GraphileWorker
statsd?: StatsD
}) => {
/*
Consumes from the jobs buffer topic, and enqueues the jobs for execution
Expand Down Expand Up @@ -84,11 +81,9 @@ export const startJobsConsumer = async ({
try {
await graphileWorker.enqueue(JobName.PLUGIN_JOB, job)
jobsConsumerSuccessCounter.inc()
statsd?.increment('jobs_consumer.enqueued')
} catch (error) {
status.error('⚠️', 'Failed to enqueue anonymous event for processing', { error })
jobsConsumerFailuresCounter.inc()
statsd?.increment('jobs_consumer.enqueue_error')

throw error
}
Expand All @@ -114,7 +109,7 @@ export const startJobsConsumer = async ({
await consumer.subscribe({ topic: KAFKA_JOBS })
await consumer.run({
eachBatch: async (payload) => {
return await instrumentEachBatchKafkaJS(KAFKA_JOBS, eachBatch, payload, statsd)
return await instrumentEachBatchKafkaJS(KAFKA_JOBS, eachBatch, payload)
},
})

Expand Down
6 changes: 1 addition & 5 deletions plugin-server/src/main/ingestion-queues/kafka-metrics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as Sentry from '@sentry/node'
import { StatsD } from 'hot-shots'
import { Consumer } from 'kafkajs'
import { KafkaConsumer } from 'node-rdkafka'

Expand All @@ -9,7 +8,7 @@ import {
kafkaConsumerEventRequestPendingMsSummary,
} from './metrics'

export function addMetricsEventListeners(consumer: Consumer, statsd: StatsD | undefined): void {
export function addMetricsEventListeners(consumer: Consumer): void {
const listenEvents = [
consumer.events.GROUP_JOIN,
consumer.events.CONNECT,
Expand All @@ -22,14 +21,11 @@ export function addMetricsEventListeners(consumer: Consumer, statsd: StatsD | un

listenEvents.forEach((event) => {
consumer.on(event, () => {
statsd?.increment('kafka_queue_consumer_event', { event })
kafkaConsumerEventCounter.labels(event).inc()
})
})

consumer.on(consumer.events.REQUEST, ({ payload }) => {
statsd?.timing('kafka_queue_consumer_event_request_duration', payload.duration, 0.01)
statsd?.timing('kafka_queue_consumer_event_request_pending_duration', payload.pendingDuration, 0.01)
kafkaConsumerEventRequestMsSummary.observe(payload.duration)
kafkaConsumerEventRequestPendingMsSummary.observe(payload.pendingDuration)
})
Expand Down
Loading
Loading