Skip to content

Commit

Permalink
app-metrics-for-webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 committed Sep 25, 2023
1 parent 7a8a268 commit ccd8739
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { StatsD } from 'hot-shots'
import { Consumer, Kafka } from 'kafkajs'
import * as schedule from 'node-schedule'
import { AppMetrics } from 'worker/ingestion/app-metrics'

import { KAFKA_EVENTS_JSON, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub, PluginsServerConfig } from '../../types'
Expand Down Expand Up @@ -55,13 +56,15 @@ export const startAsyncWebhooksHandlerConsumer = async ({
organizationManager,
statsd,
serverConfig,
appMetrics,
}: {
kafka: Kafka
postgres: PostgresRouter
teamManager: TeamManager
organizationManager: OrganizationManager
statsd: StatsD | undefined
serverConfig: PluginsServerConfig
appMetrics: AppMetrics
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
Expand Down Expand Up @@ -90,6 +93,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
teamManager,
organizationManager,
new Set(serverConfig.FETCH_HOSTNAME_GUARD_TEAMS.split(',').filter(String).map(Number)),
appMetrics,
statsd
)
const concurrency = serverConfig.TASKS_PER_WORKER || 20
Expand Down
12 changes: 11 additions & 1 deletion plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import v8Profiler from 'v8-profiler-next'
import { getPluginServerCapabilities } from '../capabilities'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createStatsdClient } from '../utils/db/hub'
import { createHub, createKafkaClient, createKafkaProducerWrapper, createStatsdClient } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { captureEventLoopMetrics } from '../utils/metrics'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
Expand Down Expand Up @@ -339,6 +340,14 @@ export async function startPluginsServer(
const kafka = hub?.kafka ?? createKafkaClient(serverConfig)
const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig, statsd)
const organizationManager = hub?.organizationManager ?? new OrganizationManager(postgres, teamManager)
const KafkaProducerWrapper = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
const appMetrics =
hub?.appMetrics ??
new AppMetrics(
KafkaProducerWrapper,
serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)

