Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add app metrics to webhooks #17608

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { StatsD } from 'hot-shots'
import { Consumer, Kafka } from 'kafkajs'
import * as schedule from 'node-schedule'
import { AppMetrics } from 'worker/ingestion/app-metrics'

import { KAFKA_EVENTS_JSON, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub, PluginsServerConfig } from '../../types'
Expand Down Expand Up @@ -55,13 +56,15 @@ export const startAsyncWebhooksHandlerConsumer = async ({
organizationManager,
statsd,
serverConfig,
appMetrics,
}: {
kafka: Kafka
postgres: PostgresRouter
teamManager: TeamManager
organizationManager: OrganizationManager
statsd: StatsD | undefined
serverConfig: PluginsServerConfig
appMetrics: AppMetrics
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
Expand Down Expand Up @@ -90,6 +93,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({
teamManager,
organizationManager,
new Set(serverConfig.FETCH_HOSTNAME_GUARD_TEAMS.split(',').filter(String).map(Number)),
appMetrics,
statsd
)
const concurrency = serverConfig.TASKS_PER_WORKER || 20
Expand Down
12 changes: 11 additions & 1 deletion plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import v8Profiler from 'v8-profiler-next'
import { getPluginServerCapabilities } from '../capabilities'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createStatsdClient } from '../utils/db/hub'
import { createHub, createKafkaClient, createKafkaProducerWrapper, createStatsdClient } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { captureEventLoopMetrics } from '../utils/metrics'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
Expand Down Expand Up @@ -339,6 +340,14 @@ export async function startPluginsServer(
const kafka = hub?.kafka ?? createKafkaClient(serverConfig)
const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig, statsd)
const organizationManager = hub?.organizationManager ?? new OrganizationManager(postgres, teamManager)
const KafkaProducerWrapper = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
const appMetrics =
hub?.appMetrics ??
new AppMetrics(
KafkaProducerWrapper,
serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)

const { stop: webhooksStopConsumer, isHealthy: isWebhooksIngestionHealthy } =
await startAsyncWebhooksHandlerConsumer({
Expand All @@ -347,6 +356,7 @@ export async function startPluginsServer(
teamManager: teamManager,
organizationManager: organizationManager,
serverConfig: serverConfig,
appMetrics: appMetrics,
statsd: statsd,
})

Expand Down
11 changes: 7 additions & 4 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ pgTypes.setTypeParser(1184 /* types.TypeId.TIMESTAMPTZ */, (timeStr) =>
timeStr ? DateTime.fromSQL(timeStr, { zone: 'utc' }).toISO() : null
)

export async function createKafkaProducerWrapper(serverConfig: PluginsServerConfig): Promise<KafkaProducerWrapper> {
const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })
return new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
}

export async function createHub(
config: Partial<PluginsServerConfig> = {},
threadId: number | null = null,
Expand Down Expand Up @@ -101,10 +107,7 @@ export async function createHub(
status.info('🤔', `Connecting to Kafka...`)

const kafka = createKafkaClient(serverConfig)
const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })

const kafkaProducer = new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
const kafkaProducer = await createKafkaProducerWrapper(serverConfig)
status.info('👍', `Kafka ready`)

const postgres = new PostgresRouter(serverConfig, statsd)
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/app-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface AppMetricIdentifier {
pluginConfigId: number
jobId?: string
// Keep in sync with posthog/queries/app_metrics/serializers.py
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask'
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook'
}

export interface AppMetric extends AppMetricIdentifier {
Expand Down
46 changes: 45 additions & 1 deletion plugin-server/src/worker/ingestion/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { isCloud } from '../../utils/env-utils'
import { safeTrackedFetch, trackedFetch } from '../../utils/fetch'
import { status } from '../../utils/status'
import { getPropertyValueByPath, stringify } from '../../utils/utils'
import { AppMetrics } from './app-metrics'
import { OrganizationManager } from './organization-manager'
import { TeamManager } from './team-manager'

Expand Down Expand Up @@ -255,6 +256,7 @@ export class HookCommander {
postgres: PostgresRouter
teamManager: TeamManager
organizationManager: OrganizationManager
appMetrics: AppMetrics
statsd: StatsD | undefined
siteUrl: string
/** null means that the hostname guard is enabled for everyone */
Expand All @@ -268,7 +270,8 @@ export class HookCommander {
teamManager: TeamManager,
organizationManager: OrganizationManager,
fetchHostnameGuardTeams: Set<number> | null = new Set(),
statsd?: StatsD
appMetrics: AppMetrics,
statsd: StatsD | undefined
) {
this.postgres = postgres
this.teamManager = teamManager
Expand All @@ -281,6 +284,7 @@ export class HookCommander {
this.siteUrl = ''
}
this.statsd = statsd
this.appMetrics = appMetrics
}

public async findAndFireHooks(event: PostIngestionEvent, actionMatches: Action[]): Promise<void> {
Expand Down Expand Up @@ -382,6 +386,26 @@ export class HookCommander {
this.statsd?.increment('webhook_firings', {
team_id: event.teamId.toString(),
})
await this.appMetrics.queueMetric({
teamId: event.teamId,
pluginConfigId: -2, // -2 is hardcoded to mean webhooks
category: 'webhook',
successes: 1,
})
} catch (error) {
await this.appMetrics.queueError(
{
teamId: event.teamId,
pluginConfigId: -2, // -2 is hardcoded to mean webhooks
category: 'webhook',
failures: 1,
},
{
error,
event,
}
)
throw error
} finally {
clearTimeout(timeout)
}
Expand Down Expand Up @@ -427,6 +451,26 @@ export class HookCommander {
status.warn('⚠️', `Rest hook failed status ${request.status} for team ${event.teamId}`)
}
this.statsd?.increment('rest_hook_firings')
await this.appMetrics.queueMetric({
teamId: event.teamId,
pluginConfigId: -1, // -1 is hardcoded to mean resthooks
category: 'webhook',
successes: 1,
})
} catch (error) {
await this.appMetrics.queueError(
{
teamId: event.teamId,
pluginConfigId: -1, // -1 is hardcoded to mean resthooks
category: 'webhook',
failures: 1,
},
{
error,
event,
}
)
throw error
} finally {
clearTimeout(timeout)
}
Expand Down
59 changes: 37 additions & 22 deletions plugin-server/tests/worker/ingestion/app-metrics.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Hub } from '../../../src/types'
import { createHub } from '../../../src/utils/db/hub'
import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
import { UUIDT } from '../../../src/utils/utils'
import { AppMetricIdentifier, AppMetrics } from '../../../src/worker/ingestion/app-metrics'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../helpers/clickhouse'
Expand All @@ -19,23 +20,23 @@ const uuid2 = new UUIDT().toString()

describe('AppMetrics()', () => {
let appMetrics: AppMetrics
let hub: Hub
let closeHub: () => Promise<void>

beforeEach(async () => {
;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100, APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5 })
appMetrics = new AppMetrics(
hub.kafkaProducer,
hub.APP_METRICS_FLUSH_FREQUENCY_MS,
hub.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)
// doesn't flush again on the next call, i.e. flust metrics were reset
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
let kafkaProducer: KafkaProducerWrapper

beforeEach(() => {
kafkaProducer = {
producer: jest.fn(),
waitForAck: jest.fn(),
produce: jest.fn(),
queueMessage: jest.fn(),
flush: jest.fn(),
disconnect: jest.fn(),
} as unknown as KafkaProducerWrapper

appMetrics = new AppMetrics(kafkaProducer, 100, 5)
})

afterEach(async () => {
afterEach(() => {
jest.useRealTimers()
await closeHub()
})

describe('queueMetric()', () => {
Expand Down Expand Up @@ -257,7 +258,7 @@ describe('AppMetrics()', () => {

describe('flush()', () => {
it('flushes queued messages', async () => {
const spy = jest.spyOn(hub.kafkaProducer, 'queueMessage')
const spy = jest.spyOn(kafkaProducer, 'queueMessage')

await appMetrics.queueMetric({ ...metric, jobId: '000-000', successes: 1 }, timestamp)
await appMetrics.flush()
Expand All @@ -268,11 +269,25 @@ describe('AppMetrics()', () => {
it('does nothing if nothing queued', async () => {
await appMetrics.flush()

expect(hub.kafkaProducer.queueMessage).not.toHaveBeenCalled()
expect(kafkaProducer.queueMessage).not.toHaveBeenCalled()
})
})

describe('reading writes from clickhouse', () => {
let hub: Hub
let closeHub: () => Promise<void>

beforeEach(async () => {
;[hub, closeHub] = await createHub({
APP_METRICS_FLUSH_FREQUENCY_MS: 100,
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5,
})
// doesn't flush again on the next call, i.e. flust metrics were reset
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
})
afterEach(async () => {
await closeHub()
})
async function fetchRowsFromClickhouse() {
return (await hub.db.clickhouseQuery(`SELECT * FROM app_metrics FINAL`)).data
}
Expand All @@ -284,12 +299,12 @@ describe('AppMetrics()', () => {

it('can read its own writes', async () => {
await Promise.all([
appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp),
appMetrics.queueMetric({ ...metric, successes: 2, successesOnRetry: 4 }, timestamp),
appMetrics.queueMetric({ ...metric, failures: 1 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, successes: 2, successesOnRetry: 4 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, failures: 1 }, timestamp),
])

await appMetrics.flush()
await hub.appMetrics.flush()
await hub.kafkaProducer.flush()

const rows = await delayUntilEventIngested(fetchRowsFromClickhouse)
Expand All @@ -314,12 +329,12 @@ describe('AppMetrics()', () => {
it('can read errors', async () => {
jest.spyOn

await appMetrics.queueError(
await hub.appMetrics.queueError(
{ ...metric, failures: 1 },
{ error: new Error('foobar'), eventCount: 1 },
timestamp
),
await appMetrics.flush()
await hub.appMetrics.flush()
await hub.kafkaProducer.flush()

const rows = await delayUntilEventIngested(fetchRowsFromClickhouse)
Expand Down
5 changes: 4 additions & 1 deletion plugin-server/tests/worker/ingestion/hooks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import fetch, { FetchError } from 'node-fetch'
import { Action, PostIngestionEvent, Team } from '../../../src/types'
import { isCloud } from '../../../src/utils/env-utils'
import { UUIDT } from '../../../src/utils/utils'
import { AppMetrics } from '../../../src/worker/ingestion/app-metrics'
import {
determineWebhookType,
getActionDetails,
Expand Down Expand Up @@ -489,7 +490,9 @@ describe('hooks', () => {
{} as any,
{} as any,
{} as any,
new Set([hook.team_id]) // Hostname guard enabled
new Set([hook.team_id]), // Hostname guard enabled
// mock object with queueError function as no-op
{ queueError: () => Promise.resolve(), queueMetric: () => Promise.resolve() } as AppMetrics
)
})

Expand Down
4 changes: 2 additions & 2 deletions posthog/queries/app_metrics/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class AppMetricsRequestSerializer(serializers.Serializer):
category = serializers.ChoiceField(
# Keep in sync with plugin-server/src/worker/ingestion/app-metrics.ts
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask"],
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask", "webhook"],
help_text="What to gather metrics for",
required=True,
)
Expand All @@ -22,7 +22,7 @@ class AppMetricsRequestSerializer(serializers.Serializer):
class AppMetricsErrorsRequestSerializer(serializers.Serializer):
category = serializers.ChoiceField(
# Keep in sync with plugin-server/src/worker/ingestion/app-metrics.ts
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask"],
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask", "webhook"],
help_text="What to gather errors for",
required=True,
)
Expand Down