Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdp): Hog Watcher v2 #24222

Closed
wants to merge 12 commits into from
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
"WORKER_CONCURRENCY": "2",
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": ""
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "*"
},
"presentation": {
"group": "main"
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { TZLabel } from '@posthog/apps-common'
import { LemonDropdown, LemonTable, LemonTag, LemonTagProps } from '@posthog/lemon-ui'
import { LemonDropdown, LemonTag, LemonTagProps } from '@posthog/lemon-ui'
import { useValues } from 'kea'
import { dayjs } from 'lib/dayjs'

import { HogWatcherState } from '~/types'

Expand Down Expand Up @@ -66,12 +64,6 @@ export function HogFunctionStatusIndicator(): JSX.Element | null {
? displayMap[hogFunction.status.state]
: DEFAULT_DISPLAY

const noRatings = hogFunction.status?.ratings.length === 0

const averageRating = hogFunction.status?.ratings.length
? hogFunction.status.ratings.reduce((acc, x) => acc + x.rating, 0) / hogFunction.status.ratings.length
: 0

return (
<LemonDropdown
overlay={
Expand All @@ -81,45 +73,7 @@ export function HogFunctionStatusIndicator(): JSX.Element | null {
Function status - <LemonTag type={tagType}>{display}</LemonTag>
</h2>

<p>
Your function has{' '}
{noRatings ? (
<>
no ratings yet. There are either no recent invocations or data is still being
gathered.
</>
) : (
<>
a rating of <b>{Math.round(averageRating * 100)}%</b>.
</>
)}{' '}
A rating of 100% means the function is running perfectly, with 0% meaning it is failing
every time.
</p>

<p>{description}</p>

<h4>History</h4>
<ul>
<LemonTable
columns={[
{
title: 'Timestamp',
key: 'timestamp',
render: (_, { timestamp }) => <TZLabel time={dayjs(timestamp)} />,
},
{
title: 'Status',
key: 'state',
render: (_, { state }) => {
const { tagType, display } = displayMap[state] || DEFAULT_DISPLAY
return <LemonTag type={tagType}>{display}</LemonTag>
},
},
]}
dataSource={hogFunction.status?.states ?? []}
/>
</ul>
</div>
</>
}
Expand Down
9 changes: 1 addition & 8 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4330,14 +4330,7 @@ export enum HogWatcherState {

export type HogFunctionStatus = {
state: HogWatcherState
states: {
timestamp: number
state: HogWatcherState
}[]
ratings: {
timestamp: number
rating: number
}[]
score: number
}

export type HogFunctionInvocationGlobals = {
Expand Down
9 changes: 4 additions & 5 deletions plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import { delay } from '../utils/utils'
import { AsyncFunctionExecutor } from './async-function-executor'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher } from './hog-watcher/hog-watcher'
import { HogWatcherState } from './hog-watcher/types'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionType, LogEntry } from './types'

export class CdpApi {
Expand Down Expand Up @@ -52,7 +51,7 @@ export class CdpApi {
() =>
async (req: express.Request, res: express.Response): Promise<void> => {
const { id } = req.params
const summary = await this.hogWatcher.fetchWatcher(id)
const summary = await this.hogWatcher.getState(id)

res.json(summary)
}
Expand All @@ -69,7 +68,7 @@ export class CdpApi {
return
}

const summary = await this.hogWatcher.fetchWatcher(id)
const summary = await this.hogWatcher.getState(id)

// Only allow patching the status if it is different from the current status

Expand All @@ -80,7 +79,7 @@ export class CdpApi {
// Hacky - wait for a little to give a chance for the state to change
await delay(100)

res.json(await this.hogWatcher.fetchWatcher(id))
res.json(await this.hogWatcher.getState(id))
}

private postFunctionInvocation = async (req: express.Request, res: express.Response): Promise<void> => {
Expand Down
114 changes: 57 additions & 57 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import { AsyncFunctionExecutor } from './async-function-executor'
import { GroupsManager } from './groups-manager'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher } from './hog-watcher/hog-watcher'
import { HogWatcherState } from './hog-watcher/types'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import {
CdpOverflowMessage,
HogFunctionAsyncFunctionResponse,
Expand Down Expand Up @@ -257,8 +256,6 @@ abstract class CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeAsyncResponses`,
func: async () => {
// NOTE: Disabled for now as it needs some rethinking
// this.hogWatcher.currentObservations.observeAsyncFunctionResponses(asyncResponses)
asyncResponses.forEach((x) => {
counterAsyncFunctionResponse.inc({
outcome: x.asyncFunctionResponse.error ? 'failed' : 'succeeded',
Expand Down Expand Up @@ -286,7 +283,7 @@ abstract class CdpConsumerBase {
this.hogExecutor.executeAsyncResponse(...item)
)

this.hogWatcher.currentObservations.observeResults(results)
await this.hogWatcher.observeResults(results)
return results
},
})
Expand All @@ -298,14 +295,23 @@ abstract class CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeMatchingFunctions`,
func: async () => {
const invocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] = []
const possibleInvocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] =
[]

// TODO: Add a helper to hog functions to determine if they require groups or not and then only load those
await this.groupsManager.enrichGroups(invocationGlobals)

// Find all functions that could need running
invocationGlobals.forEach((globals) => {
const { matchingFunctions, nonMatchingFunctions } = this.hogExecutor.findMatchingFunctions(globals)

possibleInvocations.push(
...matchingFunctions.map((hogFunction) => ({
globals,
hogFunction,
}))
)

nonMatchingFunctions.forEach((item) =>
this.produceAppMetric({
team_id: item.team_id,
Expand All @@ -315,59 +321,52 @@ abstract class CdpConsumerBase {
count: 1,
})
)
})

// Filter for overflowed and disabled functions
const hogFunctionsByState = matchingFunctions.reduce((acc, item) => {
const state = this.hogWatcher.getFunctionState(item.id)
return {
...acc,
[state]: [...(acc[state] ?? []), item],
}
}, {} as Record<HogWatcherState, HogFunctionType[] | undefined>)

if (hogFunctionsByState[HogWatcherState.overflowed]?.length) {
const overflowed = hogFunctionsByState[HogWatcherState.overflowed]!
// Group all overflowed functions into one event
counterFunctionInvocation.inc({ outcome: 'overflowed' }, overflowed.length)

this.messagesToProduce.push({
topic: KAFKA_CDP_FUNCTION_OVERFLOW,
value: {
source: 'event_invocations',
payload: {
hogFunctionIds: overflowed.map((x) => x.id),
globals,
},
},
key: globals.event.uuid,
})
}
const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id))

hogFunctionsByState[HogWatcherState.disabledForPeriod]?.forEach((item) => {
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
metric_kind: 'failure',
metric_name: 'disabled_temporarily',
count: 1,
})
})
const overflowGlobalsAndFunctions: Record<string, HogFunctionOverflowedGlobals> = {}

hogFunctionsByState[HogWatcherState.disabledIndefinitely]?.forEach((item) => {
const invocations = possibleInvocations.filter((item) => {
const state = states[item.hogFunction.id].state
if (state >= HogWatcherState.disabledForPeriod) {
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
team_id: item.globals.project.id,
app_source_id: item.hogFunction.id,
metric_kind: 'failure',
metric_name: 'disabled_permanently',
metric_name:
state === HogWatcherState.disabledForPeriod
? 'disabled_temporarily'
: 'disabled_permanently',
count: 1,
})
})
return false
}

hogFunctionsByState[HogWatcherState.healthy]?.forEach((item) => {
invocations.push({
globals,
hogFunction: item,
})
if (state === HogWatcherState.degraded) {
const key = `${item.globals.project.id}-${item.globals.event.uuid}`
overflowGlobalsAndFunctions[key] = overflowGlobalsAndFunctions[key] || {
globals: item.globals,
hogFunctionIds: [],
}

overflowGlobalsAndFunctions[key].hogFunctionIds.push(item.hogFunction.id)
counterFunctionInvocation.inc({ outcome: 'overflowed' }, 1)

return false
}

return true
})

Object.values(overflowGlobalsAndFunctions).forEach((item) => {
this.messagesToProduce.push({
topic: KAFKA_CDP_FUNCTION_OVERFLOW,
value: {
source: 'event_invocations',
payload: item,
},
key: item.globals.event.uuid,
})
})

Expand All @@ -377,7 +376,7 @@ abstract class CdpConsumerBase {
)
).filter((x) => !!x) as HogFunctionInvocationResult[]

this.hogWatcher.currentObservations.observeResults(results)
await this.hogWatcher.observeResults(results)
return results
},
})
Expand All @@ -393,7 +392,7 @@ abstract class CdpConsumerBase {
const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub)
const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub)