const { stop: webhooksStopConsumer, isHealthy: isWebhooksIngestionHealthy } =
await startAsyncWebhooksHandlerConsumer({
Expand All @@ -347,6 +356,7 @@ export async function startPluginsServer(
teamManager: teamManager,
organizationManager: organizationManager,
serverConfig: serverConfig,
appMetrics: appMetrics,
statsd: statsd,
})

Expand Down
11 changes: 7 additions & 4 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ pgTypes.setTypeParser(1184 /* types.TypeId.TIMESTAMPTZ */, (timeStr) =>
timeStr ? DateTime.fromSQL(timeStr, { zone: 'utc' }).toISO() : null
)

/**
 * Builds a `KafkaProducerWrapper` from the plugin server configuration.
 *
 * The connection settings come from `createRdConnectionConfigFromEnvVars`, with
 * `linger.ms` overridden to 0 before the underlying producer is created.
 *
 * @param serverConfig - plugin server configuration; also supplies
 *   `KAFKA_PRODUCER_WAIT_FOR_ACK`, which is handed to the wrapper.
 * @returns a wrapper around the newly created producer.
 */
export async function createKafkaProducerWrapper(serverConfig: PluginsServerConfig): Promise<KafkaProducerWrapper> {
    const rawProducer = await createKafkaProducer({
        ...createRdConnectionConfigFromEnvVars(serverConfig),
        'linger.ms': 0,
    })
    const waitForAck = serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK
    return new KafkaProducerWrapper(rawProducer, waitForAck)
}

export async function createHub(
config: Partial<PluginsServerConfig> = {},
threadId: number | null = null,
Expand Down Expand Up @@ -101,10 +107,7 @@ export async function createHub(
status.info('🤔', `Connecting to Kafka...`)

const kafka = createKafkaClient(serverConfig)
const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })

const kafkaProducer = new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
const kafkaProducer = await createKafkaProducerWrapper(serverConfig)
status.info('👍', `Kafka ready`)

const postgres = new PostgresRouter(serverConfig, statsd)
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/app-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface AppMetricIdentifier {
pluginConfigId: number
jobId?: string
// Keep in sync with posthog/queries/app_metrics/serializers.py
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask'
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook'
}

export interface AppMetric extends AppMetricIdentifier {
Expand Down
46 changes: 45 additions & 1 deletion plugin-server/src/worker/ingestion/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { isCloud } from '../../utils/env-utils'
import { safeTrackedFetch, trackedFetch } from '../../utils/fetch'
import { status } from '../../utils/status'
import { getPropertyValueByPath, stringify } from '../../utils/utils'
import { AppMetrics } from './app-metrics'
import { OrganizationManager } from './organization-manager'
import { TeamManager } from './team-manager'

Expand Down Expand Up @@ -255,6 +256,7 @@ export class HookCommander {
postgres: PostgresRouter
teamManager: TeamManager
organizationManager: OrganizationManager
appMetrics: AppMetrics
statsd: StatsD | undefined
siteUrl: string
/** null means that the hostname guard is enabled for everyone */
Expand All @@ -268,7 +270,8 @@ export class HookCommander {
teamManager: TeamManager,
organizationManager: OrganizationManager,
fetchHostnameGuardTeams: Set<number> | null = new Set(),
statsd?: StatsD
appMetrics: AppMetrics,
statsd: StatsD | undefined
) {
this.postgres = postgres
this.teamManager = teamManager
Expand All @@ -281,6 +284,7 @@ export class HookCommander {
this.siteUrl = ''
}
this.statsd = statsd
this.appMetrics = appMetrics
}

public async findAndFireHooks(event: PostIngestionEvent, actionMatches: Action[]): Promise<void> {
Expand Down Expand Up @@ -382,6 +386,26 @@ export class HookCommander {
this.statsd?.increment('webhook_firings', {
team_id: event.teamId.toString(),
})
await this.appMetrics.queueMetric({
teamId: event.teamId,
pluginConfigId: 0, // 0 is hardcoded to mean webhooks
category: 'webhook',
successes: 1,
})
} catch (error) {
await this.appMetrics.queueError(
{
teamId: event.teamId,
pluginConfigId: 0, // 0 is hardcoded to mean webhooks
category: 'webhook',
failures: 1,
},
{
error,
event,
}
)
throw error
} finally {
clearTimeout(timeout)
}
Expand Down Expand Up @@ -427,6 +451,26 @@ export class HookCommander {
status.warn('⚠️', `Rest hook failed status ${request.status} for team ${event.teamId}`)
}
this.statsd?.increment('rest_hook_firings')
await this.appMetrics.queueMetric({
teamId: event.teamId,
pluginConfigId: -1, // -1 is hardcoded to mean resthooks
category: 'webhook',
successes: 1,
})
} catch (error) {
await this.appMetrics.queueError(
{
teamId: event.teamId,
pluginConfigId: -1, // -1 is hardcoded to mean resthooks
category: 'webhook',
failures: 1,
},
{
error,
event,
}
)
throw error
} finally {
clearTimeout(timeout)
}
Expand Down
59 changes: 37 additions & 22 deletions plugin-server/tests/worker/ingestion/app-metrics.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Hub } from '../../../src/types'
import { createHub } from '../../../src/utils/db/hub'
import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
import { UUIDT } from '../../../src/utils/utils'
import { AppMetricIdentifier, AppMetrics } from '../../../src/worker/ingestion/app-metrics'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../helpers/clickhouse'
Expand All @@ -19,23 +20,23 @@ const uuid2 = new UUIDT().toString()

describe('AppMetrics()', () => {
let appMetrics: AppMetrics
let hub: Hub
let closeHub: () => Promise<void>

beforeEach(async () => {
;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100, APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5 })
appMetrics = new AppMetrics(
hub.kafkaProducer,
hub.APP_METRICS_FLUSH_FREQUENCY_MS,
hub.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)
// doesn't flush again on the next call, i.e. flushed metrics were reset
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
let kafkaProducer: KafkaProducerWrapper

beforeEach(() => {
kafkaProducer = {
producer: jest.fn(),
waitForAck: jest.fn(),
produce: jest.fn(),
queueMessage: jest.fn(),
flush: jest.fn(),
disconnect: jest.fn(),
} as unknown as KafkaProducerWrapper

appMetrics = new AppMetrics(kafkaProducer, 100, 5)
})

afterEach(async () => {
afterEach(() => {
jest.useRealTimers()
await closeHub()
})

describe('queueMetric()', () => {
Expand Down Expand Up @@ -257,7 +258,7 @@ describe('AppMetrics()', () => {

describe('flush()', () => {
it('flushes queued messages', async () => {
const spy = jest.spyOn(hub.kafkaProducer, 'queueMessage')
const spy = jest.spyOn(kafkaProducer, 'queueMessage')

await appMetrics.queueMetric({ ...metric, jobId: '000-000', successes: 1 }, timestamp)
await appMetrics.flush()
Expand All @@ -268,11 +269,25 @@ describe('AppMetrics()', () => {
it('does nothing if nothing queued', async () => {
await appMetrics.flush()

expect(hub.kafkaProducer.queueMessage).not.toHaveBeenCalled()
expect(kafkaProducer.queueMessage).not.toHaveBeenCalled()
})
})

describe('reading writes from clickhouse', () => {
let hub: Hub
let closeHub: () => Promise<void>

beforeEach(async () => {
;[hub, closeHub] = await createHub({
APP_METRICS_FLUSH_FREQUENCY_MS: 100,
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5,
})
// doesn't flush again on the next call, i.e. flushed metrics were reset
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
})
afterEach(async () => {
await closeHub()
})
async function fetchRowsFromClickhouse() {
return (await hub.db.clickhouseQuery(`SELECT * FROM app_metrics FINAL`)).data
}
Expand All @@ -284,12 +299,12 @@ describe('AppMetrics()', () => {

it('can read its own writes', async () => {
await Promise.all([
appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp),
appMetrics.queueMetric({ ...metric, successes: 2, successesOnRetry: 4 }, timestamp),
appMetrics.queueMetric({ ...metric, failures: 1 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, successes: 2, successesOnRetry: 4 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, failures: 1 }, timestamp),
])

await appMetrics.flush()
await hub.appMetrics.flush()
await hub.kafkaProducer.flush()

const rows = await delayUntilEventIngested(fetchRowsFromClickhouse)
Expand All @@ -314,12 +329,12 @@ describe('AppMetrics()', () => {
it('can read errors', async () => {
jest.spyOn

await appMetrics.queueError(
await hub.appMetrics.queueError(
{ ...metric, failures: 1 },
{ error: new Error('foobar'), eventCount: 1 },
timestamp
),
await appMetrics.flush()
await hub.appMetrics.flush()
await hub.kafkaProducer.flush()

const rows = await delayUntilEventIngested(fetchRowsFromClickhouse)
Expand Down
4 changes: 2 additions & 2 deletions posthog/queries/app_metrics/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class AppMetricsRequestSerializer(serializers.Serializer):
category = serializers.ChoiceField(
# Keep in sync with plugin-server/src/worker/ingestion/app-metrics.ts
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask"],
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask", "webhook"],
help_text="What to gather metrics for",
required=True,
)
Expand All @@ -22,7 +22,7 @@ class AppMetricsRequestSerializer(serializers.Serializer):
class AppMetricsErrorsRequestSerializer(serializers.Serializer):
category = serializers.ChoiceField(
# Keep in sync with plugin-server/src/worker/ingestion/app-metrics.ts
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask"],
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask", "webhook"],
help_text="What to gather errors for",
required=True,
)
Expand Down

0 comments on commit ccd8739

Please sign in to comment.