Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdp): Track event for hog function state transitions #24217

Merged
merged 52 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
36948ec
Refactoring logging
benjackwhite Aug 6, 2024
0f02afb
Fix
benjackwhite Aug 6, 2024
e8a0b67
More logging
benjackwhite Aug 6, 2024
99cf5f4
Added some helper events when a state transition occurs
benjackwhite Aug 6, 2024
69a402f
Setup team ids
benjackwhite Aug 6, 2024
2acb2c2
Simplified capture
benjackwhite Aug 6, 2024
d69823f
Merge branch 'master' into feat/cdp-posthog-events
benjackwhite Aug 6, 2024
f4c0c0a
Fixes
benjackwhite Aug 6, 2024
f9656bc
Fixes
benjackwhite Aug 6, 2024
c876808
Fixed up states
benjackwhite Aug 6, 2024
941cc65
Token rate limit option instead
benjackwhite Aug 7, 2024
b4a46ab
Fixes
benjackwhite Aug 7, 2024
b701c21
Fixes
benjackwhite Aug 7, 2024
b2c89a8
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 7, 2024
d6f87c4
Update UI snapshots for `chromium` (2)
github-actions[bot] Aug 7, 2024
19bb85a
Fixes
benjackwhite Aug 7, 2024
398322e
Update UI snapshots for `chromium` (2)
github-actions[bot] Aug 7, 2024
f87fb97
Fixed up oveflow
benjackwhite Aug 7, 2024
a98dea4
Merge branch 'feat/cdp-hog-watcher-v2' into feat/cdp-hog-watcher-v3
benjackwhite Aug 7, 2024
8b5d0f1
Fixes
benjackwhite Aug 7, 2024
9d0fc36
Fix tests
benjackwhite Aug 7, 2024
eb7b167
Fixes
benjackwhite Aug 7, 2024
3e791e0
Fixes
benjackwhite Aug 7, 2024
12a6dae
Added disabling code
benjackwhite Aug 7, 2024
cbbe634
Fixes
benjackwhite Aug 7, 2024
b9d8502
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 7, 2024
9d83618
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 7, 2024
53ea121
Fixes
benjackwhite Aug 8, 2024
adebdc5
Fixes
benjackwhite Aug 8, 2024
f645b8c
Fixed up disabled permanently state
benjackwhite Aug 8, 2024
654c0db
Fixes
benjackwhite Aug 8, 2024
624c5f5
Merge branch 'master' into feat/cdp-hog-watcher-v3
benjackwhite Aug 8, 2024
3879919
Fixes
benjackwhite Aug 8, 2024
4fe3183
Fixes
benjackwhite Aug 8, 2024
37c2db0
Fix
benjackwhite Aug 8, 2024
b77bc1e
Merge branch 'feat/track-function-state-changes' into feat/cdp-postho…
benjackwhite Aug 8, 2024
3b5525c
Merge branch 'master' into feat/cdp-hog-watcher-v3
benjackwhite Aug 8, 2024
9804e11
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
fd5454c
Added callbacks
benjackwhite Aug 8, 2024
2dea8cb
Fixed up hog watcher callback
benjackwhite Aug 8, 2024
2d2b2f3
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
53a38a2
Merge branch 'feat/cdp-hog-watcher-v3' into feat/cdp-posthog-events
benjackwhite Aug 8, 2024
3b8eed7
Fixes
benjackwhite Aug 8, 2024
f56faf4
Fixes
benjackwhite Aug 8, 2024
975a2ee
Merge branch 'master' into feat/cdp-posthog-events
benjackwhite Aug 9, 2024
633f281
Merge branch 'master' into feat/cdp-posthog-events
benjackwhite Aug 9, 2024
595de9e
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 9, 2024
880d30e
Fies
benjackwhite Aug 9, 2024
e25aef4
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 9, 2024
1080236
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 9, 2024
733adf7
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 9, 2024
5f269f2
Merge branch 'feat/cdp-posthog-events' of github.com:PostHog/posthog …
benjackwhite Aug 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kaf
import { runInstrumentedFunction } from '../main/utils'
import { AppMetric2Type, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { captureTeamEvent } from '../utils/posthog'
import { status } from '../utils/status'
import { castTimestampOrNow } from '../utils/utils'
import { RustyHook } from '../worker/rusty-hook'
Expand Down Expand Up @@ -95,14 +96,38 @@ abstract class CdpConsumerBase {
protected heartbeat = () => {}

constructor(protected hub: Hub) {
this.hogWatcher = new HogWatcher(hub)
this.hogFunctionManager = new HogFunctionManager(hub.postgres, hub)
this.hogWatcher = new HogWatcher(hub, (id, state) => {
void this.captureInternalPostHogEvent(id, 'hog function state changed', { state })
})
this.hogExecutor = new HogExecutor(this.hogFunctionManager)
const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.hub)
this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.hub, rustyHook)
this.groupsManager = new GroupsManager(this.hub)
}

