diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index afca924b3cb1f..a6d9a373b4696 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -62,6 +62,7 @@ export function getDefaultConfig(): PluginsServerConfig { KAFKA_MAX_MESSAGE_BATCH_SIZE: isDevEnv() ? 0 : 900_000, KAFKA_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 500, APP_METRICS_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 20_000, + APP_METRICS_FLUSH_MAX_QUEUE_SIZE: isTestEnv() ? 5 : 1000, REDIS_URL: 'redis://127.0.0.1', POSTHOG_REDIS_PASSWORD: '', POSTHOG_REDIS_HOST: '', diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index f6c2c9077c5ac..b9bfe64dce03d 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -145,6 +145,7 @@ export interface PluginsServerConfig { KAFKA_MAX_MESSAGE_BATCH_SIZE: number KAFKA_FLUSH_FREQUENCY_MS: number APP_METRICS_FLUSH_FREQUENCY_MS: number + APP_METRICS_FLUSH_MAX_QUEUE_SIZE: number BASE_DIR: string // base path for resolving local plugins PLUGINS_RELOAD_PUBSUB_CHANNEL: string // Redis channel for reload events' LOG_LEVEL: LogLevel diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index aeb5c26c95cfa..2ae134ae1fb6a 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -28,6 +28,7 @@ import { AppMetrics } from '../../worker/ingestion/app-metrics' import { OrganizationManager } from '../../worker/ingestion/organization-manager' import { EventsProcessor } from '../../worker/ingestion/process-event' import { TeamManager } from '../../worker/ingestion/team-manager' +import { isTestEnv } from '../env-utils' import { status } from '../status' import { createRedisPool, UUIDT } from '../utils' import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-key-manager' @@ -192,9 +193,16 @@ export async function createHub( // :TODO: This is only used on worker threads, not main hub.eventsProcessor = new EventsProcessor(hub as Hub) - hub.appMetrics = new AppMetrics(hub as Hub) + hub.appMetrics = new AppMetrics( + kafkaProducer, + serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS, + serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE + ) const closeHub = async () => { + if (!isTestEnv()) { + await hub.appMetrics?.flush() + } await Promise.allSettled([kafkaProducer.disconnect(), redisPool.drain(), hub.postgres?.end()]) await redisPool.clear() diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts index a52345df75a31..333104e967d4a 100644 --- a/plugin-server/src/worker/ingestion/app-metrics.ts +++ b/plugin-server/src/worker/ingestion/app-metrics.ts @@ -2,9 +2,10 @@ import * as Sentry from '@sentry/node' import { Message } from 'kafkajs' import { DateTime } from 'luxon' import { configure } from 'safe-stable-stringify' +import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' import { KAFKA_APP_METRICS } from '../../config/kafka-topics' -import { Hub, TeamId, TimestampFormat } from '../../types' +import { TeamId, TimestampFormat } from '../../types' import { cleanErrorStackTrace } from '../../utils/db/error' import { status } from '../../utils/status' import { castTimestampOrNow, UUIDT } from '../../utils/utils' @@ -61,52 +62,43 @@ const safeJSONStringify = configure({ }) export class AppMetrics { - hub: Hub + kafkaProducer: KafkaProducerWrapper queuedData: Record flushFrequencyMs: number + maxQueueSize: number - timer: NodeJS.Timeout | null + lastFlushTime: number + // For quick access to queueSize instead of using Object.keys(queuedData).length every time + queueSize: number - constructor(hub: Hub) { - this.hub = hub + constructor(kafkaProducer: KafkaProducerWrapper, flushFrequencyMs: number, maxQueueSize: number) { this.queuedData = {} - this.flushFrequencyMs = hub.APP_METRICS_FLUSH_FREQUENCY_MS - this.timer = null + this.kafkaProducer = kafkaProducer + this.flushFrequencyMs = flushFrequencyMs + this.maxQueueSize = maxQueueSize + this.lastFlushTime = Date.now() + this.queueSize = 0 } - async isAvailable(metric: AppMetric, errorWithContext?: ErrorWithContext): Promise { - if (this.hub.APP_METRICS_GATHERED_FOR_ALL) { - return true - } - - // :TRICKY: If postgres connection is down, we ignore this metric - try { - return await this.hub.organizationManager.hasAvailableFeature(metric.teamId, 'app_metrics') - } catch (err) { - status.warn( - '⚠️', - 'Error querying whether app_metrics is available. Ignoring this metric', - metric, - errorWithContext, - err - ) - return false + async queueMetric(metric: AppMetric, timestamp?: number): Promise { + // We don't want to immediately flush all the metrics every time as we can internally + // aggregate them quite a bit and reduce the message count by a lot. + // However, we also don't want to wait too long, nor have the queue grow too big resulting in + // the flush taking a long time. + const now = Date.now() + if (now - this.lastFlushTime > this.flushFrequencyMs || this.queueSize > this.maxQueueSize) { + await this.flush() } - } - async queueMetric(metric: AppMetric, timestamp?: number): Promise { - timestamp = timestamp || Date.now() + timestamp = timestamp || now const key = this._key(metric) - if (!(await this.isAvailable(metric))) { - return - } - const { successes, successesOnRetry, failures, errorUuid, errorType, errorDetails, ...metricInfo } = metric if (!this.queuedData[key]) { + this.queueSize += 1 this.queuedData[key] = { successes: 0, successesOnRetry: 0, @@ -131,33 +123,29 @@ export class AppMetrics { this.queuedData[key].failures += failures } this.queuedData[key].lastTimestamp = timestamp - - if (this.timer === null) { - this.timer = setTimeout(() => { - this.hub.promiseManager.trackPromise(this.flush(), 'app metrics') - this.timer = null - }, this.flushFrequencyMs) - } } async queueError(metric: AppMetric, errorWithContext: ErrorWithContext, timestamp?: number) { - if (await this.isAvailable(metric, errorWithContext)) { - await this.queueMetric( - { - ...metric, - ...this._metricErrorParameters(errorWithContext), - }, - timestamp - ) - } + await this.queueMetric( + { + ...metric, + ...this._metricErrorParameters(errorWithContext), + }, + timestamp + ) } async flush(): Promise { + console.log(`Flushing app metrics`) + const startTime = Date.now() + this.lastFlushTime = startTime if (Object.keys(this.queuedData).length === 0) { return } + // TODO: We might be dropping some metrics here if someone wrote between queue assigment and queuedData={} assignment const queue = this.queuedData + this.queueSize = 0 this.queuedData = {} const kafkaMessages: Message[] = Object.values(queue).map((value) => ({ @@ -178,10 +166,11 @@ export class AppMetrics { }), })) - await this.hub.kafkaProducer.queueMessage({ + await this.kafkaProducer.queueMessage({ topic: KAFKA_APP_METRICS, messages: kafkaMessages, }) + console.log(`Finisehd flushing app metrics, took ${Date.now() - startTime}ms`) } _metricErrorParameters(errorWithContext: ErrorWithContext): Partial { diff --git a/plugin-server/tests/worker/ingestion/app-metrics.test.ts b/plugin-server/tests/worker/ingestion/app-metrics.test.ts index c46f07998f460..43a2b07364208 100644 --- a/plugin-server/tests/worker/ingestion/app-metrics.test.ts +++ b/plugin-server/tests/worker/ingestion/app-metrics.test.ts @@ -23,18 +23,18 @@ describe('AppMetrics()', () => { let closeHub: () => Promise beforeEach(async () => { - ;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100 }) - appMetrics = new AppMetrics(hub) - - jest.spyOn(hub.organizationManager, 'hasAvailableFeature').mockResolvedValue(true) + ;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100, APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5 }) + appMetrics = new AppMetrics( + hub.kafkaProducer, + hub.APP_METRICS_FLUSH_FREQUENCY_MS, + hub.APP_METRICS_FLUSH_MAX_QUEUE_SIZE + ) + // doesn't flush again on the next call, i.e. flust metrics were reset jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve()) }) afterEach(async () => { jest.useRealTimers() - if (appMetrics.timer) { - clearTimeout(appMetrics.timer) - } await closeHub() }) @@ -164,44 +164,34 @@ describe('AppMetrics()', () => { ]) }) - it('creates timer to flush if no timer before', async () => { - jest.spyOn(appMetrics, 'flush') - jest.useFakeTimers() - - await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp) - - const timer = appMetrics.timer - expect(timer).not.toBeNull() - - jest.advanceTimersByTime(120) + it('flushes when time is up', async () => { + Date.now = jest.fn(() => 1600000000) + await appMetrics.flush() - expect(appMetrics.timer).toBeNull() - expect(appMetrics.flush).toHaveBeenCalled() - }) + jest.spyOn(appMetrics, 'flush') + Date.now = jest.fn(() => 1600000120) - it('does not create a timer on subsequent requests', async () => { - await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp) - const originalTimer = appMetrics.timer await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp) - expect(originalTimer).not.toBeNull() - expect(appMetrics.timer).toEqual(originalTimer) - }) - - it('does nothing if feature is not available', async () => { - jest.mocked(hub.organizationManager.hasAvailableFeature).mockResolvedValue(false) - + expect(appMetrics.flush).toHaveBeenCalledTimes(1) + // doesn't flush again on the next call, i.e. flust metrics were reset + Date.now = jest.fn(() => 1600000130) await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp) - expect(appMetrics.queuedData).toEqual({}) + expect(appMetrics.flush).toHaveBeenCalledTimes(1) }) - it('does not query `hasAvailableFeature` if not needed', async () => { - hub.APP_METRICS_GATHERED_FOR_ALL = true - - await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp) - - expect(appMetrics.queuedData).not.toEqual({}) - expect(hub.organizationManager.hasAvailableFeature).not.toHaveBeenCalled() + it('flushes when max queue size is hit', async () => { + jest.spyOn(appMetrics, 'flush') + // parallel could trigger multiple flushes and make the test flaky + for (let i = 0; i < 7; i++) { + await appMetrics.queueMetric({ ...metric, successes: 1, teamId: i }, timestamp) + } + expect(appMetrics.flush).toHaveBeenCalledTimes(1) + // we only count different keys, so this should not trigger a flush + for (let i = 0; i < 7; i++) { + await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp) + } + expect(appMetrics.flush).toHaveBeenCalledTimes(1) }) })