app-metrics-for-webhooks
tiina303 committed Sep 25, 2023
1 parent 7a8a268 commit 34964e2
Showing 7 changed files with 110 additions and 34 deletions.
@@ -16,6 +16,7 @@ import Piscina from '../../worker/piscina'
import { eachBatchAppsOnEventHandlers } from './batch-processing/each-batch-onevent'
import { eachBatchWebhooksHandlers } from './batch-processing/each-batch-webhooks'
import { KafkaJSIngestionConsumer, setupEventHandlers } from './kafka-queue'
import { AppMetrics } from '../../worker/ingestion/app-metrics'

export const startAsyncOnEventHandlerConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
@@ -55,13 +56,15 @@ export const startAsyncWebhooksHandlerConsumer = async ({
organizationManager,
statsd,
serverConfig,
appMetrics,
}: {
kafka: Kafka
postgres: PostgresRouter
teamManager: TeamManager
organizationManager: OrganizationManager
statsd: StatsD | undefined
serverConfig: PluginsServerConfig
serverConfig: PluginsServerConfig,
appMetrics: AppMetrics,
}) => {
/*
Consumes analytics events from the Kafka topic `clickhouse_events_json`
@@ -90,7 +93,8 @@ export const startAsyncWebhooksHandlerConsumer = async ({
teamManager,
organizationManager,
new Set(serverConfig.FETCH_HOSTNAME_GUARD_TEAMS.split(',').filter(String).map(Number)),
statsd
appMetrics,
statsd,
)
const concurrency = serverConfig.TASKS_PER_WORKER || 20

12 changes: 11 additions & 1 deletion plugin-server/src/main/pluginsServer.ts
@@ -11,13 +11,14 @@ import v8Profiler from 'v8-profiler-next'
import { getPluginServerCapabilities } from '../capabilities'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createStatsdClient } from '../utils/db/hub'
import { createHub, createKafkaClient, createKafkaProducerWrapper, createStatsdClient } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { captureEventLoopMetrics } from '../utils/metrics'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { delay } from '../utils/utils'
import { AppMetrics } from '../worker/ingestion/app-metrics'
import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
@@ -339,6 +340,14 @@ export async function startPluginsServer(
const kafka = hub?.kafka ?? createKafkaClient(serverConfig)
const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig, statsd)
const organizationManager = hub?.organizationManager ?? new OrganizationManager(postgres, teamManager)
const kafkaProducerWrapper = hub?.kafkaProducer ?? (await createKafkaProducerWrapper(serverConfig))
const appMetrics =
hub?.appMetrics ??
new AppMetrics(
kafkaProducerWrapper,
serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)

const { stop: webhooksStopConsumer, isHealthy: isWebhooksIngestionHealthy } =
await startAsyncWebhooksHandlerConsumer({
@@ -347,6 +356,7 @@
teamManager: teamManager,
organizationManager: organizationManager,
serverConfig: serverConfig,
appMetrics: appMetrics,
statsd: statsd,
})

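The block above wires AppMetrics with a reuse-or-create pattern: when a hub exists, its shared instances are used; otherwise standalone instances are built from config. A minimal sketch of that pattern, assuming nothing about the real Hub or AppMetrics classes beyond what the diff shows (all names below are illustrative stand-ins):

// Sketch of the reuse-or-create wiring used in startPluginsServer above.
// HubLike, MetricsQueueLike, and the numeric defaults are stand-ins, not real plugin-server types.
interface MetricsQueueLike {
    queueMetric(metric: {
        teamId: number
        pluginConfigId: number
        category: string
        successes?: number
        failures?: number
    }): Promise<void>
}

interface HubLike {
    appMetrics?: MetricsQueueLike
}

function createStandaloneMetrics(_flushFrequencyMs: number, maxQueueSize: number): MetricsQueueLike {
    const queue: object[] = []
    return {
        async queueMetric(metric) {
            queue.push(metric)
            // The real AppMetrics flushes to Kafka on a timer and when the queue fills;
            // here we only cap the queue to mimic APP_METRICS_FLUSH_MAX_QUEUE_SIZE.
            if (queue.length >= maxQueueSize) queue.length = 0
        },
    }
}

// Reuse the hub's instance when available, otherwise build a fresh one.
function resolveAppMetrics(hub: HubLike | undefined): MetricsQueueLike {
    return hub?.appMetrics ?? createStandaloneMetrics(100, 1000)
}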
11 changes: 7 additions & 4 deletions plugin-server/src/utils/db/hub.ts
@@ -51,6 +51,12 @@ pgTypes.setTypeParser(1184 /* types.TypeId.TIMESTAMPTZ */, (timeStr) =>
timeStr ? DateTime.fromSQL(timeStr, { zone: 'utc' }).toISO() : null
)

export async function createKafkaProducerWrapper(serverConfig: PluginsServerConfig): Promise<KafkaProducerWrapper> {
const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })
return new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
}

export async function createHub(
config: Partial<PluginsServerConfig> = {},
threadId: number | null = null,
@@ -101,10 +107,7 @@ export async function createHub(
status.info('🤔', `Connecting to Kafka...`)

const kafka = createKafkaClient(serverConfig)
const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig)
const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })

const kafkaProducer = new KafkaProducerWrapper(producer, serverConfig.KAFKA_PRODUCER_WAIT_FOR_ACK)
const kafkaProducer = await createKafkaProducerWrapper(serverConfig)
status.info('👍', `Kafka ready`)

const postgres = new PostgresRouter(serverConfig, statsd)
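The hub.ts change above extracts producer construction into createKafkaProducerWrapper, so createHub and the standalone webhooks consumer build their Kafka producer the same way. A rough sketch of the factory-extraction shape, with rdkafka's API reduced to stand-ins (ProducerLike and connectProducer are not real plugin-server names):

// Factory-extraction sketch: one async builder for a shared dependency.
interface ProducerLike {
    flush(): Promise<void>
}

class ProducerWrapperSketch {
    constructor(private producer: ProducerLike, private waitForAck: boolean) {}

    async flush(): Promise<void> {
        await this.producer.flush()
    }
}

async function connectProducer(_config: Record<string, unknown>): Promise<ProducerLike> {
    // Placeholder for the real network connection.
    return { flush: async () => undefined }
}

// After the refactor, every caller builds the wrapper through one function,
// so settings like 'linger.ms': 0 live in exactly one place.
async function createProducerWrapperSketch(brokers: string, waitForAck: boolean): Promise<ProducerWrapperSketch> {
    const producer = await connectProducer({ 'bootstrap.servers': brokers, 'linger.ms': 0 })
    return new ProducerWrapperSketch(producer, waitForAck)
}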
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/app-metrics.ts
@@ -15,7 +15,7 @@ export interface AppMetricIdentifier {
pluginConfigId: number
jobId?: string
// Keep in sync with posthog/queries/app_metrics/serializers.py
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask'
category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook'
}

export interface AppMetric extends AppMetricIdentifier {
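With 'webhook' added to the category union, a webhook delivery can be recorded like any other app metric. A hedged usage sketch: the queueMetric shape mirrors the calls added in hooks.ts below, and the sentinel pluginConfigId values (0 for webhooks, -1 for rest hooks) are the ones hardcoded there; everything else is illustrative.

// Usage sketch: recording webhook outcomes against the new 'webhook' category.
type AppMetricCategorySketch = 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook'

interface WebhookMetricSketch {
    teamId: number
    pluginConfigId: number // 0 = webhooks, -1 = rest hooks (sentinels from hooks.ts below)
    category: AppMetricCategorySketch
    successes?: number
    failures?: number
}

async function recordWebhookSuccess(
    metrics: { queueMetric(m: WebhookMetricSketch): Promise<void> },
    teamId: number
): Promise<void> {
    await metrics.queueMetric({ teamId, pluginConfigId: 0, category: 'webhook', successes: 1 })
}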
48 changes: 46 additions & 2 deletions plugin-server/src/worker/ingestion/hooks.ts
@@ -11,6 +11,7 @@ import { status } from '../../utils/status'
import { getPropertyValueByPath, stringify } from '../../utils/utils'
import { OrganizationManager } from './organization-manager'
import { TeamManager } from './team-manager'
import { AppMetrics } from './app-metrics'

export const webhookProcessStepDuration = new Histogram({
name: 'webhook_process_event_duration',
@@ -255,6 +256,7 @@ export class HookCommander {
postgres: PostgresRouter
teamManager: TeamManager
organizationManager: OrganizationManager
appMetrics: AppMetrics
statsd: StatsD | undefined
siteUrl: string
/** null means that the hostname guard is enabled for everyone */
@@ -268,7 +270,8 @@
teamManager: TeamManager,
organizationManager: OrganizationManager,
fetchHostnameGuardTeams: Set<number> | null = new Set(),
statsd?: StatsD
appMetrics: AppMetrics,
statsd: StatsD | undefined,
) {
this.postgres = postgres
this.teamManager = teamManager
@@ -281,6 +284,7 @@
this.siteUrl = ''
}
this.statsd = statsd
this.appMetrics = appMetrics
}

public async findAndFireHooks(event: PostIngestionEvent, actionMatches: Action[]): Promise<void> {
@@ -304,7 +308,7 @@
const webhookRequests = actionMatches
.filter((action) => action.post_to_slack)
.map((action) => this.postWebhook(webhookUrl, action, event, team))
await Promise.all(webhookRequests).catch((error) =>
captureException(error, { tags: { team_id: event.teamId } })
)
})
@@ -382,6 +386,26 @@
this.statsd?.increment('webhook_firings', {
team_id: event.teamId.toString(),
})
await this.appMetrics.queueMetric({
teamId: event.teamId,
pluginConfigId: 0, // 0 is hardcoded to mean webhooks
category: 'webhook',
successes: 1,
})
} catch (error) {
await this.appMetrics.queueError(
{
teamId: event.teamId,
pluginConfigId: 0, // 0 is hardcoded to mean webhooks
category: 'webhook',
failures: 1,
},
{
error,
event,
}
)
throw error
} finally {
clearTimeout(timeout)
}
@@ -427,6 +451,26 @@
status.warn('⚠️', `Rest hook failed status ${request.status} for team ${event.teamId}`)
}
this.statsd?.increment('rest_hook_firings')
await this.appMetrics.queueMetric({
teamId: event.teamId,
pluginConfigId: -1, // -1 is hardcoded to mean resthooks
category: 'webhook',
successes: 1,
})
} catch (error) {
await this.appMetrics.queueError(
{
teamId: event.teamId,
pluginConfigId: -1, // -1 is hardcoded to mean resthooks
category: 'webhook',
failures: 1,
},
{
error,
event,
}
)
throw error
} finally {
clearTimeout(timeout)
}
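Both hunks above apply the same instrumentation pattern: queue a success metric once the request returns, queue an error metric and rethrow on failure, and clear the request timeout either way. Condensed into a self-contained sketch (fireWebhookWithMetrics, the 10-second guard, and MetricsSinkSketch are illustrative, not the repo's real code):

// Instrumentation sketch of the success/failure/finally pattern used above.
interface MetricsSinkSketch {
    queueMetric(m: { teamId: number; pluginConfigId: number; category: 'webhook'; successes: number }): Promise<void>
    queueError(
        m: { teamId: number; pluginConfigId: number; category: 'webhook'; failures: number },
        context: { error: unknown }
    ): Promise<void>
}

async function fireWebhookWithMetrics(
    metrics: MetricsSinkSketch,
    teamId: number,
    send: (signal: AbortSignal) => Promise<void>
): Promise<void> {
    const controller = new AbortController()
    const timeout = setTimeout(() => controller.abort(), 10_000) // illustrative 10s guard
    try {
        await send(controller.signal)
        await metrics.queueMetric({ teamId, pluginConfigId: 0, category: 'webhook', successes: 1 })
    } catch (error) {
        await metrics.queueError({ teamId, pluginConfigId: 0, category: 'webhook', failures: 1 }, { error })
        throw error // callers still observe the failure; the metric is a side channel
    } finally {
        clearTimeout(timeout)
    }
}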
59 changes: 37 additions & 22 deletions plugin-server/tests/worker/ingestion/app-metrics.test.ts
@@ -1,5 +1,6 @@
import { Hub } from '../../../src/types'
import { createHub } from '../../../src/utils/db/hub'
import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
import { UUIDT } from '../../../src/utils/utils'
import { AppMetricIdentifier, AppMetrics } from '../../../src/worker/ingestion/app-metrics'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../helpers/clickhouse'
@@ -19,23 +20,23 @@ const uuid2 = new UUIDT().toString()

describe('AppMetrics()', () => {
let appMetrics: AppMetrics
let hub: Hub
let closeHub: () => Promise<void>

beforeEach(async () => {
;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100, APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5 })
appMetrics = new AppMetrics(
hub.kafkaProducer,
hub.APP_METRICS_FLUSH_FREQUENCY_MS,
hub.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)
// doesn't flush again on the next call, i.e. flushed metrics were reset
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
let kafkaProducer: KafkaProducerWrapper

beforeEach(() => {
kafkaProducer = {
producer: jest.fn(),
waitForAck: jest.fn(),
produce: jest.fn(),
queueMessage: jest.fn(),
flush: jest.fn(),
disconnect: jest.fn(),
} as unknown as KafkaProducerWrapper

appMetrics = new AppMetrics(kafkaProducer, 100, 5)
})

afterEach(async () => {
afterEach(() => {
jest.useRealTimers()
await closeHub()
})

describe('queueMetric()', () => {
@@ -257,7 +258,7 @@ describe('AppMetrics()', () => {

describe('flush()', () => {
it('flushes queued messages', async () => {
const spy = jest.spyOn(hub.kafkaProducer, 'queueMessage')
const spy = jest.spyOn(kafkaProducer, 'queueMessage')

await appMetrics.queueMetric({ ...metric, jobId: '000-000', successes: 1 }, timestamp)
await appMetrics.flush()
@@ -268,11 +269,25 @@
it('does nothing if nothing queued', async () => {
await appMetrics.flush()

expect(hub.kafkaProducer.queueMessage).not.toHaveBeenCalled()
expect(kafkaProducer.queueMessage).not.toHaveBeenCalled()
})
})

describe('reading writes from clickhouse', () => {
let hub: Hub
let closeHub: () => Promise<void>

beforeEach(async () => {
;[hub, closeHub] = await createHub({
APP_METRICS_FLUSH_FREQUENCY_MS: 100,
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5,
})
// doesn't flush again on the next call, i.e. flushed metrics were reset
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
})
afterEach(async () => {
await closeHub()
})
async function fetchRowsFromClickhouse() {
return (await hub.db.clickhouseQuery(`SELECT * FROM app_metrics FINAL`)).data
}
@@ -284,12 +299,12 @@

it('can read its own writes', async () => {
await Promise.all([
appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp),
appMetrics.queueMetric({ ...metric, successes: 2, successesOnRetry: 4 }, timestamp),
appMetrics.queueMetric({ ...metric, failures: 1 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, successes: 2, successesOnRetry: 4 }, timestamp),
hub.appMetrics.queueMetric({ ...metric, failures: 1 }, timestamp),
])

await appMetrics.flush()
await hub.appMetrics.flush()
await hub.kafkaProducer.flush()

const rows = await delayUntilEventIngested(fetchRowsFromClickhouse)
@@ -314,12 +329,12 @@
it('can read errors', async () => {
await appMetrics.queueError(
await hub.appMetrics.queueError(
{ ...metric, failures: 1 },
{ error: new Error('foobar'), eventCount: 1 },
timestamp
)
await appMetrics.flush()
await hub.appMetrics.flush()
await hub.kafkaProducer.flush()

const rows = await delayUntilEventIngested(fetchRowsFromClickhouse)
4 changes: 2 additions & 2 deletions posthog/queries/app_metrics/serializers.py
@@ -4,7 +4,7 @@
class AppMetricsRequestSerializer(serializers.Serializer):
category = serializers.ChoiceField(
# Keep in sync with plugin-server/src/worker/ingestion/app-metrics.ts
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask"],
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask", "webhook"],
help_text="What to gather metrics for",
required=True,
)
@@ -22,7 +22,7 @@ class AppMetricsRequestSerializer(serializers.Serializer):
class AppMetricsErrorsRequestSerializer(serializers.Serializer):
category = serializers.ChoiceField(
# Keep in sync with plugin-server/src/worker/ingestion/app-metrics.ts
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask"],
choices=["processEvent", "onEvent", "exportEvents", "scheduledTask", "webhook"],
help_text="What to gather errors for",
required=True,
)
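Both sides of the "Keep in sync" comments now enumerate the same five categories. On the TypeScript side, one way to reduce drift would be deriving the union type from a single array constant; a suggestion, not what the repo does:

// Single source of truth for metric categories; the union type is derived from the array.
const APP_METRIC_CATEGORIES = ['processEvent', 'onEvent', 'exportEvents', 'scheduledTask', 'webhook'] as const

type AppMetricCategory = (typeof APP_METRIC_CATEGORIES)[number]

// A value typed this way is checked against the array-derived union; the array itself
// could also be compared against the Python serializer's choices in a CI test.
const example: AppMetricCategory = 'webhook'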
