Skip to content

Commit

Permalink
feat(cdp): Track event for hog function state transitions (#24217)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and thmsobrmlr committed Aug 9, 2024
1 parent 73217db commit 9ad4c7b
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 70 deletions.
27 changes: 26 additions & 1 deletion plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kaf
import { runInstrumentedFunction } from '../main/utils'
import { AppMetric2Type, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { captureTeamEvent } from '../utils/posthog'
import { status } from '../utils/status'
import { castTimestampOrNow } from '../utils/utils'
import { RustyHook } from '../worker/rusty-hook'
Expand Down Expand Up @@ -95,14 +96,38 @@ abstract class CdpConsumerBase {
protected heartbeat = () => {}

constructor(protected hub: Hub) {
this.hogWatcher = new HogWatcher(hub)
this.hogFunctionManager = new HogFunctionManager(hub.postgres, hub)
this.hogWatcher = new HogWatcher(hub, (id, state) => {
void this.captureInternalPostHogEvent(id, 'hog function state changed', { state })
})
this.hogExecutor = new HogExecutor(this.hogFunctionManager)
const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.hub)
this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, rustyHook)
this.groupsManager = new GroupsManager(this.hub)
}

private async captureInternalPostHogEvent(
hogFunctionId: HogFunctionType['id'],
event: string,
properties: any = {}
) {
const hogFunction = this.hogFunctionManager.getHogFunction(hogFunctionId)
if (!hogFunction) {
return
}
const team = await this.hub.teamManager.fetchTeam(hogFunction.team_id)

if (!team) {
return
}

captureTeamEvent(team, event, {
...properties,
hog_function_id: hogFunctionId,
hog_function_url: `${this.hub.SITE_URL}/project/${team.id}/pipeline/destinations/hog-${hogFunctionId}`,
})
}

protected async runWithHeartbeat<T>(func: () => Promise<T> | T): Promise<T> {
// Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer
const res = await func()
Expand Down
56 changes: 37 additions & 19 deletions plugin-server/src/cdp/hog-function-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { HogFunctionType, IntegrationType } from './types'

export type HogFunctionMap = Record<HogFunctionType['id'], HogFunctionType>
export type HogFunctionCache = Record<Team['id'], HogFunctionMap>
type HogFunctionCache = {
functions: Record<HogFunctionType['id'], HogFunctionType | undefined>
teams: Record<Team['id'], HogFunctionType['id'][] | undefined>
}

const HOG_FUNCTION_FIELDS = ['id', 'team_id', 'name', 'enabled', 'inputs', 'inputs_schema', 'filters', 'bytecode']

Expand All @@ -21,7 +23,10 @@ export class HogFunctionManager {
constructor(private postgres: PostgresRouter, private serverConfig: PluginsServerConfig) {
this.started = false
this.ready = false
this.cache = {}
this.cache = {
functions: {},
teams: {},
}

this.pubSub = new PubSub(this.serverConfig, {
'reload-hog-functions': async (message) => {
Expand Down Expand Up @@ -66,14 +71,27 @@ export class HogFunctionManager {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
}
return Object.values(this.cache[teamId] || {})

return Object.values(this.cache.teams[teamId] || [])
.map((id) => this.cache.functions[id])
.filter((x) => !!x) as HogFunctionType[]
}

public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
}
return this.cache.functions[id]
}

public getTeamHogFunction(teamId: Team['id'], hogFunctionId: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
}
return this.cache[teamId]?.[hogFunctionId]
const fn = this.cache.functions[hogFunctionId]
if (fn?.team_id === teamId) {
return fn
}
}

public teamHasHogFunctions(teamId: Team['id']): boolean {
Expand All @@ -96,13 +114,15 @@ export class HogFunctionManager {

await this.enrichWithIntegrations(items)

const cache: HogFunctionCache = {}
for (const item of items) {
if (!cache[item.team_id]) {
cache[item.team_id] = {}
}
const cache: HogFunctionCache = {
functions: {},
teams: {},
}

cache[item.team_id][item.id] = item
for (const item of items) {
cache.functions[item.id] = item
cache.teams[item.team_id] = cache.teams[item.team_id] || []
cache.teams[item.team_id]!.push(item.id)
}

this.cache = cache
Expand All @@ -125,17 +145,15 @@ export class HogFunctionManager {

await this.enrichWithIntegrations(items)

if (!this.cache[teamId]) {
this.cache[teamId] = {}
}

for (const id of ids) {
// First of all delete the item from the cache - this covers the case where the item was deleted or disabled
delete this.cache[teamId][id]
delete this.cache.functions[id]
this.cache.teams[teamId] = this.cache.teams[teamId]?.filter((x) => x !== id)
}

for (const item of items) {
this.cache[teamId][item.id] = item
this.cache.functions[item.id] = item
this.cache.teams[teamId] = this.cache.teams[teamId] || []
this.cache.teams[teamId]!.push(item.id)
}
}

Expand All @@ -157,7 +175,7 @@ export class HogFunctionManager {
public reloadIntegrations(teamId: Team['id'], ids: IntegrationType['id'][]): Promise<void> {
// We need to find all hog functions that depend on these integrations and re-enrich them

const items: HogFunctionType[] = Object.values(this.cache[teamId] || {})
const items = this.getTeamHogFunctions(teamId)
const itemsToReload = items.filter((item) => ids.some((id) => item.depends_on_integration_ids?.has(id)))

return this.enrichWithIntegrations(itemsToReload)
Expand Down
43 changes: 28 additions & 15 deletions plugin-server/src/cdp/hog-watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type HogWatcherRedisClient = Omit<Redis, 'pipeline'> &
}

export class HogWatcher {
constructor(private hub: Hub) {}
constructor(private hub: Hub, private onStateChange: (id: HogFunctionType['id'], state: HogWatcherState) => void) {}

private rateLimitArgs(id: HogFunctionType['id'], cost: number) {
const nowSeconds = Math.round(now() / 1000)
Expand Down Expand Up @@ -214,6 +214,8 @@ export class HogWatcher {

await pipeline.exec()
})

this.onStateChange(id, state)
}

public async observeResults(results: HogFunctionInvocationResult[]): Promise<void> {
Expand Down Expand Up @@ -278,16 +280,18 @@ export class HogWatcher {
return pipeline.exec()
})

const functionsDisabled = disabledFunctionIds.filter((_, index) => (results ? results[index][1] : false))
const functionsTempDisabled = disabledFunctionIds.filter((_, index) =>
results ? results[index][1] : false
)

if (!functionsDisabled.length) {
if (!functionsTempDisabled.length) {
return
}

// We store the history as a zset - we can then use it to determine if we should disable indefinitely
const historyResults = await this.runRedis(async (client) => {
const pipeline = client.pipeline()
functionsDisabled.forEach((id) => {
functionsTempDisabled.forEach((id) => {
const key = `${REDIS_KEY_DISABLED_HISTORY}/${id}`
pipeline.zadd(key, now(), new UUIDT().toString())
pipeline.zrange(key, 0, -1)
Expand All @@ -297,24 +301,33 @@ export class HogWatcher {
return await pipeline.exec()
})

const functionsToDisablePermanently = functionsDisabled.filter((_, index) => {
const functionsToDisablePermanently = functionsTempDisabled.filter((_, index) => {
const history = historyResults ? historyResults[index * 3 + 1][1] : []
return history.length >= this.hub.CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT
})

if (!functionsToDisablePermanently.length) {
return
}
if (functionsToDisablePermanently.length) {
await this.runRedis(async (client) => {
const pipeline = client.pipeline()
functionsToDisablePermanently.forEach((id) => {
const key = `${REDIS_KEY_DISABLED}/${id}`
pipeline.set(key, '1')
pipeline.del(`${REDIS_KEY_DISABLED_HISTORY}/${id}`)
})

await this.runRedis(async (client) => {
const pipeline = client.pipeline()
functionsToDisablePermanently.forEach((id) => {
const key = `${REDIS_KEY_DISABLED}/${id}`
pipeline.set(key, '1')
pipeline.del(`${REDIS_KEY_DISABLED_HISTORY}/${id}`)
return await pipeline.exec()
})
}

return await pipeline.exec()
// Finally track the results
functionsToDisablePermanently.forEach((id) => {
this.onStateChange(id, HogWatcherState.disabledIndefinitely)
})

functionsTempDisabled.forEach((id) => {
if (!functionsToDisablePermanently.includes(id)) {
this.onStateChange(id, HogWatcherState.disabledForPeriod)
}
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { posthog } from '../utils/posthog'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { createRedisClient, delay } from '../utils/utils'
Expand Down Expand Up @@ -165,6 +166,7 @@ export async function startPluginsServer(
stopSessionRecordingBlobOverflowConsumer?.(),
schedulerTasksConsumer?.disconnect(),
...shutdownCallbacks.map((cb) => cb()),
posthog.shutdownAsync(),
])

if (piscina) {
Expand Down
18 changes: 18 additions & 0 deletions plugin-server/src/utils/posthog.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import { PostHog } from 'posthog-node'

import { Team } from '../types'

export const posthog = new PostHog('sTMFPsFhdP1Ssg', {
host: 'https://us.i.posthog.com',
})

if (process.env.NODE_ENV === 'test') {
posthog.disable()
}

export const captureTeamEvent = (team: Team, event: string, properties: Record<string, any> = {}): void => {
posthog.capture({
distinctId: team.uuid,
event,
properties: {
team: team.uuid,
...properties,
},
groups: {
project: team.uuid,
organization: team.organization_id,
instance: process.env.SITE_URL ?? 'unknown',
},
})
}
17 changes: 2 additions & 15 deletions plugin-server/src/worker/ingestion/group-type-manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { GroupTypeIndex, GroupTypeToColumnIndex, Team, TeamId } from '../../types'
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
import { timeoutGuard } from '../../utils/db/utils'
import { posthog } from '../../utils/posthog'
import { captureTeamEvent } from '../../utils/posthog'
import { getByAge } from '../../utils/utils'
import { TeamManager } from './team-manager'

Expand Down Expand Up @@ -110,19 +110,6 @@ export class GroupTypeManager {
return
}

posthog.capture({
distinctId: 'plugin-server',
event: 'group type ingested',
properties: {
team: team.uuid,
groupType,
groupTypeIndex,
},
groups: {
project: team.uuid,
organization: team.organization_id,
instance: this.instanceSiteUrl,
},
})
captureTeamEvent(team, 'group type ingested', { groupType, groupTypeIndex })
}
}
18 changes: 17 additions & 1 deletion plugin-server/tests/cdp/hog-watcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,18 @@ describe('HogWatcher', () => {
let hub: Hub
let closeHub: () => Promise<void>
let watcher: HogWatcher
let mockStateChangeCallback: jest.Mock

beforeEach(async () => {
;[hub, closeHub] = await createHub()

now = 1720000000000
mockNow.mockReturnValue(now)
mockStateChangeCallback = jest.fn()

await deleteKeysWithPrefix(hub.redisPool, BASE_REDIS_KEY)

watcher = new HogWatcher(hub)
watcher = new HogWatcher(hub, mockStateChangeCallback)
})

const advanceTime = (ms: number) => {
Expand Down Expand Up @@ -174,6 +176,9 @@ describe('HogWatcher', () => {

await watcher.observeResults(badResults)

expect(mockStateChangeCallback).toHaveBeenCalledTimes(1)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledForPeriod)

expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
Object {
"rating": 0,
Expand Down Expand Up @@ -204,6 +209,7 @@ describe('HogWatcher', () => {
"tokens": 10000,
}
`)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.healthy)
})
it('should force degraded', async () => {
await watcher.forceStateChange('id1', HogWatcherState.degraded)
Expand All @@ -214,6 +220,7 @@ describe('HogWatcher', () => {
"tokens": 8000,
}
`)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.degraded)
})
it('should force disabledForPeriod', async () => {
await watcher.forceStateChange('id1', HogWatcherState.disabledForPeriod)
Expand All @@ -224,6 +231,7 @@ describe('HogWatcher', () => {
"tokens": 0,
}
`)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledForPeriod)
})
it('should force disabledIndefinitely', async () => {
await watcher.forceStateChange('id1', HogWatcherState.disabledIndefinitely)
Expand All @@ -234,6 +242,7 @@ describe('HogWatcher', () => {
"tokens": 0,
}
`)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledIndefinitely)
})
})

Expand All @@ -258,10 +267,17 @@ describe('HogWatcher', () => {
expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.degraded)
}

expect(mockStateChangeCallback).toHaveBeenCalledTimes(2)
expect(mockStateChangeCallback.mock.calls[0]).toEqual(['id1', HogWatcherState.disabledForPeriod])
expect(mockStateChangeCallback.mock.calls[1]).toEqual(['id1', HogWatcherState.disabledForPeriod])

await watcher.observeResults([createResult({ id: 'id1', error: 'error!' })])
expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledIndefinitely)
await reallyAdvanceTime(1000)
expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledIndefinitely)

expect(mockStateChangeCallback).toHaveBeenCalledTimes(3)
expect(mockStateChangeCallback.mock.calls[2]).toEqual(['id1', HogWatcherState.disabledIndefinitely])
})
})
})
Expand Down
Loading

0 comments on commit 9ad4c7b

Please sign in to comment.