Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdp): Token bucket based hog watcher #24234

Merged
merged 41 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f9656bc
Fixes
benjackwhite Aug 6, 2024
c876808
Fixed up states
benjackwhite Aug 6, 2024
941cc65
Token rate limit option instead
benjackwhite Aug 7, 2024
b4a46ab
Fixes
benjackwhite Aug 7, 2024
b701c21
Fixes
benjackwhite Aug 7, 2024
b2c89a8
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 7, 2024
d6f87c4
Update UI snapshots for `chromium` (2)
github-actions[bot] Aug 7, 2024
19bb85a
Fixes
benjackwhite Aug 7, 2024
398322e
Update UI snapshots for `chromium` (2)
github-actions[bot] Aug 7, 2024
f87fb97
Fixed up oveflow
benjackwhite Aug 7, 2024
a98dea4
Merge branch 'feat/cdp-hog-watcher-v2' into feat/cdp-hog-watcher-v3
benjackwhite Aug 7, 2024
8b5d0f1
Fixes
benjackwhite Aug 7, 2024
9d0fc36
Fix tests
benjackwhite Aug 7, 2024
eb7b167
Fixes
benjackwhite Aug 7, 2024
3e791e0
Fixes
benjackwhite Aug 7, 2024
12a6dae
Added disabling code
benjackwhite Aug 7, 2024
cbbe634
Fixes
benjackwhite Aug 7, 2024
b9d8502
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 7, 2024
9d83618
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 7, 2024
53ea121
Fixes
benjackwhite Aug 8, 2024
adebdc5
Fixes
benjackwhite Aug 8, 2024
f645b8c
Fixed up disabled permanently state
benjackwhite Aug 8, 2024
654c0db
Fixes
benjackwhite Aug 8, 2024
624c5f5
Merge branch 'master' into feat/cdp-hog-watcher-v3
benjackwhite Aug 8, 2024
3879919
Fixes
benjackwhite Aug 8, 2024
4fe3183
Fixes
benjackwhite Aug 8, 2024
37c2db0
Fix
benjackwhite Aug 8, 2024
3b5525c
Merge branch 'master' into feat/cdp-hog-watcher-v3
benjackwhite Aug 8, 2024
9804e11
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
2d2b2f3
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
aabe412
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
b3758f2
Update UI snapshots for `chromium` (2)
github-actions[bot] Aug 8, 2024
6afb2f8
Update UI snapshots for `chromium` (2)
github-actions[bot] Aug 8, 2024
4e31101
Merge branch 'master' into feat/cdp-hog-watcher-v3
benjackwhite Aug 8, 2024
343316e
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
dd3ac9c
Update UI snapshots for `chromium` (2)
github-actions[bot] Aug 8, 2024
fe2dbf5
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
f576077
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
fbc726e
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
ce29f52
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
189b832
Update UI snapshots for `chromium` (1)
github-actions[bot] Aug 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ export function HogFunctionConfiguration({ templateId, id }: { templateId?: stri
</div>

<div className="border bg-bg-light rounded p-3 space-y-2">
<LemonField name="filters" label="Filters by events and actions">
<LemonField name="filters" label="Filters by events and actions" className="gap-2">
{({ value, onChange }) => (
<>
<TestAccountFilterSwitch
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { TZLabel } from '@posthog/apps-common'
import { LemonDropdown, LemonTable, LemonTag, LemonTagProps } from '@posthog/lemon-ui'
import { LemonDropdown, LemonTag, LemonTagProps } from '@posthog/lemon-ui'
import { useValues } from 'kea'
import { dayjs } from 'lib/dayjs'

import { HogWatcherState } from '~/types'

Expand Down Expand Up @@ -66,12 +64,6 @@ export function HogFunctionStatusIndicator(): JSX.Element | null {
? displayMap[hogFunction.status.state]
: DEFAULT_DISPLAY

const noRatings = hogFunction.status?.ratings.length === 0

const averageRating = hogFunction.status?.ratings.length
? hogFunction.status.ratings.reduce((acc, x) => acc + x.rating, 0) / hogFunction.status.ratings.length
: 0

return (
<LemonDropdown
overlay={
Expand All @@ -81,45 +73,7 @@ export function HogFunctionStatusIndicator(): JSX.Element | null {
Function status - <LemonTag type={tagType}>{display}</LemonTag>
</h2>

<p>
Your function has{' '}
{noRatings ? (
<>
no ratings yet. There are either no recent invocations or data is still being
gathered.
</>
) : (
<>
a rating of <b>{Math.round(averageRating * 100)}%</b>.
</>
)}{' '}
A rating of 100% means the function is running perfectly, with 0% meaning it is failing
every time.
</p>

<p>{description}</p>

<h4>History</h4>
<ul>
<LemonTable
columns={[
{
title: 'Timestamp',
key: 'timestamp',
render: (_, { timestamp }) => <TZLabel time={dayjs(timestamp)} />,
},
{
title: 'Status',
key: 'state',
render: (_, { state }) => {
const { tagType, display } = displayMap[state] || DEFAULT_DISPLAY
return <LemonTag type={tagType}>{display}</LemonTag>
},
},
]}
dataSource={hogFunction.status?.states ?? []}
/>
</ul>
</div>
</>
}
Expand Down
10 changes: 2 additions & 8 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4330,14 +4330,8 @@ export enum HogWatcherState {

export type HogFunctionStatus = {
state: HogWatcherState
states: {
timestamp: number
state: HogWatcherState
}[]
ratings: {
timestamp: number
rating: number
}[]
rating: number
tokens: number
}

export type HogFunctionInvocationGlobals = {
Expand Down
9 changes: 4 additions & 5 deletions plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import { delay } from '../utils/utils'
import { AsyncFunctionExecutor } from './async-function-executor'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher } from './hog-watcher/hog-watcher'
import { HogWatcherState } from './hog-watcher/types'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionType, LogEntry } from './types'

export class CdpApi {
Expand Down Expand Up @@ -52,7 +51,7 @@ export class CdpApi {
() =>
async (req: express.Request, res: express.Response): Promise<void> => {
const { id } = req.params
const summary = await this.hogWatcher.fetchWatcher(id)
const summary = await this.hogWatcher.getState(id)

res.json(summary)
}
Expand All @@ -69,7 +68,7 @@ export class CdpApi {
return
}

const summary = await this.hogWatcher.fetchWatcher(id)
const summary = await this.hogWatcher.getState(id)

// Only allow patching the status if it is different from the current status

Expand All @@ -80,7 +79,7 @@ export class CdpApi {
// Hacky - wait for a little to give a chance for the state to change
await delay(100)

res.json(await this.hogWatcher.fetchWatcher(id))
res.json(await this.hogWatcher.getState(id))
}

private postFunctionInvocation = async (req: express.Request, res: express.Response): Promise<void> => {
Expand Down
114 changes: 57 additions & 57 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import { AsyncFunctionExecutor } from './async-function-executor'
import { GroupsManager } from './groups-manager'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher } from './hog-watcher/hog-watcher'
import { HogWatcherState } from './hog-watcher/types'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import {
CdpOverflowMessage,
HogFunctionAsyncFunctionResponse,
Expand Down Expand Up @@ -257,8 +256,6 @@ abstract class CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeAsyncResponses`,
func: async () => {
// NOTE: Disabled for now as it needs some rethinking
// this.hogWatcher.currentObservations.observeAsyncFunctionResponses(asyncResponses)
asyncResponses.forEach((x) => {
counterAsyncFunctionResponse.inc({
outcome: x.asyncFunctionResponse.error ? 'failed' : 'succeeded',
Expand Down Expand Up @@ -286,7 +283,7 @@ abstract class CdpConsumerBase {
this.hogExecutor.executeAsyncResponse(...item)
)

this.hogWatcher.currentObservations.observeResults(results)
await this.hogWatcher.observeResults(results)
return results
},
})
Expand All @@ -298,14 +295,23 @@ abstract class CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeMatchingFunctions`,
func: async () => {
const invocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] = []
const possibleInvocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] =
[]

// TODO: Add a helper to hog functions to determine if they require groups or not and then only load those
await this.groupsManager.enrichGroups(invocationGlobals)

// Find all functions that could need running
invocationGlobals.forEach((globals) => {
const { matchingFunctions, nonMatchingFunctions } = this.hogExecutor.findMatchingFunctions(globals)

possibleInvocations.push(
...matchingFunctions.map((hogFunction) => ({
globals,
hogFunction,
}))
)

nonMatchingFunctions.forEach((item) =>
this.produceAppMetric({
team_id: item.team_id,
Expand All @@ -315,59 +321,52 @@ abstract class CdpConsumerBase {
count: 1,
})
)
})

// Filter for overflowed and disabled functions
const hogFunctionsByState = matchingFunctions.reduce((acc, item) => {
const state = this.hogWatcher.getFunctionState(item.id)
return {
...acc,
[state]: [...(acc[state] ?? []), item],
}
}, {} as Record<HogWatcherState, HogFunctionType[] | undefined>)

if (hogFunctionsByState[HogWatcherState.overflowed]?.length) {
const overflowed = hogFunctionsByState[HogWatcherState.overflowed]!
// Group all overflowed functions into one event
counterFunctionInvocation.inc({ outcome: 'overflowed' }, overflowed.length)

this.messagesToProduce.push({
topic: KAFKA_CDP_FUNCTION_OVERFLOW,
value: {
source: 'event_invocations',
payload: {
hogFunctionIds: overflowed.map((x) => x.id),
globals,
},
},
key: globals.event.uuid,
})
}
const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id))

hogFunctionsByState[HogWatcherState.disabledForPeriod]?.forEach((item) => {
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
metric_kind: 'failure',
metric_name: 'disabled_temporarily',
count: 1,
})
})
const overflowGlobalsAndFunctions: Record<string, HogFunctionOverflowedGlobals> = {}

hogFunctionsByState[HogWatcherState.disabledIndefinitely]?.forEach((item) => {
const invocations = possibleInvocations.filter((item) => {
const state = states[item.hogFunction.id].state
if (state >= HogWatcherState.disabledForPeriod) {
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
team_id: item.globals.project.id,
app_source_id: item.hogFunction.id,
metric_kind: 'failure',
metric_name: 'disabled_permanently',
metric_name:
state === HogWatcherState.disabledForPeriod
? 'disabled_temporarily'
: 'disabled_permanently',
count: 1,
})
})
return false
}

hogFunctionsByState[HogWatcherState.healthy]?.forEach((item) => {
invocations.push({
globals,
hogFunction: item,
})
if (state === HogWatcherState.degraded) {
const key = `${item.globals.project.id}-${item.globals.event.uuid}`
overflowGlobalsAndFunctions[key] = overflowGlobalsAndFunctions[key] || {
globals: item.globals,
hogFunctionIds: [],
}

overflowGlobalsAndFunctions[key].hogFunctionIds.push(item.hogFunction.id)
counterFunctionInvocation.inc({ outcome: 'overflowed' }, 1)

return false
}

return true
})

Object.values(overflowGlobalsAndFunctions).forEach((item) => {
this.messagesToProduce.push({
topic: KAFKA_CDP_FUNCTION_OVERFLOW,
value: {
source: 'event_invocations',
payload: item,
},
key: item.globals.event.uuid,
})
})

Expand All @@ -377,7 +376,7 @@ abstract class CdpConsumerBase {
)
).filter((x) => !!x) as HogFunctionInvocationResult[]

this.hogWatcher.currentObservations.observeResults(results)
await this.hogWatcher.observeResults(results)
return results
},
})
Expand All @@ -393,7 +392,7 @@ abstract class CdpConsumerBase {
const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub)
const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub)

await Promise.all([this.hogFunctionManager.start(), this.hogWatcher.start()])
await Promise.all([this.hogFunctionManager.start()])

this.kafkaProducer = new KafkaProducerWrapper(
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
Expand Down Expand Up @@ -447,13 +446,12 @@ abstract class CdpConsumerBase {
status.info('🔁', `${this.name} - stopping kafka producer`)
await this.kafkaProducer?.disconnect()
status.info('🔁', `${this.name} - stopping hog function manager and hog watcher`)
await Promise.all([this.hogFunctionManager.stop(), this.hogWatcher.stop()])
await Promise.all([this.hogFunctionManager.stop()])

status.info('👍', `${this.name} - stopped!`)
}

public isHealthy() {
// TODO: Maybe extend this to check if we are shutting down so we don't get killed early.
return this.batchConsumer?.isHealthy()
}
}
Expand Down Expand Up @@ -598,9 +596,11 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
)
.flat()

const states = await this.hogWatcher.getStates(invocationGlobals.map((x) => x.hogFunctionIds).flat())

const results = (
await this.runManyWithHeartbeat(invocations, (item) => {
const state = this.hogWatcher.getFunctionState(item.hogFunctionId)
const state = states[item.hogFunctionId].state
if (state >= HogWatcherState.disabledForPeriod) {
this.produceAppMetric({
team_id: item.globals.project.id,
Expand All @@ -618,7 +618,7 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
})
).filter((x) => !!x) as HogFunctionInvocationResult[]

this.hogWatcher.currentObservations.observeResults(results)
await this.hogWatcher.observeResults(results)
return results
},
})
Expand Down
Loading
Loading