await Promise.all([this.hogFunctionManager.start(), this.hogWatcher.start()])
await Promise.all([this.hogFunctionManager.start()])

this.kafkaProducer = new KafkaProducerWrapper(
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
Expand Down Expand Up @@ -447,13 +446,12 @@ abstract class CdpConsumerBase {
status.info('🔁', `${this.name} - stopping kafka producer`)
await this.kafkaProducer?.disconnect()
status.info('🔁', `${this.name} - stopping hog function manager and hog watcher`)
await Promise.all([this.hogFunctionManager.stop(), this.hogWatcher.stop()])
await Promise.all([this.hogFunctionManager.stop()])

status.info('👍', `${this.name} - stopped!`)
}

public isHealthy() {
// TODO: Maybe extend this to check if we are shutting down so we don't get killed early.
return this.batchConsumer?.isHealthy()
}
}
Expand Down Expand Up @@ -598,9 +596,11 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
)
.flat()

const states = await this.hogWatcher.getStates(invocationGlobals.map((x) => x.hogFunctionIds).flat())

const results = (
await this.runManyWithHeartbeat(invocations, (item) => {
const state = this.hogWatcher.getFunctionState(item.hogFunctionId)
const state = states[item.hogFunctionId].state
if (state >= HogWatcherState.disabledForPeriod) {
this.produceAppMetric({
team_id: item.globals.project.id,
Expand All @@ -618,7 +618,7 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
})
).filter((x) => !!x) as HogFunctionInvocationResult[]

this.hogWatcher.currentObservations.observeResults(results)
await this.hogWatcher.observeResults(results)
return results
},
})
Expand Down
Loading
Loading