chore: App metrics no hub no promise manager (#17581)
tiina303 authored Sep 25, 2023
1 parent c7d06af commit bf1a80d
Showing 5 changed files with 76 additions and 87 deletions.
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
@@ -62,6 +62,7 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_MAX_MESSAGE_BATCH_SIZE: isDevEnv() ? 0 : 900_000,
KAFKA_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 500,
APP_METRICS_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 20_000,
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: isTestEnv() ? 5 : 1000,
REDIS_URL: 'redis://127.0.0.1',
POSTHOG_REDIS_PASSWORD: '',
POSTHOG_REDIS_HOST: '',
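The new APP_METRICS_FLUSH_MAX_QUEUE_SIZE knob pairs with the existing APP_METRICS_FLUSH_FREQUENCY_MS: as queueMetric() below shows, a flush fires when either the interval has elapsed or the queue has outgrown the cap, and both drop to 5 in test environments so flushing is exercised constantly. A minimal sketch of that trigger condition — shouldFlush is an invented helper name, not anything in the plugin server:

```ts
// Sketch only: mirrors the trigger condition added to queueMetric() in this
// commit; `shouldFlush` is a hypothetical helper, not part of the codebase.
function shouldFlush(
    nowMs: number,
    lastFlushMs: number,
    queueSize: number,
    flushFrequencyMs = 20_000, // APP_METRICS_FLUSH_FREQUENCY_MS default
    maxQueueSize = 1_000 // APP_METRICS_FLUSH_MAX_QUEUE_SIZE default
): boolean {
    // Flush when the buffer is stale (time-based) or crowded (size-based).
    return nowMs - lastFlushMs > flushFrequencyMs || queueSize > maxQueueSize
}
```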
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
@@ -145,6 +145,7 @@ export interface PluginsServerConfig {
KAFKA_MAX_MESSAGE_BATCH_SIZE: number
KAFKA_FLUSH_FREQUENCY_MS: number
APP_METRICS_FLUSH_FREQUENCY_MS: number
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: number
BASE_DIR: string // base path for resolving local plugins
PLUGINS_RELOAD_PUBSUB_CHANNEL: string // Redis channel for reload events'
LOG_LEVEL: LogLevel
10 changes: 9 additions & 1 deletion plugin-server/src/utils/db/hub.ts
@@ -28,6 +28,7 @@ import { AppMetrics } from '../../worker/ingestion/app-metrics'
import { OrganizationManager } from '../../worker/ingestion/organization-manager'
import { EventsProcessor } from '../../worker/ingestion/process-event'
import { TeamManager } from '../../worker/ingestion/team-manager'
import { isTestEnv } from '../env-utils'
import { status } from '../status'
import { createRedisPool, UUIDT } from '../utils'
import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-key-manager'
@@ -192,9 +193,16 @@ export async function createHub(
// :TODO: This is only used on worker threads, not main
hub.eventsProcessor = new EventsProcessor(hub as Hub)

hub.appMetrics = new AppMetrics(hub as Hub)
hub.appMetrics = new AppMetrics(
kafkaProducer,
serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)

const closeHub = async () => {
if (!isTestEnv()) {
await hub.appMetrics?.flush()
}
await Promise.allSettled([kafkaProducer.disconnect(), redisPool.drain(), hub.postgres?.end()])
await redisPool.clear()

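Two things change here: AppMetrics is now built from just the pieces it uses (a Kafka producer and two config values) rather than the whole Hub, and closeHub drains buffered metrics before disconnecting the producer they are written to — the isTestEnv() guard presumably keeps test teardowns from flushing into mocked producers. The narrow constructor also makes the class trivial to unit-test; a hedged sketch of what that buys, with an invented stub producer (queueMessage is the one producer method flush() calls in this diff; producedBatches and stubProducer are made-up names):

```ts
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'
import { AppMetrics } from '../../worker/ingestion/app-metrics'

// Sketch: capture batches instead of talking to Kafka. Only queueMessage is
// stubbed, so the double cast is a deliberate shortcut for illustration.
const producedBatches: unknown[] = []
const stubProducer = {
    queueMessage: async (payload: unknown) => {
        producedBatches.push(payload)
    },
} as unknown as KafkaProducerWrapper

const appMetrics = new AppMetrics(stubProducer, /* flushFrequencyMs */ 100, /* maxQueueSize */ 5)
```

The updated tests below do essentially this, except they spy on the real hub.kafkaProducer.queueMessage instead of substituting a stub.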
85 changes: 37 additions & 48 deletions plugin-server/src/worker/ingestion/app-metrics.ts
@@ -2,9 +2,10 @@ import * as Sentry from '@sentry/node'
import { Message } from 'kafkajs'
import { DateTime } from 'luxon'
import { configure } from 'safe-stable-stringify'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'

import { KAFKA_APP_METRICS } from '../../config/kafka-topics'
import { Hub, TeamId, TimestampFormat } from '../../types'
import { TeamId, TimestampFormat } from '../../types'
import { cleanErrorStackTrace } from '../../utils/db/error'
import { status } from '../../utils/status'
import { castTimestampOrNow, UUIDT } from '../../utils/utils'
@@ -61,52 +62,43 @@ const safeJSONStringify = configure({
})

export class AppMetrics {
hub: Hub
kafkaProducer: KafkaProducerWrapper
queuedData: Record<string, QueuedMetric>

flushFrequencyMs: number
maxQueueSize: number

timer: NodeJS.Timeout | null
lastFlushTime: number
// For quick access to queueSize instead of using Object.keys(queuedData).length every time
queueSize: number

constructor(hub: Hub) {
this.hub = hub
constructor(kafkaProducer: KafkaProducerWrapper, flushFrequencyMs: number, maxQueueSize: number) {
this.queuedData = {}

this.flushFrequencyMs = hub.APP_METRICS_FLUSH_FREQUENCY_MS
this.timer = null
this.kafkaProducer = kafkaProducer
this.flushFrequencyMs = flushFrequencyMs
this.maxQueueSize = maxQueueSize
this.lastFlushTime = Date.now()
this.queueSize = 0
}

async isAvailable(metric: AppMetric, errorWithContext?: ErrorWithContext): Promise<boolean> {
if (this.hub.APP_METRICS_GATHERED_FOR_ALL) {
return true
}

// :TRICKY: If postgres connection is down, we ignore this metric
try {
return await this.hub.organizationManager.hasAvailableFeature(metric.teamId, 'app_metrics')
} catch (err) {
status.warn(
'⚠️',
'Error querying whether app_metrics is available. Ignoring this metric',
metric,
errorWithContext,
err
)
return false
async queueMetric(metric: AppMetric, timestamp?: number): Promise<void> {
// We don't want to immediately flush all the metrics every time as we can internally
// aggregate them quite a bit and reduce the message count by a lot.
// However, we also don't want to wait too long, nor have the queue grow too big resulting in
// the flush taking a long time.
const now = Date.now()
if (now - this.lastFlushTime > this.flushFrequencyMs || this.queueSize > this.maxQueueSize) {
await this.flush()
}
}

async queueMetric(metric: AppMetric, timestamp?: number): Promise<void> {
timestamp = timestamp || Date.now()
timestamp = timestamp || now
const key = this._key(metric)

if (!(await this.isAvailable(metric))) {
return
}

const { successes, successesOnRetry, failures, errorUuid, errorType, errorDetails, ...metricInfo } = metric

if (!this.queuedData[key]) {
this.queueSize += 1
this.queuedData[key] = {
successes: 0,
successesOnRetry: 0,
@@ -131,33 +123,29 @@ export class AppMetrics {
this.queuedData[key].failures += failures
}
this.queuedData[key].lastTimestamp = timestamp

if (this.timer === null) {
this.timer = setTimeout(() => {
this.hub.promiseManager.trackPromise(this.flush(), 'app metrics')
this.timer = null
}, this.flushFrequencyMs)
}
}

async queueError(metric: AppMetric, errorWithContext: ErrorWithContext, timestamp?: number) {
if (await this.isAvailable(metric, errorWithContext)) {
await this.queueMetric(
{
...metric,
...this._metricErrorParameters(errorWithContext),
},
timestamp
)
}
await this.queueMetric(
{
...metric,
...this._metricErrorParameters(errorWithContext),
},
timestamp
)
}

async flush(): Promise<void> {
console.log(`Flushing app metrics`)
const startTime = Date.now()
this.lastFlushTime = startTime
if (Object.keys(this.queuedData).length === 0) {
return
}

// TODO: We might be dropping some metrics here if someone wrote between the queue assignment and the queuedData = {} reset
const queue = this.queuedData
this.queueSize = 0
this.queuedData = {}

const kafkaMessages: Message[] = Object.values(queue).map((value) => ({
Expand All @@ -178,10 +166,11 @@ export class AppMetrics {
}),
}))

await this.hub.kafkaProducer.queueMessage({
await this.kafkaProducer.queueMessage({
topic: KAFKA_APP_METRICS,
messages: kafkaMessages,
})
console.log(`Finished flushing app metrics, took ${Date.now() - startTime}ms`)
}

_metricErrorParameters(errorWithContext: ErrorWithContext): Partial<AppMetric> {
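The net effect in this file: the timer and promiseManager bookkeeping are gone, queueMetric() flushes inline whenever the time or size threshold is crossed, and metrics aggregate per key so repeated events collapse into counters before ever reaching Kafka. A compact sketch of that aggregate-then-flush pattern, with all names (Buffered, merge, emit) invented for illustration and single-threaded access assumed, as in Node:

```ts
// Sketch of the aggregate-then-flush pattern; not the production AppMetrics.
class Buffered<T> {
    private queue = new Map<string, T>()
    private lastFlush = Date.now()

    constructor(
        private readonly flushFrequencyMs: number,
        private readonly maxQueueSize: number,
        private readonly merge: (prev: T | undefined, next: T) => T,
        private readonly emit: (batch: T[]) => Promise<void>
    ) {}

    async add(key: string, item: T): Promise<void> {
        // Inline flush replaces the old setTimeout + promiseManager tracking.
        if (Date.now() - this.lastFlush > this.flushFrequencyMs || this.queue.size > this.maxQueueSize) {
            await this.flush()
        }
        this.queue.set(key, this.merge(this.queue.get(key), item))
    }

    async flush(): Promise<void> {
        this.lastFlush = Date.now()
        if (this.queue.size === 0) {
            return
        }
        // Swap the buffer out synchronously so concurrent add() calls land in
        // a fresh queue while the captured batch is being emitted.
        const batch = [...this.queue.values()]
        this.queue.clear()
        await this.emit(batch)
    }
}
```

Because the swap is synchronous, writes that arrive while the Kafka produce is awaited simply land in the fresh queue for the next flush rather than being dropped.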
66 changes: 28 additions & 38 deletions plugin-server/tests/worker/ingestion/app-metrics.test.ts
@@ -23,18 +23,18 @@ describe('AppMetrics()', () => {
let closeHub: () => Promise<void>

beforeEach(async () => {
;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100 })
appMetrics = new AppMetrics(hub)

jest.spyOn(hub.organizationManager, 'hasAvailableFeature').mockResolvedValue(true)
;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100, APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5 })
appMetrics = new AppMetrics(
hub.kafkaProducer,
hub.APP_METRICS_FLUSH_FREQUENCY_MS,
hub.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
})

afterEach(async () => {
jest.useRealTimers()
if (appMetrics.timer) {
clearTimeout(appMetrics.timer)
}
await closeHub()
})

@@ -164,44 +164,34 @@
])
})

it('creates timer to flush if no timer before', async () => {
jest.spyOn(appMetrics, 'flush')
jest.useFakeTimers()

await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)

const timer = appMetrics.timer
expect(timer).not.toBeNull()

jest.advanceTimersByTime(120)
it('flushes when time is up', async () => {
Date.now = jest.fn(() => 1600000000)
await appMetrics.flush()

expect(appMetrics.timer).toBeNull()
expect(appMetrics.flush).toHaveBeenCalled()
})
jest.spyOn(appMetrics, 'flush')
Date.now = jest.fn(() => 1600000120)

it('does not create a timer on subsequent requests', async () => {
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)
const originalTimer = appMetrics.timer
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)

expect(originalTimer).not.toBeNull()
expect(appMetrics.timer).toEqual(originalTimer)
})

it('does nothing if feature is not available', async () => {
jest.mocked(hub.organizationManager.hasAvailableFeature).mockResolvedValue(false)

expect(appMetrics.flush).toHaveBeenCalledTimes(1)
// doesn't flush again on the next call, i.e. flushed metrics were reset
Date.now = jest.fn(() => 1600000130)
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)
expect(appMetrics.queuedData).toEqual({})
expect(appMetrics.flush).toHaveBeenCalledTimes(1)
})

it('does not query `hasAvailableFeature` if not needed', async () => {
hub.APP_METRICS_GATHERED_FOR_ALL = true

await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)

expect(appMetrics.queuedData).not.toEqual({})
expect(hub.organizationManager.hasAvailableFeature).not.toHaveBeenCalled()
it('flushes when max queue size is hit', async () => {
jest.spyOn(appMetrics, 'flush')
// queue sequentially: queueing in parallel could trigger multiple flushes and make the test flaky
for (let i = 0; i < 7; i++) {
await appMetrics.queueMetric({ ...metric, successes: 1, teamId: i }, timestamp)
}
expect(appMetrics.flush).toHaveBeenCalledTimes(1)
// we only count different keys, so this should not trigger a flush
for (let i = 0; i < 7; i++) {
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)
}
expect(appMetrics.flush).toHaveBeenCalledTimes(1)
})
})

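With no timer left to fake, the rewritten tests stub Date.now directly instead of reaching for jest.useFakeTimers(). An equivalent, slightly safer sketch using jest.spyOn — restorable via mockRestore() so the stub never leaks into other tests; it reuses appMetrics, metric, and timestamp from the test file above and the same timestamps:

```ts
// Sketch: a restorable alternative to the `Date.now = jest.fn(...)` stubbing above.
const nowSpy = jest.spyOn(Date, 'now').mockReturnValue(1600000000)

await appMetrics.flush() // records lastFlushTime at the stubbed instant

nowSpy.mockReturnValue(1600000120) // 120ms later, past the 100ms flush frequency
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp) // triggers a flush

nowSpy.mockRestore() // hand the real Date.now back
```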
