Skip to content

Commit

Permalink
feat(plugin-server): better profiling capabilities: adjust sampling p…
Browse files Browse the repository at this point in the history
…recision + profile the pod startup (#17343)
  • Loading branch information
xvello authored Sep 11, 2023
1 parent 0747e00 commit 8592992
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 4 deletions.
6 changes: 6 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag

STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
STARTUP_PROFILE_CPU: false,
STARTUP_PROFILE_HEAP: false,
STARTUP_PROFILE_HEAP_INTERVAL: 512 * 1024, // default v8 value
STARTUP_PROFILE_HEAP_DEPTH: 16, // default v8 value

SESSION_RECORDING_KAFKA_HOSTS: undefined,
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: undefined,
SESSION_RECORDING_KAFKA_BATCH_SIZE: 500,
Expand Down
26 changes: 26 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import * as Sentry from '@sentry/node'
import fs from 'fs'
import { Server } from 'http'
import { CompressionCodecs, CompressionTypes, Consumer, KafkaJSProtocolError } from 'kafkajs'
// @ts-expect-error no type definitions
import SnappyCodec from 'kafkajs-snappy'
import * as schedule from 'node-schedule'
import { Counter } from 'prom-client'
import v8Profiler from 'v8-profiler-next'

import { getPluginServerCapabilities } from '../capabilities'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
Expand Down Expand Up @@ -63,6 +65,7 @@ export async function startPluginsServer(

status.updatePrompt(serverConfig.PLUGIN_SERVER_MODE)
status.info('ℹ️', `${serverConfig.WORKER_CONCURRENCY} workers, ${serverConfig.TASKS_PER_WORKER} tasks per worker`)
runStartupProfiles(serverConfig)

// Structure containing initialized clients for Postgres, Kafka, Redis, etc.
let hub: Hub | undefined
Expand Down Expand Up @@ -508,3 +511,26 @@ const kafkaProtocolErrors = new Counter({
help: 'Kafka protocol errors encountered, by type',
labelNames: ['type', 'code'],
})

function runStartupProfiles(config: PluginsServerConfig) {
if (config.STARTUP_PROFILE_CPU) {
status.info('🩺', `Collecting cpu profile...`)
v8Profiler.setGenerateType(1)
v8Profiler.startProfiling('startup', true)
setTimeout(() => {
const profile = v8Profiler.stopProfiling('startup')
fs.writeFileSync('./startup.cpuprofile', JSON.stringify(profile))
status.info('🩺', `Wrote cpu profile to disk`)
profile.delete()
}, config.STARTUP_PROFILE_DURATION_SECONDS * 1000)
}
if (config.STARTUP_PROFILE_HEAP) {
status.info('🩺', `Collecting heap profile...`)
v8Profiler.startSamplingHeapProfiling(config.STARTUP_PROFILE_HEAP_INTERVAL, config.STARTUP_PROFILE_HEAP_DEPTH)
setTimeout(() => {
const profile = v8Profiler.stopSamplingHeapProfiling()
fs.writeFileSync('./startup.heapprofile', JSON.stringify(profile))
status.info('🩺', `Wrote heap profile to disk`)
}, config.STARTUP_PROFILE_DURATION_SECONDS * 1000)
}
}
7 changes: 6 additions & 1 deletion plugin-server/src/main/services/http-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,13 @@ function exportProfile(req: IncomingMessage, res: ServerResponse) {
}, durationSeconds * 1000)
break
case 'heap':
// Additional params for sampling heap profile, higher precision means bigger profile.
// Defaults are taken from https://v8.github.io/api/head/classv8_1_1HeapProfiler.html
const interval = url.searchParams.get('interval') ? parseInt(url.searchParams.get('interval')!) : 512 * 1024
const depth = url.searchParams.get('depth') ? parseInt(url.searchParams.get('depth')!) : 16

sendHeaders('heapprofile')
v8Profiler.startSamplingHeapProfiling()
v8Profiler.startSamplingHeapProfiling(interval, depth)
setTimeout(() => {
outputProfileResult(res, type, v8Profiler.stopSamplingHeapProfiling())
}, durationSeconds * 1000)
Expand Down
12 changes: 9 additions & 3 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { VM } from 'vm2'
import { ObjectStorage } from './main/services/object_storage'
import { DB } from './utils/db/db'
import { KafkaProducerWrapper } from './utils/db/kafka-producer-wrapper'
import { PostgresRouter } from './utils/db/postgres' /** Re-export Element from scaffolding, for backwards compat. */
import { PostgresRouter } from './utils/db/postgres'
import { UUID } from './utils/utils'
import { AppMetrics } from './worker/ingestion/app-metrics'
import { EventPipelineResult } from './worker/ingestion/event-pipeline/runner'
Expand All @@ -33,8 +33,7 @@ import { RootAccessManager } from './worker/vm/extensions/helpers/root-acess-man
import { LazyPluginVM } from './worker/vm/lazy'
import { PromiseManager } from './worker/vm/promise-manager'

/** Re-export Element from scaffolding, for backwards compat. */
export { Element } from '@posthog/plugin-scaffold'
export { Element } from '@posthog/plugin-scaffold' // Re-export Element from scaffolding, for backwards compat.

type Brand<K, T> = K & { __brand: T }

Expand Down Expand Up @@ -201,6 +200,13 @@ export interface PluginsServerConfig {
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number
CLOUD_DEPLOYMENT: string

// dump profiles to disk, covering the first N seconds of runtime
STARTUP_PROFILE_DURATION_SECONDS: number
STARTUP_PROFILE_CPU: boolean
STARTUP_PROFILE_HEAP: boolean
STARTUP_PROFILE_HEAP_INTERVAL: number
STARTUP_PROFILE_HEAP_DEPTH: number

// local directory might be a volume mount or a directory on disk (e.g. in local dev)
SESSION_RECORDING_LOCAL_DIRECTORY: string
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number
Expand Down

0 comments on commit 8592992

Please sign in to comment.