From 9ad4c7b3b2fac25fbccc07507fa1539e1c918fa5 Mon Sep 17 00:00:00 2001 From: Ben White Date: Fri, 9 Aug 2024 10:55:32 +0200 Subject: [PATCH] feat(cdp): Track event for hog function state transitions (#24217) Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- plugin-server/src/cdp/cdp-consumers.ts | 27 ++++++++- plugin-server/src/cdp/hog-function-manager.ts | 56 ++++++++++++------- plugin-server/src/cdp/hog-watcher.ts | 43 +++++++++----- plugin-server/src/main/pluginsServer.ts | 2 + plugin-server/src/utils/posthog.ts | 18 ++++++ .../worker/ingestion/group-type-manager.ts | 17 +----- plugin-server/tests/cdp/hog-watcher.test.ts | 18 +++++- .../ingestion/group-type-manager.test.ts | 25 ++------- 8 files changed, 136 insertions(+), 70 deletions(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index b5fa9862dfa566..2fa9ae1eda4a42 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -17,6 +17,7 @@ import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kaf import { runInstrumentedFunction } from '../main/utils' import { AppMetric2Type, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' +import { captureTeamEvent } from '../utils/posthog' import { status } from '../utils/status' import { castTimestampOrNow } from '../utils/utils' import { RustyHook } from '../worker/rusty-hook' @@ -95,14 +96,38 @@ abstract class CdpConsumerBase { protected heartbeat = () => {} constructor(protected hub: Hub) { - this.hogWatcher = new HogWatcher(hub) this.hogFunctionManager = new HogFunctionManager(hub.postgres, hub) + this.hogWatcher = new HogWatcher(hub, (id, state) => { + void this.captureInternalPostHogEvent(id, 'hog function state changed', { state }) + }) this.hogExecutor = new HogExecutor(this.hogFunctionManager) const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.hub) this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, rustyHook) this.groupsManager = new GroupsManager(this.hub) } + private async captureInternalPostHogEvent( + hogFunctionId: HogFunctionType['id'], + event: string, + properties: any = {} + ) { + const hogFunction = this.hogFunctionManager.getHogFunction(hogFunctionId) + if (!hogFunction) { + return + } + const team = await this.hub.teamManager.fetchTeam(hogFunction.team_id) + + if (!team) { + return + } + + captureTeamEvent(team, event, { + ...properties, + hog_function_id: hogFunctionId, + hog_function_url: `${this.hub.SITE_URL}/project/${team.id}/pipeline/destinations/hog-${hogFunctionId}`, + }) + } + protected async runWithHeartbeat(func: () => Promise | T): Promise { // Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer const res = await func() diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts index 17de15147441b2..b09a35242542f5 100644 --- a/plugin-server/src/cdp/hog-function-manager.ts +++ b/plugin-server/src/cdp/hog-function-manager.ts @@ -6,8 +6,10 @@ import { PubSub } from '../utils/pubsub' import { status } from '../utils/status' import { HogFunctionType, IntegrationType } from './types' -export type HogFunctionMap = Record -export type HogFunctionCache = Record +type HogFunctionCache = { + functions: Record + teams: Record +} const HOG_FUNCTION_FIELDS = ['id', 'team_id', 'name', 'enabled', 'inputs', 'inputs_schema', 'filters', 'bytecode'] @@ -21,7 +23,10 @@ export class HogFunctionManager { constructor(private postgres: PostgresRouter, private serverConfig: PluginsServerConfig) { this.started = false this.ready = false - this.cache = {} + this.cache = { + functions: {}, + teams: {}, + } this.pubSub = new PubSub(this.serverConfig, { 'reload-hog-functions': async (message) => { @@ -66,14 +71,27 @@ export class HogFunctionManager { if (!this.ready) { throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this') } - return Object.values(this.cache[teamId] || {}) + + return Object.values(this.cache.teams[teamId] || []) + .map((id) => this.cache.functions[id]) + .filter((x) => !!x) as HogFunctionType[] + } + + public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined { + if (!this.ready) { + throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this') + } + return this.cache.functions[id] } public getTeamHogFunction(teamId: Team['id'], hogFunctionId: HogFunctionType['id']): HogFunctionType | undefined { if (!this.ready) { throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this') } - return this.cache[teamId]?.[hogFunctionId] + const fn = this.cache.functions[hogFunctionId] + if (fn?.team_id === teamId) { + return fn + } } public teamHasHogFunctions(teamId: Team['id']): boolean { @@ -96,13 +114,15 @@ export class HogFunctionManager { await this.enrichWithIntegrations(items) - const cache: HogFunctionCache = {} - for (const item of items) { - if (!cache[item.team_id]) { - cache[item.team_id] = {} - } + const cache: HogFunctionCache = { + functions: {}, + teams: {}, + } - cache[item.team_id][item.id] = item + for (const item of items) { + cache.functions[item.id] = item + cache.teams[item.team_id] = cache.teams[item.team_id] || [] + cache.teams[item.team_id]!.push(item.id) } this.cache = cache @@ -125,17 +145,15 @@ export class HogFunctionManager { await this.enrichWithIntegrations(items) - if (!this.cache[teamId]) { - this.cache[teamId] = {} - } - for (const id of ids) { - // First of all delete the item from the cache - this covers the case where the item was deleted or disabled - delete this.cache[teamId][id] + delete this.cache.functions[id] + this.cache.teams[teamId] = this.cache.teams[teamId]?.filter((x) => x !== id) } for (const item of items) { - this.cache[teamId][item.id] = item + this.cache.functions[item.id] = item + this.cache.teams[teamId] = this.cache.teams[teamId] || [] + this.cache.teams[teamId]!.push(item.id) } } @@ -157,7 +175,7 @@ export class HogFunctionManager { public reloadIntegrations(teamId: Team['id'], ids: IntegrationType['id'][]): Promise { // We need to find all hog functions that depend on these integrations and re-enrich them - const items: HogFunctionType[] = Object.values(this.cache[teamId] || {}) + const items = this.getTeamHogFunctions(teamId) const itemsToReload = items.filter((item) => ids.some((id) => item.depends_on_integration_ids?.has(id))) return this.enrichWithIntegrations(itemsToReload) diff --git a/plugin-server/src/cdp/hog-watcher.ts b/plugin-server/src/cdp/hog-watcher.ts index 3c933b69f5347d..04b647613a0a0b 100644 --- a/plugin-server/src/cdp/hog-watcher.ts +++ b/plugin-server/src/cdp/hog-watcher.ts @@ -91,7 +91,7 @@ type HogWatcherRedisClient = Omit & } export class HogWatcher { - constructor(private hub: Hub) {} + constructor(private hub: Hub, private onStateChange: (id: HogFunctionType['id'], state: HogWatcherState) => void) {} private rateLimitArgs(id: HogFunctionType['id'], cost: number) { const nowSeconds = Math.round(now() / 1000) @@ -214,6 +214,8 @@ export class HogWatcher { await pipeline.exec() }) + + this.onStateChange(id, state) } public async observeResults(results: HogFunctionInvocationResult[]): Promise { @@ -278,16 +280,18 @@ export class HogWatcher { return pipeline.exec() }) - const functionsDisabled = disabledFunctionIds.filter((_, index) => (results ? results[index][1] : false)) + const functionsTempDisabled = disabledFunctionIds.filter((_, index) => + results ? results[index][1] : false + ) - if (!functionsDisabled.length) { + if (!functionsTempDisabled.length) { return } // We store the history as a zset - we can then use it to determine if we should disable indefinitely const historyResults = await this.runRedis(async (client) => { const pipeline = client.pipeline() - functionsDisabled.forEach((id) => { + functionsTempDisabled.forEach((id) => { const key = `${REDIS_KEY_DISABLED_HISTORY}/${id}` pipeline.zadd(key, now(), new UUIDT().toString()) pipeline.zrange(key, 0, -1) @@ -297,24 +301,33 @@ export class HogWatcher { return await pipeline.exec() }) - const functionsToDisablePermanently = functionsDisabled.filter((_, index) => { + const functionsToDisablePermanently = functionsTempDisabled.filter((_, index) => { const history = historyResults ? historyResults[index * 3 + 1][1] : [] return history.length >= this.hub.CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT }) - if (!functionsToDisablePermanently.length) { - return - } + if (functionsToDisablePermanently.length) { + await this.runRedis(async (client) => { + const pipeline = client.pipeline() + functionsToDisablePermanently.forEach((id) => { + const key = `${REDIS_KEY_DISABLED}/${id}` + pipeline.set(key, '1') + pipeline.del(`${REDIS_KEY_DISABLED_HISTORY}/${id}`) + }) - await this.runRedis(async (client) => { - const pipeline = client.pipeline() - functionsToDisablePermanently.forEach((id) => { - const key = `${REDIS_KEY_DISABLED}/${id}` - pipeline.set(key, '1') - pipeline.del(`${REDIS_KEY_DISABLED_HISTORY}/${id}`) + return await pipeline.exec() }) + } - return await pipeline.exec() + // Finally track the results + functionsToDisablePermanently.forEach((id) => { + this.onStateChange(id, HogWatcherState.disabledIndefinitely) + }) + + functionsTempDisabled.forEach((id) => { + if (!functionsToDisablePermanently.includes(id)) { + this.onStateChange(id, HogWatcherState.disabledForPeriod) + } }) } } diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index d12a2f4362fe17..0bcbf0e63597f3 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -17,6 +17,7 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' import { PostgresRouter } from '../utils/db/postgres' import { cancelAllScheduledJobs } from '../utils/node-schedule' +import { posthog } from '../utils/posthog' import { PubSub } from '../utils/pubsub' import { status } from '../utils/status' import { createRedisClient, delay } from '../utils/utils' @@ -165,6 +166,7 @@ export async function startPluginsServer( stopSessionRecordingBlobOverflowConsumer?.(), schedulerTasksConsumer?.disconnect(), ...shutdownCallbacks.map((cb) => cb()), + posthog.shutdownAsync(), ]) if (piscina) { diff --git a/plugin-server/src/utils/posthog.ts b/plugin-server/src/utils/posthog.ts index b63604628eb2f4..2de721b06ef4c5 100644 --- a/plugin-server/src/utils/posthog.ts +++ b/plugin-server/src/utils/posthog.ts @@ -1,5 +1,7 @@ import { PostHog } from 'posthog-node' +import { Team } from '../types' + export const posthog = new PostHog('sTMFPsFhdP1Ssg', { host: 'https://us.i.posthog.com', }) @@ -7,3 +9,19 @@ export const posthog = new PostHog('sTMFPsFhdP1Ssg', { if (process.env.NODE_ENV === 'test') { posthog.disable() } + +export const captureTeamEvent = (team: Team, event: string, properties: Record = {}): void => { + posthog.capture({ + distinctId: team.uuid, + event, + properties: { + team: team.uuid, + ...properties, + }, + groups: { + project: team.uuid, + organization: team.organization_id, + instance: process.env.SITE_URL ?? 'unknown', + }, + }) +} diff --git a/plugin-server/src/worker/ingestion/group-type-manager.ts b/plugin-server/src/worker/ingestion/group-type-manager.ts index 21940b22d4d512..7263a2429cccaf 100644 --- a/plugin-server/src/worker/ingestion/group-type-manager.ts +++ b/plugin-server/src/worker/ingestion/group-type-manager.ts @@ -1,7 +1,7 @@ import { GroupTypeIndex, GroupTypeToColumnIndex, Team, TeamId } from '../../types' import { PostgresRouter, PostgresUse } from '../../utils/db/postgres' import { timeoutGuard } from '../../utils/db/utils' -import { posthog } from '../../utils/posthog' +import { captureTeamEvent } from '../../utils/posthog' import { getByAge } from '../../utils/utils' import { TeamManager } from './team-manager' @@ -110,19 +110,6 @@ export class GroupTypeManager { return } - posthog.capture({ - distinctId: 'plugin-server', - event: 'group type ingested', - properties: { - team: team.uuid, - groupType, - groupTypeIndex, - }, - groups: { - project: team.uuid, - organization: team.organization_id, - instance: this.instanceSiteUrl, - }, - }) + captureTeamEvent(team, 'group type ingested', { groupType, groupTypeIndex }) } } diff --git a/plugin-server/tests/cdp/hog-watcher.test.ts b/plugin-server/tests/cdp/hog-watcher.test.ts index fddbb0fc7e6774..2980a789f36837 100644 --- a/plugin-server/tests/cdp/hog-watcher.test.ts +++ b/plugin-server/tests/cdp/hog-watcher.test.ts @@ -43,16 +43,18 @@ describe('HogWatcher', () => { let hub: Hub let closeHub: () => Promise let watcher: HogWatcher + let mockStateChangeCallback: jest.Mock beforeEach(async () => { ;[hub, closeHub] = await createHub() now = 1720000000000 mockNow.mockReturnValue(now) + mockStateChangeCallback = jest.fn() await deleteKeysWithPrefix(hub.redisPool, BASE_REDIS_KEY) - watcher = new HogWatcher(hub) + watcher = new HogWatcher(hub, mockStateChangeCallback) }) const advanceTime = (ms: number) => { @@ -174,6 +176,9 @@ describe('HogWatcher', () => { await watcher.observeResults(badResults) + expect(mockStateChangeCallback).toHaveBeenCalledTimes(1) + expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledForPeriod) + expect(await watcher.getState('id1')).toMatchInlineSnapshot(` Object { "rating": 0, @@ -204,6 +209,7 @@ describe('HogWatcher', () => { "tokens": 10000, } `) + expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.healthy) }) it('should force degraded', async () => { await watcher.forceStateChange('id1', HogWatcherState.degraded) @@ -214,6 +220,7 @@ describe('HogWatcher', () => { "tokens": 8000, } `) + expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.degraded) }) it('should force disabledForPeriod', async () => { await watcher.forceStateChange('id1', HogWatcherState.disabledForPeriod) @@ -224,6 +231,7 @@ describe('HogWatcher', () => { "tokens": 0, } `) + expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledForPeriod) }) it('should force disabledIndefinitely', async () => { await watcher.forceStateChange('id1', HogWatcherState.disabledIndefinitely) @@ -234,6 +242,7 @@ describe('HogWatcher', () => { "tokens": 0, } `) + expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledIndefinitely) }) }) @@ -258,10 +267,17 @@ describe('HogWatcher', () => { expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.degraded) } + expect(mockStateChangeCallback).toHaveBeenCalledTimes(2) + expect(mockStateChangeCallback.mock.calls[0]).toEqual(['id1', HogWatcherState.disabledForPeriod]) + expect(mockStateChangeCallback.mock.calls[1]).toEqual(['id1', HogWatcherState.disabledForPeriod]) + await watcher.observeResults([createResult({ id: 'id1', error: 'error!' })]) expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledIndefinitely) await reallyAdvanceTime(1000) expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledIndefinitely) + + expect(mockStateChangeCallback).toHaveBeenCalledTimes(3) + expect(mockStateChangeCallback.mock.calls[2]).toEqual(['id1', HogWatcherState.disabledIndefinitely]) }) }) }) diff --git a/plugin-server/tests/worker/ingestion/group-type-manager.test.ts b/plugin-server/tests/worker/ingestion/group-type-manager.test.ts index 46f2ada3086fc1..3999d5a78cb935 100644 --- a/plugin-server/tests/worker/ingestion/group-type-manager.test.ts +++ b/plugin-server/tests/worker/ingestion/group-type-manager.test.ts @@ -1,15 +1,12 @@ import { Hub } from '../../../src/types' import { createHub } from '../../../src/utils/db/hub' -import { posthog } from '../../../src/utils/posthog' +import { captureTeamEvent } from '../../../src/utils/posthog' import { GroupTypeManager } from '../../../src/worker/ingestion/group-type-manager' import { resetTestDatabase } from '../../helpers/sql' jest.mock('../../../src/utils/status') jest.mock('../../../src/utils/posthog', () => ({ - posthog: { - identify: jest.fn(), - capture: jest.fn(), - }, + captureTeamEvent: jest.fn(), })) describe('GroupTypeManager()', () => { @@ -102,7 +99,7 @@ describe('GroupTypeManager()', () => { expect(hub.db.postgres.query).toHaveBeenCalledTimes(1) expect(groupTypeManager.insertGroupType).toHaveBeenCalledTimes(0) - expect(posthog.capture).not.toHaveBeenCalled() + expect(captureTeamEvent).not.toHaveBeenCalled() }) it('inserts value if it does not exist yet at next index, resets cache', async () => { @@ -117,19 +114,9 @@ describe('GroupTypeManager()', () => { expect(hub.db.postgres.query).toHaveBeenCalledTimes(3) // FETCH + INSERT + Team lookup const team = await hub.db.fetchTeam(2) - expect(posthog.capture).toHaveBeenCalledWith({ - distinctId: 'plugin-server', - event: 'group type ingested', - properties: { - team: team!.uuid, - groupType: 'second', - groupTypeIndex: 1, - }, - groups: { - project: team!.uuid, - organization: team!.organization_id, - instance: 'unknown', - }, + expect(captureTeamEvent).toHaveBeenCalledWith(team, 'group type ingested', { + groupType: 'second', + groupTypeIndex: 1, }) expect(await groupTypeManager.fetchGroupTypeIndex(2, 'third')).toEqual(2)