diff --git a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts
index 31a0e425a40b3..752e13c1a01aa 100644
--- a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts
@@ -1,6 +1,7 @@
 import { StatsD } from 'hot-shots'
 import { Consumer, Kafka } from 'kafkajs'
 import * as schedule from 'node-schedule'
+import { AppMetrics } from 'worker/ingestion/app-metrics'

 import { KAFKA_EVENTS_JSON, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
 import { Hub, PluginsServerConfig } from '../../types'
@@ -55,6 +56,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
     organizationManager,
     statsd,
     serverConfig,
+    appMetrics,
 }: {
     kafka: Kafka
     postgres: PostgresRouter
@@ -62,6 +64,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
     organizationManager: OrganizationManager
     statsd: StatsD | undefined
     serverConfig: PluginsServerConfig
+    appMetrics: AppMetrics
 }) => {
     /*
         Consumes analytics events from the Kafka topic `clickhouse_events_json`
@@ -90,6 +93,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
         teamManager,
         organizationManager,
         new Set(serverConfig.FETCH_HOSTNAME_GUARD_TEAMS.split(',').filter(String).map(Number)),
+        appMetrics,
         statsd
     )
     const concurrency = serverConfig.TASKS_PER_WORKER || 20
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index 30ef80768f985..9f0d34a7d8e86 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -11,13 +11,14 @@ import v8Profiler from 'v8-profiler-next'
 import { getPluginServerCapabilities } from '../capabilities'
 import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
 import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
-import { createHub, createKafkaClient, createStatsdClient } from '../utils/db/hub'
+import { createHub, createKafkaClient, createKafkaProducerWrapper, createStatsdClient } from '../utils/db/hub'
 import { PostgresRouter } from '../utils/db/postgres'
 import { captureEventLoopMetrics } from '../utils/metrics'
 import { cancelAllScheduledJobs } from '../utils/node-schedule'
 import { PubSub } from '../utils/pubsub'
 import { status } from '../utils/status'
 import { delay } from '../utils/utils'
+import { AppMetrics } from '../worker/ingestion/app-metrics'
 import { OrganizationManager } from '../worker/ingestion/organization-manager'
 import { TeamManager } from '../worker/ingestion/team-manager'
 import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
@@ -339,6 +340,14 @@ export async function startPluginsServer(
             const kafka = hub?.kafka ?? createKafkaClient(serverConfig)
             const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig, statsd)
             const organizationManager = hub?.organizationManager ?? new OrganizationManager(postgres, teamManager)
+            const KafkaProducerWrapper = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
+            const appMetrics =
+                hub?.appMetrics ??
+                new AppMetrics(
+                    KafkaProducerWrapper,
+                    serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
+                    serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
+                )

             const { stop: webhooksStopConsumer, isHealthy: isWebhooksIngestionHealthy } =
                 await startAsyncWebhooksHandlerConsumer({
@@ -347,6 +356,7 @@
                     teamManager: teamManager,
                     organizationManager: organizationManager,
                     serverConfig: serverConfig,
+                    appMetrics: appMetrics,
                     statsd: statsd,
                 })

diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts
index 2ae134ae1fb6a..178e50d65b32c 100644
--- a/plugin-server/src/utils/db/hub.ts
+++ b/plugin-server/src/utils/db/hub.ts
@@ -51,6 +51,12 @@ pgTypes.setTypeParser(1184 /* types.TypeId.TIMESTAMPTZ */, (timeStr) =>
     timeStr ? DateTime.fromSQL(timeStr, { zone: 'utc' }).toISO() : null
 )

+export async function createKafkaProducerWrapper(serverConfig: PluginsServerConfig): Promise<KafkaProducerWrapper> {
+    const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
+    const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })
+    return new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
+}
+
 export async function createHub(
     config: Partial<PluginsServerConfig> = {},
     threadId: number | null = null,
@@ -101,10 +107,7 @@ export async function createHub(
     status.info('🤔', `Connecting to Kafka...`)

     const kafka = createKafkaClient(serverConfig)
-    const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
-    const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })
-
-    const kafkaProducer = new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
+    const kafkaProducer = await createKafkaProducerWrapper(serverConfig)
     status.info('👍', `Kafka ready`)

     const postgres = new PostgresRouter(serverConfig, statsd)
diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts
index 6cf4297cda476..7b79eb163ae80 100644
--- a/plugin-server/src/worker/ingestion/app-metrics.ts
+++ b/plugin-server/src/worker/ingestion/app-metrics.ts
@@ -15,7 +15,7 @@ export interface AppMetricIdentifier {
     pluginConfigId: number
     jobId?: string
     // Keep in sync with posthog/queries/app_metrics/serializers.py
-    category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask'
+    category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook'
 }

 export interface AppMetric extends AppMetricIdentifier {
diff --git a/plugin-server/src/worker/ingestion/hooks.ts b/plugin-server/src/worker/ingestion/hooks.ts
index 85464a075a567..f291753c8d15c 100644
--- a/plugin-server/src/worker/ingestion/hooks.ts
+++ b/plugin-server/src/worker/ingestion/hooks.ts
@@ -9,6 +9,7 @@ import { isCloud } from '../../utils/env-utils'
 import { safeTrackedFetch, trackedFetch } from '../../utils/fetch'
 import { status } from '../../utils/status'
 import { getPropertyValueByPath, stringify } from '../../utils/utils'
+import { AppMetrics } from './app-metrics'
 import { OrganizationManager } from './organization-manager'
 import { TeamManager } from './team-manager'

@@ -255,6 +256,7 @@ export class HookCommander {
     postgres: PostgresRouter
     teamManager: TeamManager
     organizationManager: OrganizationManager
+    appMetrics: AppMetrics
     statsd: StatsD | undefined
     siteUrl: string
     /** null means that the hostname guard is enabled for everyone */
@@ -268,7 +270,8 @@ export class HookCommander {
         teamManager: TeamManager,
         organizationManager: OrganizationManager,
         fetchHostnameGuardTeams: Set<number> | null = new Set(),
-        statsd?: StatsD
+        appMetrics: AppMetrics,
+        statsd: StatsD | undefined
     ) {
         this.postgres = postgres
         this.teamManager = teamManager
@@ -281,6 +284,7 @@ export class HookCommander {
             this.siteUrl = ''
         }
         this.statsd = statsd
+        this.appMetrics = appMetrics
     }

     public async findAndFireHooks(event: PostIngestionEvent, actionMatches: Action[]): Promise<void> {
@@ -383,6 +387,26 @@ export class HookCommander {
             this.statsd?.increment('webhook_firings', {
                 team_id: event.teamId.toString(),
             })
+            await this.appMetrics.queueMetric({
+                teamId: event.teamId,
+                pluginConfigId: -2, // -2 is hardcoded to mean webhooks
+                category: 'webhook',
+                successes: 1,
+            })
+        } catch (error) {
+            await this.appMetrics.queueError(
+                {
+                    teamId: event.teamId,
+                    pluginConfigId: -2, // -2 is hardcoded to mean webhooks
+                    category: 'webhook',
+                    failures: 1,
+                },
+                {
+                    error,
+                    event,
+                }
+            )
+            throw error
         } finally {
             clearTimeout(timeout)
         }
@@ -429,6 +453,26 @@ export class HookCommander {
                 status.warn('⚠️', `Rest hook failed status ${request.status} for team ${event.teamId}`)
             }
             this.statsd?.increment('rest_hook_firings')
+            await this.appMetrics.queueMetric({
+                teamId: event.teamId,
+                pluginConfigId: -1, // -1 is hardcoded to mean resthooks
+                category: 'webhook',
+                successes: 1,
+            })
+        } catch (error) {
+            await this.appMetrics.queueError(
+                {
+                    teamId: event.teamId,
+                    pluginConfigId: -1, // -1 is hardcoded to mean resthooks
+                    category: 'webhook',
+                    failures: 1,
+                },
+                {
+                    error,
+                    event,
+                }
+            )
+            throw error
         } finally {
             clearTimeout(timeout)
         }
diff --git a/plugin-server/tests/worker/ingestion/app-metrics.test.ts b/plugin-server/tests/worker/ingestion/app-metrics.test.ts
index 43a2b07364208..dda7d6dc30e97 100644
--- a/plugin-server/tests/worker/ingestion/app-metrics.test.ts
+++ b/plugin-server/tests/worker/ingestion/app-metrics.test.ts
@@ -1,5 +1,6 @@
 import { Hub } from '../../../src/types'
 import { createHub } from '../../../src/utils/db/hub'
+import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
 import { UUIDT } from '../../../src/utils/utils'
 import { AppMetricIdentifier, AppMetrics } from '../../../src/worker/ingestion/app-metrics'
 import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../helpers/clickhouse'
@@ -19,23 +20,23 @@ const uuid2 = new UUIDT().toString()

 describe('AppMetrics()', () => {
     let appMetrics: AppMetrics
-    let hub: Hub
-    let closeHub: () => Promise<void>
-
-    beforeEach(async () => {
-        ;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100, APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5 })
-        appMetrics = new AppMetrics(
-            hub.kafkaProducer,
-            hub.APP_METRICS_FLUSH_FREQUENCY_MS,
-            hub.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
-        )
-        // doesn't flush again on the next call, i.e. flust metrics were reset
-        jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
+    let kafkaProducer: KafkaProducerWrapper
+
+    beforeEach(() => {
+        kafkaProducer = {
+            producer: jest.fn(),
+            waitForAck: jest.fn(),
+            produce: jest.fn(),
+            queueMessage: jest.fn(),
+            flush: jest.fn(),
+            disconnect: jest.fn(),
+        } as unknown as KafkaProducerWrapper
+
+        appMetrics = new AppMetrics(kafkaProducer, 100, 5)
     })

-    afterEach(async () => {
+    afterEach(() => {
         jest.useRealTimers()
-        await closeHub()
     })

     describe('queueMetric()', () => {
@@ -257,7 +258,7 @@ describe('AppMetrics()', () => {

     describe('flush()', () => {
         it('flushes queued messages', async () => {
-            const spy = jest.spyOn(hub.kafkaProducer, 'queueMessage')
+            const spy = jest.spyOn(kafkaProducer, 'queueMessage')

             await appMetrics.queueMetric({ ...metric, jobId: '000-000', successes: 1 }, timestamp)
             await appMetrics.flush()
@@ -268,11 +269,25 @@ describe('AppMetrics()', () => {
         it('does nothing if nothing queued', async () => {
             await appMetrics.flush()

-            expect(hub.kafkaProducer.queueMessage).not.toHaveBeenCalled()
+            expect(kafkaProducer.queueMessage).not.toHaveBeenCalled()
         })
     })

     describe('reading writes from clickhouse', () => {
+        let hub: Hub
+        let closeHub: () => Promise<void>
+
+        beforeEach(async () => {
+            ;[hub, closeHub] = await createHub({
+                APP_METRICS_FLUSH_FREQUENCY_MS: 100,
+                APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5,
+            })
+            // doesn't flush again on the next call, i.e. flushed metrics were reset
+            jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
+        })
+        afterEach(async () => {
+            await closeHub()
+        })
         async function fetchRowsFromClickhouse() {
             return (await hub.db.clickhouseQuery(`SELECT * FROM app_metrics FINAL`)).data
         }
@@ -284,12 +299,12 @@ describe('AppMetrics()', () => {

         it('can read its own writes', async () => {
             await Promise.all([
-                appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp),
-                appMetrics.queueMetric({ ...metric, successes: 2, successesOnRetry: 4 }, timestamp),
-                appMetrics.queueMetric({ ...metric, failures: 1 }, timestamp),
+                hub.appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp),
+                hub.appMetrics.queueMetric({ ...metric, successes: 2, successesOnRetry: 4 }, timestamp),
+                hub.appMetrics.queueMetric({ ...metric, failures: 1 }, timestamp),
             ])

-            await appMetrics.flush()
+            await hub.appMetrics.flush()
             await hub.kafkaProducer.flush()

             const rows = await delayUntilEventIngested(fetchRowsFromClickhouse)
@@ -314,12 +329,12 @@ describe('AppMetrics()', () => {
         it('can read errors', async () => {
             jest.spyOn

-            await appMetrics.queueError(
+            await hub.appMetrics.queueError(
                 { ...metric, failures: 1 },
                 { error: new Error('foobar'), eventCount: 1 },
                 timestamp
             ),
-                await appMetrics.flush()
+                await hub.appMetrics.flush()
             await hub.kafkaProducer.flush()

             const rows = await delayUntilEventIngested(fetchRowsFromClickhouse)
diff --git a/plugin-server/tests/worker/ingestion/hooks.test.ts b/plugin-server/tests/worker/ingestion/hooks.test.ts
index 19e1f0eb684e5..8f8a1337c0276 100644
--- a/plugin-server/tests/worker/ingestion/hooks.test.ts
+++ b/plugin-server/tests/worker/ingestion/hooks.test.ts
@@ -4,6 +4,7 @@ import fetch, { FetchError } from 'node-fetch'
 import { Action, PostIngestionEvent, Team } from '../../../src/types'
 import { isCloud } from '../../../src/utils/env-utils'
 import { UUIDT } from '../../../src/utils/utils'
+import { AppMetrics } from '../../../src/worker/ingestion/app-metrics'
 import {
     determineWebhookType,
     getActionDetails,
@@ -489,7 +490,9 @@ describe('hooks', () => {
             {} as any,
             {} as any,
             {} as any,
-            new Set([hook.team_id]) // Hostname guard enabled
+            new Set([hook.team_id]), // Hostname guard enabled
+            // mock object with queueError and queueMetric as no-ops
+            { queueError: () => Promise.resolve(), queueMetric: () => Promise.resolve() } as AppMetrics
         )
     })

diff --git a/posthog/queries/app_metrics/serializers.py b/posthog/queries/app_metrics/serializers.py
index ee27d39861de9..f32aa6b01d753 100644
--- a/posthog/queries/app_metrics/serializers.py
+++ b/posthog/queries/app_metrics/serializers.py
@@ -4,7 +4,7 @@
 class AppMetricsRequestSerializer(serializers.Serializer):
     category = serializers.ChoiceField(
         # Keep in sync with plugin-server/src/worker/ingestion/app-metrics.ts
-        choices=["processEvent", "onEvent", "exportEvents", "scheduledTask"],
+        choices=["processEvent", "onEvent", "exportEvents", "scheduledTask", "webhook"],
         help_text="What to gather metrics for",
         required=True,
     )
@@ -22,7 +22,7 @@ class AppMetricsRequestSerializer(serializers.Serializer):
 class AppMetricsErrorsRequestSerializer(serializers.Serializer):
     category = serializers.ChoiceField(
         # Keep in sync with plugin-server/src/worker/ingestion/app-metrics.ts
-        choices=["processEvent", "onEvent", "exportEvents", "scheduledTask"],
+        choices=["processEvent", "onEvent", "exportEvents", "scheduledTask", "webhook"],
         help_text="What to gather errors for",
         required=True,
     )
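Reviewer note (not part of the patch): a minimal TypeScript sketch of how the new 'webhook' category flows through AppMetrics, for anyone poking at the change locally. Assumptions: the snippet lives somewhere under plugin-server/src so the relative imports resolve; the team id, flush interval, and queue size are made-up illustration values; and the stubbed KafkaProducerWrapper mirrors the mock used in app-metrics.test.ts rather than anything this patch adds.

import { KafkaProducerWrapper } from './utils/db/kafka-producer-wrapper'
import { AppMetrics } from './worker/ingestion/app-metrics'

// Stub the producer so nothing is actually written to Kafka; AppMetrics only needs queueMessage() here.
const kafkaProducer = {
    queueMessage: async () => {}, // swallow writes instead of producing
} as unknown as KafkaProducerWrapper

// Flush every 1000 ms or once 5 entries are queued (arbitrary illustration values).
const appMetrics = new AppMetrics(kafkaProducer, 1000, 5)

async function recordWebhookDelivery(): Promise<void> {
    // Success path, matching what HookCommander.postWebhook() queues after a delivery.
    await appMetrics.queueMetric({
        teamId: 1, // hypothetical team id
        pluginConfigId: -2, // -2 is hardcoded to mean webhooks, -1 means resthooks
        category: 'webhook',
        successes: 1,
    })

    // Failure path, matching the new catch branch (error context shape follows app-metrics.test.ts).
    await appMetrics.queueError(
        {
            teamId: 1,
            pluginConfigId: -2,
            category: 'webhook',
            failures: 1,
        },
        { error: new Error('webhook request failed'), eventCount: 1 }
    )

    // flush() drains the buffer through kafkaProducer.queueMessage(); with a real producer the rows
    // land in the ClickHouse app_metrics table, which the updated serializers can then filter by 'webhook'.
    await appMetrics.flush()
}

void recordWebhookDelivery()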