private async captureInternalPostHogEvent(
hogFunctionId: HogFunctionType['id'],
event: string,
properties: any = {}
) {
const hogFunction = this.hogFunctionManager.getHogFunction(hogFunctionId)
if (!hogFunction) {
return
}
const team = await this.hub.teamManager.fetchTeam(hogFunction.team_id)

if (!team) {
return
}

captureTeamEvent(team, event, {
...properties,
hog_function_id: hogFunctionId,
hog_function_url: `${this.hub.SITE_URL}/project/${team.id}/pipeline/destinations/hog-${hogFunctionId}`,
})
}

protected async runWithHeartbeat<T>(func: () => Promise<T> | T): Promise<T> {
// Helper function to ensure that looping over lots of hog functions doesn't block up the thread, killing the consumer
const res = await func()
Expand Down
56 changes: 37 additions & 19 deletions plugin-server/src/cdp/hog-function-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { HogFunctionType, IntegrationType } from './types'

export type HogFunctionMap = Record<HogFunctionType['id'], HogFunctionType>
export type HogFunctionCache = Record<Team['id'], HogFunctionMap>
type HogFunctionCache = {
functions: Record<HogFunctionType['id'], HogFunctionType | undefined>
teams: Record<Team['id'], HogFunctionType['id'][] | undefined>
}

const HOG_FUNCTION_FIELDS = ['id', 'team_id', 'name', 'enabled', 'inputs', 'inputs_schema', 'filters', 'bytecode']

Expand All @@ -21,7 +23,10 @@ export class HogFunctionManager {
constructor(private postgres: PostgresRouter, private serverConfig: PluginsServerConfig) {
this.started = false
this.ready = false
this.cache = {}
this.cache = {
functions: {},
teams: {},
}

this.pubSub = new PubSub(this.serverConfig, {
'reload-hog-functions': async (message) => {
Expand Down Expand Up @@ -66,14 +71,27 @@ export class HogFunctionManager {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
}
return Object.values(this.cache[teamId] || {})

return Object.values(this.cache.teams[teamId] || [])
.map((id) => this.cache.functions[id])
.filter((x) => !!x) as HogFunctionType[]
}

public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
}
return this.cache.functions[id]
}

public getTeamHogFunction(teamId: Team['id'], hogFunctionId: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
}
return this.cache[teamId]?.[hogFunctionId]
const fn = this.cache.functions[hogFunctionId]
if (fn?.team_id === teamId) {
return fn
}
}

public teamHasHogFunctions(teamId: Team['id']): boolean {
Expand All @@ -96,13 +114,15 @@ export class HogFunctionManager {

await this.enrichWithIntegrations(items)

const cache: HogFunctionCache = {}
for (const item of items) {
if (!cache[item.team_id]) {
cache[item.team_id] = {}
}
const cache: HogFunctionCache = {
functions: {},
teams: {},
}

cache[item.team_id][item.id] = item
for (const item of items) {
cache.functions[item.id] = item
cache.teams[item.team_id] = cache.teams[item.team_id] || []
cache.teams[item.team_id]!.push(item.id)
}

this.cache = cache
Expand All @@ -125,17 +145,15 @@ export class HogFunctionManager {

await this.enrichWithIntegrations(items)

if (!this.cache[teamId]) {
this.cache[teamId] = {}
}

for (const id of ids) {
// First of all delete the item from the cache - this covers the case where the item was deleted or disabled
delete this.cache[teamId][id]
delete this.cache.functions[id]
this.cache.teams[teamId] = this.cache.teams[teamId]?.filter((x) => x !== id)
}

for (const item of items) {
this.cache[teamId][item.id] = item
this.cache.functions[item.id] = item
this.cache.teams[teamId] = this.cache.teams[teamId] || []
this.cache.teams[teamId]!.push(item.id)
}
}

Expand All @@ -157,7 +175,7 @@ export class HogFunctionManager {
public reloadIntegrations(teamId: Team['id'], ids: IntegrationType['id'][]): Promise<void> {
// We need to find all hog functions that depend on these integrations and re-enrich them

const items: HogFunctionType[] = Object.values(this.cache[teamId] || {})
const items = this.getTeamHogFunctions(teamId)
const itemsToReload = items.filter((item) => ids.some((id) => item.depends_on_integration_ids?.has(id)))

return this.enrichWithIntegrations(itemsToReload)
Expand Down
43 changes: 28 additions & 15 deletions plugin-server/src/cdp/hog-watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type HogWatcherRedisClient = Omit<Redis, 'pipeline'> &
}

export class HogWatcher {
constructor(private hub: Hub) {}
constructor(private hub: Hub, private onStateChange: (id: HogFunctionType['id'], state: HogWatcherState) => void) {}

private rateLimitArgs(id: HogFunctionType['id'], cost: number) {
const nowSeconds = Math.round(now() / 1000)
Expand Down Expand Up @@ -214,6 +214,8 @@ export class HogWatcher {

await pipeline.exec()
})

this.onStateChange(id, state)
}

public async observeResults(results: HogFunctionInvocationResult[]): Promise<void> {
Expand Down Expand Up @@ -278,16 +280,18 @@ export class HogWatcher {
return pipeline.exec()
})

const functionsDisabled = disabledFunctionIds.filter((_, index) => (results ? results[index][1] : false))
const functionsTempDisabled = disabledFunctionIds.filter((_, index) =>
results ? results[index][1] : false
)

if (!functionsDisabled.length) {
if (!functionsTempDisabled.length) {
return
}

// We store the history as a zset - we can then use it to determine if we should disable indefinitely
const historyResults = await this.runRedis(async (client) => {
const pipeline = client.pipeline()
functionsDisabled.forEach((id) => {
functionsTempDisabled.forEach((id) => {
const key = `${REDIS_KEY_DISABLED_HISTORY}/${id}`
pipeline.zadd(key, now(), new UUIDT().toString())
pipeline.zrange(key, 0, -1)
Expand All @@ -297,24 +301,33 @@ export class HogWatcher {
return await pipeline.exec()
})

const functionsToDisablePermanently = functionsDisabled.filter((_, index) => {
const functionsToDisablePermanently = functionsTempDisabled.filter((_, index) => {
const history = historyResults ? historyResults[index * 3 + 1][1] : []
return history.length >= this.hub.CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT
})

if (!functionsToDisablePermanently.length) {
return
}
if (functionsToDisablePermanently.length) {
await this.runRedis(async (client) => {
const pipeline = client.pipeline()
functionsToDisablePermanently.forEach((id) => {
const key = `${REDIS_KEY_DISABLED}/${id}`
pipeline.set(key, '1')
pipeline.del(`${REDIS_KEY_DISABLED_HISTORY}/${id}`)
})

await this.runRedis(async (client) => {
const pipeline = client.pipeline()
functionsToDisablePermanently.forEach((id) => {
const key = `${REDIS_KEY_DISABLED}/${id}`
pipeline.set(key, '1')
pipeline.del(`${REDIS_KEY_DISABLED_HISTORY}/${id}`)
return await pipeline.exec()
})
}

return await pipeline.exec()
// Finally track the results
functionsToDisablePermanently.forEach((id) => {
this.onStateChange(id, HogWatcherState.disabledIndefinitely)
})

functionsTempDisabled.forEach((id) => {
if (!functionsToDisablePermanently.includes(id)) {
this.onStateChange(id, HogWatcherState.disabledForPeriod)
}
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
import { PostgresRouter } from '../utils/db/postgres'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
import { posthog } from '../utils/posthog'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
import { createRedisClient, delay } from '../utils/utils'
Expand Down Expand Up @@ -165,6 +166,7 @@ export async function startPluginsServer(
stopSessionRecordingBlobOverflowConsumer?.(),
schedulerTasksConsumer?.disconnect(),
...shutdownCallbacks.map((cb) => cb()),
posthog.shutdownAsync(),
])

if (piscina) {
Expand Down
18 changes: 18 additions & 0 deletions plugin-server/src/utils/posthog.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import { PostHog } from 'posthog-node'

import { Team } from '../types'

export const posthog = new PostHog('sTMFPsFhdP1Ssg', {
host: 'https://us.i.posthog.com',
})

if (process.env.NODE_ENV === 'test') {
posthog.disable()
}

export const captureTeamEvent = (team: Team, event: string, properties: Record<string, any> = {}): void => {
posthog.capture({
distinctId: team.uuid,
event,
properties: {
team: team.uuid,
...properties,
},
groups: {
project: team.uuid,
organization: team.organization_id,
instance: process.env.SITE_URL ?? 'unknown',
},
})
}
17 changes: 2 additions & 15 deletions plugin-server/src/worker/ingestion/group-type-manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { GroupTypeIndex, GroupTypeToColumnIndex, Team, TeamId } from '../../types'
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
import { timeoutGuard } from '../../utils/db/utils'
import { posthog } from '../../utils/posthog'
import { captureTeamEvent } from '../../utils/posthog'
import { getByAge } from '../../utils/utils'
import { TeamManager } from './team-manager'

Expand Down Expand Up @@ -110,19 +110,6 @@ export class GroupTypeManager {
return
}

posthog.capture({
distinctId: 'plugin-server',
event: 'group type ingested',
properties: {
team: team.uuid,
groupType,
groupTypeIndex,
},
groups: {
project: team.uuid,
organization: team.organization_id,
instance: this.instanceSiteUrl,
},
})
captureTeamEvent(team, 'group type ingested', { groupType, groupTypeIndex })
}
}
18 changes: 17 additions & 1 deletion plugin-server/tests/cdp/hog-watcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,18 @@ describe('HogWatcher', () => {
let hub: Hub
let closeHub: () => Promise<void>
let watcher: HogWatcher
let mockStateChangeCallback: jest.Mock

beforeEach(async () => {
;[hub, closeHub] = await createHub()

now = 1720000000000
mockNow.mockReturnValue(now)
mockStateChangeCallback = jest.fn()

await deleteKeysWithPrefix(hub.redisPool, BASE_REDIS_KEY)

watcher = new HogWatcher(hub)
watcher = new HogWatcher(hub, mockStateChangeCallback)
})

const advanceTime = (ms: number) => {
Expand Down Expand Up @@ -174,6 +176,9 @@ describe('HogWatcher', () => {

await watcher.observeResults(badResults)

expect(mockStateChangeCallback).toHaveBeenCalledTimes(1)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledForPeriod)

expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
Object {
"rating": 0,
Expand Down Expand Up @@ -204,6 +209,7 @@ describe('HogWatcher', () => {
"tokens": 10000,
}
`)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.healthy)
})
it('should force degraded', async () => {
await watcher.forceStateChange('id1', HogWatcherState.degraded)
Expand All @@ -214,6 +220,7 @@ describe('HogWatcher', () => {
"tokens": 8000,
}
`)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.degraded)
})
it('should force disabledForPeriod', async () => {
await watcher.forceStateChange('id1', HogWatcherState.disabledForPeriod)
Expand All @@ -224,6 +231,7 @@ describe('HogWatcher', () => {
"tokens": 0,
}
`)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledForPeriod)
})
it('should force disabledIndefinitely', async () => {
await watcher.forceStateChange('id1', HogWatcherState.disabledIndefinitely)
Expand All @@ -234,6 +242,7 @@ describe('HogWatcher', () => {
"tokens": 0,
}
`)
expect(mockStateChangeCallback).toHaveBeenCalledWith('id1', HogWatcherState.disabledIndefinitely)
})
})

Expand All @@ -258,10 +267,17 @@ describe('HogWatcher', () => {
expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.degraded)
}

expect(mockStateChangeCallback).toHaveBeenCalledTimes(2)
expect(mockStateChangeCallback.mock.calls[0]).toEqual(['id1', HogWatcherState.disabledForPeriod])
expect(mockStateChangeCallback.mock.calls[1]).toEqual(['id1', HogWatcherState.disabledForPeriod])

await watcher.observeResults([createResult({ id: 'id1', error: 'error!' })])
expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledIndefinitely)
await reallyAdvanceTime(1000)
expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledIndefinitely)

expect(mockStateChangeCallback).toHaveBeenCalledTimes(3)
expect(mockStateChangeCallback.mock.calls[2]).toEqual(['id1', HogWatcherState.disabledIndefinitely])
})
})
})
Expand Down
Loading
Loading