diff --git a/frontend/__snapshots__/exporter-exporter--user-paths-insight--dark.png b/frontend/__snapshots__/exporter-exporter--user-paths-insight--dark.png
index acc74d32b1329..ddabab3e2704a 100644
Binary files a/frontend/__snapshots__/exporter-exporter--user-paths-insight--dark.png and b/frontend/__snapshots__/exporter-exporter--user-paths-insight--dark.png differ
diff --git a/frontend/__snapshots__/exporter-exporter--user-paths-insight--light.png b/frontend/__snapshots__/exporter-exporter--user-paths-insight--light.png
index 7a1a6ee0a0a6b..2b0e70c4b4901 100644
Binary files a/frontend/__snapshots__/exporter-exporter--user-paths-insight--light.png and b/frontend/__snapshots__/exporter-exporter--user-paths-insight--light.png differ
diff --git a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx
index c5ad19c182b6f..0ab04ea9cb2d6 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx
@@ -248,7 +248,7 @@ export function HogFunctionConfiguration({ templateId, id }: { templateId?: stri
-
+
{({ value, onChange }) => (
<>
acc + x.rating, 0) / hogFunction.status.ratings.length
- : 0
-
return (
{display}
-
- Your function has{' '}
- {noRatings ? (
- <>
- no ratings yet. There are either no recent invocations or data is still being
- gathered.
- >
- ) : (
- <>
- a rating of {Math.round(averageRating * 100)}%.
- >
- )}{' '}
- A rating of 100% means the function is running perfectly, with 0% meaning it is failing
- every time.
-
-
{description}
-
- History
-
- ,
- },
- {
- title: 'Status',
- key: 'state',
- render: (_, { state }) => {
- const { tagType, display } = displayMap[state] || DEFAULT_DISPLAY
- return {display}
- },
- },
- ]}
- dataSource={hogFunction.status?.states ?? []}
- />
-
>
}
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index f1f69ed546e4f..36732caedafe4 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -4330,14 +4330,8 @@ export enum HogWatcherState {
export type HogFunctionStatus = {
state: HogWatcherState
- states: {
- timestamp: number
- state: HogWatcherState
- }[]
- ratings: {
- timestamp: number
- rating: number
- }[]
+ rating: number
+ tokens: number
}
export type HogFunctionInvocationGlobals = {
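> Reviewer note: the flattened `HogFunctionStatus` above replaces the per-period `states`/`ratings` history with a single `rating` (0-1, the fraction of the token bucket remaining) and the raw `tokens`. A minimal sketch of how the UI might render this, assuming the usual `~/types` alias; the helper name is illustrative and not part of this PR:

```ts
// Hypothetical rendering helper for the flattened status shape.
import type { HogFunctionStatus } from '~/types'

export function hogFunctionRatingLabel(status?: HogFunctionStatus): string {
    if (!status) {
        return 'No status yet'
    }
    // rating is tokens / bucket size: 1 means running perfectly, 0 means failing every time
    return `Rating: ${Math.round(status.rating * 100)}%`
}
```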
diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/cdp-api.ts
index 4d9cd53c58cc5..553e380e16cdf 100644
--- a/plugin-server/src/cdp/cdp-api.ts
+++ b/plugin-server/src/cdp/cdp-api.ts
@@ -8,8 +8,7 @@ import { delay } from '../utils/utils'
import { AsyncFunctionExecutor } from './async-function-executor'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
-import { HogWatcher } from './hog-watcher/hog-watcher'
-import { HogWatcherState } from './hog-watcher/types'
+import { HogWatcher, HogWatcherState } from './hog-watcher'
import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionType, LogEntry } from './types'
export class CdpApi {
@@ -52,7 +51,7 @@ export class CdpApi {
() =>
        async (req: express.Request, res: express.Response): Promise<void> => {
const { id } = req.params
- const summary = await this.hogWatcher.fetchWatcher(id)
+ const summary = await this.hogWatcher.getState(id)
res.json(summary)
}
@@ -69,7 +68,7 @@ export class CdpApi {
return
}
- const summary = await this.hogWatcher.fetchWatcher(id)
+ const summary = await this.hogWatcher.getState(id)
// Only allow patching the status if it is different from the current status
@@ -80,7 +79,7 @@ export class CdpApi {
// Hacky - wait for a little to give a chance for the state to change
await delay(100)
- res.json(await this.hogWatcher.fetchWatcher(id))
+ res.json(await this.hogWatcher.getState(id))
}
    private postFunctionInvocation = async (req: express.Request, res: express.Response): Promise<void> => {
diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts
index ca0accfc157dc..b5fa9862dfa56 100644
--- a/plugin-server/src/cdp/cdp-consumers.ts
+++ b/plugin-server/src/cdp/cdp-consumers.ts
@@ -24,8 +24,7 @@ import { AsyncFunctionExecutor } from './async-function-executor'
import { GroupsManager } from './groups-manager'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
-import { HogWatcher } from './hog-watcher/hog-watcher'
-import { HogWatcherState } from './hog-watcher/types'
+import { HogWatcher, HogWatcherState } from './hog-watcher'
import {
CdpOverflowMessage,
HogFunctionAsyncFunctionResponse,
@@ -257,8 +256,6 @@ abstract class CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeAsyncResponses`,
func: async () => {
- // NOTE: Disabled for now as it needs some rethinking
- // this.hogWatcher.currentObservations.observeAsyncFunctionResponses(asyncResponses)
asyncResponses.forEach((x) => {
counterAsyncFunctionResponse.inc({
outcome: x.asyncFunctionResponse.error ? 'failed' : 'succeeded',
@@ -286,7 +283,7 @@ abstract class CdpConsumerBase {
this.hogExecutor.executeAsyncResponse(...item)
)
- this.hogWatcher.currentObservations.observeResults(results)
+ await this.hogWatcher.observeResults(results)
return results
},
})
@@ -298,14 +295,23 @@ abstract class CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeMatchingFunctions`,
func: async () => {
- const invocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] = []
+ const possibleInvocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] =
+ []
// TODO: Add a helper to hog functions to determine if they require groups or not and then only load those
await this.groupsManager.enrichGroups(invocationGlobals)
+                // Find all functions that might need to run
invocationGlobals.forEach((globals) => {
const { matchingFunctions, nonMatchingFunctions } = this.hogExecutor.findMatchingFunctions(globals)
+ possibleInvocations.push(
+ ...matchingFunctions.map((hogFunction) => ({
+ globals,
+ hogFunction,
+ }))
+ )
+
nonMatchingFunctions.forEach((item) =>
this.produceAppMetric({
team_id: item.team_id,
@@ -315,59 +321,52 @@ abstract class CdpConsumerBase {
count: 1,
})
)
+ })
- // Filter for overflowed and disabled functions
- const hogFunctionsByState = matchingFunctions.reduce((acc, item) => {
- const state = this.hogWatcher.getFunctionState(item.id)
- return {
- ...acc,
- [state]: [...(acc[state] ?? []), item],
- }
- }, {} as Record)
-
- if (hogFunctionsByState[HogWatcherState.overflowed]?.length) {
- const overflowed = hogFunctionsByState[HogWatcherState.overflowed]!
- // Group all overflowed functions into one event
- counterFunctionInvocation.inc({ outcome: 'overflowed' }, overflowed.length)
-
- this.messagesToProduce.push({
- topic: KAFKA_CDP_FUNCTION_OVERFLOW,
- value: {
- source: 'event_invocations',
- payload: {
- hogFunctionIds: overflowed.map((x) => x.id),
- globals,
- },
- },
- key: globals.event.uuid,
- })
- }
+ const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id))
- hogFunctionsByState[HogWatcherState.disabledForPeriod]?.forEach((item) => {
- this.produceAppMetric({
- team_id: item.team_id,
- app_source_id: item.id,
- metric_kind: 'failure',
- metric_name: 'disabled_temporarily',
- count: 1,
- })
- })
+                const overflowGlobalsAndFunctions: Record<string, { globals: HogFunctionInvocationGlobals; hogFunctionIds: HogFunctionType['id'][] }> = {}
- hogFunctionsByState[HogWatcherState.disabledIndefinitely]?.forEach((item) => {
+ const invocations = possibleInvocations.filter((item) => {
+ const state = states[item.hogFunction.id].state
+ if (state >= HogWatcherState.disabledForPeriod) {
this.produceAppMetric({
- team_id: item.team_id,
- app_source_id: item.id,
+ team_id: item.globals.project.id,
+ app_source_id: item.hogFunction.id,
metric_kind: 'failure',
- metric_name: 'disabled_permanently',
+ metric_name:
+ state === HogWatcherState.disabledForPeriod
+ ? 'disabled_temporarily'
+ : 'disabled_permanently',
count: 1,
})
- })
+ return false
+ }
- hogFunctionsByState[HogWatcherState.healthy]?.forEach((item) => {
- invocations.push({
- globals,
- hogFunction: item,
- })
+ if (state === HogWatcherState.degraded) {
+ const key = `${item.globals.project.id}-${item.globals.event.uuid}`
+ overflowGlobalsAndFunctions[key] = overflowGlobalsAndFunctions[key] || {
+ globals: item.globals,
+ hogFunctionIds: [],
+ }
+
+ overflowGlobalsAndFunctions[key].hogFunctionIds.push(item.hogFunction.id)
+ counterFunctionInvocation.inc({ outcome: 'overflowed' }, 1)
+
+ return false
+ }
+
+ return true
+ })
+
+ Object.values(overflowGlobalsAndFunctions).forEach((item) => {
+ this.messagesToProduce.push({
+ topic: KAFKA_CDP_FUNCTION_OVERFLOW,
+ value: {
+ source: 'event_invocations',
+ payload: item,
+ },
+ key: item.globals.event.uuid,
})
})
@@ -377,7 +376,7 @@ abstract class CdpConsumerBase {
)
).filter((x) => !!x) as HogFunctionInvocationResult[]
- this.hogWatcher.currentObservations.observeResults(results)
+ await this.hogWatcher.observeResults(results)
return results
},
})
@@ -393,7 +392,7 @@ abstract class CdpConsumerBase {
const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub)
const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub)
- await Promise.all([this.hogFunctionManager.start(), this.hogWatcher.start()])
+ await Promise.all([this.hogFunctionManager.start()])
this.kafkaProducer = new KafkaProducerWrapper(
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
@@ -447,13 +446,12 @@ abstract class CdpConsumerBase {
status.info('🔁', `${this.name} - stopping kafka producer`)
await this.kafkaProducer?.disconnect()
status.info('🔁', `${this.name} - stopping hog function manager and hog watcher`)
- await Promise.all([this.hogFunctionManager.stop(), this.hogWatcher.stop()])
+ await Promise.all([this.hogFunctionManager.stop()])
status.info('👍', `${this.name} - stopped!`)
}
public isHealthy() {
- // TODO: Maybe extend this to check if we are shutting down so we don't get killed early.
return this.batchConsumer?.isHealthy()
}
}
@@ -598,9 +596,11 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
)
.flat()
+ const states = await this.hogWatcher.getStates(invocationGlobals.map((x) => x.hogFunctionIds).flat())
+
const results = (
await this.runManyWithHeartbeat(invocations, (item) => {
- const state = this.hogWatcher.getFunctionState(item.hogFunctionId)
+ const state = states[item.hogFunctionId].state
if (state >= HogWatcherState.disabledForPeriod) {
this.produceAppMetric({
team_id: item.globals.project.id,
@@ -618,7 +618,7 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
})
).filter((x) => !!x) as HogFunctionInvocationResult[]
- this.hogWatcher.currentObservations.observeResults(results)
+ await this.hogWatcher.observeResults(results)
return results
},
})
diff --git a/plugin-server/src/cdp/hog-watcher.ts b/plugin-server/src/cdp/hog-watcher.ts
new file mode 100644
index 0000000000000..3c933b69f5347
--- /dev/null
+++ b/plugin-server/src/cdp/hog-watcher.ts
@@ -0,0 +1,321 @@
+import { captureException } from '@sentry/node'
+import { Pipeline, Redis } from 'ioredis'
+
+import { Hub } from '../types'
+import { timeoutGuard } from '../utils/db/utils'
+import { now } from '../utils/now'
+import { status } from '../utils/status'
+import { UUIDT } from '../utils/utils'
+import { HogFunctionInvocationResult, HogFunctionType } from './types'
+
+export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-watcher' : '@posthog/hog-watcher'
+const REDIS_KEY_TOKENS = `${BASE_REDIS_KEY}/tokens`
+const REDIS_KEY_DISABLED = `${BASE_REDIS_KEY}/disabled`
+const REDIS_KEY_DISABLED_HISTORY = `${BASE_REDIS_KEY}/disabled_history`
+const REDIS_TIMEOUT_SECONDS = 5
+
+// NOTE: Ideally this would live in a separate .lua file, but the current build step doesn't handle anything other than .ts files
+const LUA_TOKEN_BUCKET = `
+local key = KEYS[1]
+local now = ARGV[1]
+local cost = ARGV[2]
+local poolMax = ARGV[3]
+local fillRate = ARGV[4]
+local expiry = ARGV[5]
+local before = redis.call('hget', key, 'ts')
+
+-- If we don't have a timestamp then we set it to now and fill up the bucket
+if before == false then
+ local ret = poolMax - cost
+ redis.call('hset', key, 'ts', now)
+ redis.call('hset', key, 'pool', ret)
+ redis.call('expire', key, expiry)
+ return ret
+end
+
+-- We update the timestamp if it has changed
+local timeDiffSeconds = now - before
+
+if timeDiffSeconds > 0 then
+ redis.call('hset', key, 'ts', now)
+else
+ timeDiffSeconds = 0
+end
+
+-- Calculate how much should be refilled in the bucket and add it
+local owedTokens = timeDiffSeconds * fillRate
+local currentTokens = redis.call('hget', key, 'pool')
+
+if currentTokens == false then
+ currentTokens = poolMax
+end
+
+currentTokens = math.min(currentTokens + owedTokens, poolMax)
+
+-- Remove the cost and return the new number of tokens
+if currentTokens - cost >= 0 then
+ currentTokens = currentTokens - cost
+else
+ currentTokens = -1
+end
+
+redis.call('hset', key, 'pool', currentTokens)
+redis.call('expire', key, expiry)
+
+-- Finally return the value - if it's negative then we've hit the limit
+return currentTokens
+`
+
+export enum HogWatcherState {
+ healthy = 1,
+ degraded = 2,
+ disabledForPeriod = 3,
+ disabledIndefinitely = 4,
+}
+
+export type HogWatcherFunctionState = {
+ state: HogWatcherState
+ tokens: number
+ rating: number
+}
+
+type WithCheckRateLimit<T> = {
+ checkRateLimit: (key: string, now: number, cost: number, poolMax: number, fillRate: number, expiry: number) => T
+}
+
+type HogWatcherRedisClientPipeline = Pipeline & WithCheckRateLimit<number>
+
+type HogWatcherRedisClient = Omit<Redis, 'pipeline'> &
+    WithCheckRateLimit<Promise<number>> & {
+ pipeline: () => HogWatcherRedisClientPipeline
+ }
+
+export class HogWatcher {
+ constructor(private hub: Hub) {}
+
+ private rateLimitArgs(id: HogFunctionType['id'], cost: number) {
+ const nowSeconds = Math.round(now() / 1000)
+ return [
+ `${REDIS_KEY_TOKENS}/${id}`,
+ nowSeconds,
+ cost,
+ this.hub.CDP_WATCHER_BUCKET_SIZE,
+ this.hub.CDP_WATCHER_REFILL_RATE,
+ this.hub.CDP_WATCHER_TTL,
+ ] as const
+ }
+
+    private async runRedis<T>(fn: (client: HogWatcherRedisClient) => Promise<T>): Promise<T | null> {
+        // We want all of this to fail open in the event of Redis being unavailable - we'd rather have the function continue
+ const client = await this.hub.redisPool.acquire()
+
+ client.defineCommand('checkRateLimit', {
+ numberOfKeys: 1,
+ lua: LUA_TOKEN_BUCKET,
+ })
+
+ const timeout = timeoutGuard(
+ `Redis call delayed. Waiting over ${REDIS_TIMEOUT_SECONDS} seconds.`,
+ undefined,
+ REDIS_TIMEOUT_SECONDS * 1000
+ )
+ try {
+ return await fn(client as HogWatcherRedisClient)
+ } catch (e) {
+ status.error('HogWatcher Redis error', e)
+ captureException(e)
+ return null
+ } finally {
+ clearTimeout(timeout)
+ await this.hub.redisPool.release(client)
+ }
+ }
+
+ public tokensToFunctionState(tokens?: number | null, stateOverride?: HogWatcherState): HogWatcherFunctionState {
+ tokens = tokens ?? this.hub.CDP_WATCHER_BUCKET_SIZE
+ const rating = tokens / this.hub.CDP_WATCHER_BUCKET_SIZE
+
+ const state =
+ stateOverride ??
+ (rating >= this.hub.CDP_WATCHER_THRESHOLD_DEGRADED
+ ? HogWatcherState.healthy
+ : rating > 0
+ ? HogWatcherState.degraded
+ : HogWatcherState.disabledForPeriod)
+
+ return { state, tokens, rating }
+ }
+
+ public async getStates(
+ ids: HogFunctionType['id'][]
+    ): Promise<Record<HogFunctionType['id'], HogWatcherFunctionState>> {
+ const idsSet = new Set(ids)
+
+ const res = await this.runRedis(async (client) => {
+ const pipeline = client.pipeline()
+
+ for (const id of idsSet) {
+ pipeline.checkRateLimit(...this.rateLimitArgs(id, 0))
+ pipeline.get(`${REDIS_KEY_DISABLED}/${id}`)
+ pipeline.ttl(`${REDIS_KEY_DISABLED}/${id}`)
+ }
+
+ return pipeline.exec()
+ })
+
+ return Array.from(idsSet).reduce((acc, id, index) => {
+ const resIndex = index * 3
+ const tokens = res ? res[resIndex][1] : undefined
+ const disabled = res ? res[resIndex + 1][1] : false
+ const disabledTemporarily = disabled && res ? res[resIndex + 2][1] !== -1 : false
+
+ return {
+ ...acc,
+ [id]: this.tokensToFunctionState(
+ tokens,
+ disabled
+ ? disabledTemporarily
+ ? HogWatcherState.disabledForPeriod
+ : HogWatcherState.disabledIndefinitely
+ : undefined
+ ),
+ }
+        }, {} as Record<HogFunctionType['id'], HogWatcherFunctionState>)
+ }
+
+    public async getState(id: HogFunctionType['id']): Promise<HogWatcherFunctionState> {
+ const res = await this.getStates([id])
+ return res[id]
+ }
+
+    public async forceStateChange(id: HogFunctionType['id'], state: HogWatcherState): Promise<void> {
+ await this.runRedis(async (client) => {
+ const pipeline = client.pipeline()
+
+ const newScore =
+ state === HogWatcherState.healthy
+ ? this.hub.CDP_WATCHER_BUCKET_SIZE
+ : state === HogWatcherState.degraded
+ ? this.hub.CDP_WATCHER_BUCKET_SIZE * this.hub.CDP_WATCHER_THRESHOLD_DEGRADED
+ : 0
+
+ const nowSeconds = Math.round(now() / 1000)
+
+ pipeline.hset(`${REDIS_KEY_TOKENS}/${id}`, 'pool', newScore)
+ pipeline.hset(`${REDIS_KEY_TOKENS}/${id}`, 'ts', nowSeconds)
+
+ if (state === HogWatcherState.disabledForPeriod) {
+ pipeline.set(`${REDIS_KEY_DISABLED}/${id}`, '1', 'EX', this.hub.CDP_WATCHER_DISABLED_TEMPORARY_TTL)
+ } else if (state === HogWatcherState.disabledIndefinitely) {
+ pipeline.set(`${REDIS_KEY_DISABLED}/${id}`, '1')
+ } else {
+ pipeline.del(`${REDIS_KEY_DISABLED}/${id}`)
+ }
+
+ await pipeline.exec()
+ })
+ }
+
+    public async observeResults(results: HogFunctionInvocationResult[]): Promise<void> {
+        const costs: Record<HogFunctionType['id'], number> = {}
+
+ results.forEach((result) => {
+ let cost = (costs[result.invocation.hogFunctionId] = costs[result.invocation.hogFunctionId] || 0)
+
+ if (result.finished) {
+                // If the invocation finished, we can calculate a timing cost based on its total duration
+
+ const totalDurationMs = result.invocation.timings.reduce((acc, timing) => acc + timing.duration_ms, 0)
+ const lowerBound = this.hub.CDP_WATCHER_COST_TIMING_LOWER_MS
+ const upperBound = this.hub.CDP_WATCHER_COST_TIMING_UPPER_MS
+ const costTiming = this.hub.CDP_WATCHER_COST_TIMING
+ const ratio = Math.max(totalDurationMs - lowerBound, 0) / (upperBound - lowerBound)
+
+ cost += Math.round(costTiming * ratio)
+ }
+
+ if (result.error) {
+ cost += this.hub.CDP_WATCHER_COST_ERROR
+ }
+
+ costs[result.invocation.hogFunctionId] = cost
+ })
+
+ const res = await this.runRedis(async (client) => {
+ const pipeline = client.pipeline()
+
+ Object.entries(costs).forEach(([id, change]) => {
+ pipeline.checkRateLimit(...this.rateLimitArgs(id, change))
+ })
+
+ return await pipeline.exec()
+ })
+
+        // TRICKY: the above part is straightforward - below is more complex, as we make multiple calls to ensure
+        // that we disable the function temporarily and, eventually, permanently. As this only runs when a function
+        // transitions to a disabled state, it is not a performance concern.
+
+ const disabledFunctionIds = Object.entries(costs)
+ .filter((_, index) => (res ? res[index][1] <= 0 : false))
+ .map(([id]) => id)
+
+ if (disabledFunctionIds.length) {
+ // Mark them all as disabled in redis
+
+ const results = await this.runRedis(async (client) => {
+ const pipeline = client.pipeline()
+
+ disabledFunctionIds.forEach((id) => {
+ pipeline.set(
+ `${REDIS_KEY_DISABLED}/${id}`,
+ '1',
+ 'EX',
+ this.hub.CDP_WATCHER_DISABLED_TEMPORARY_TTL,
+ 'NX'
+ )
+ })
+
+ return pipeline.exec()
+ })
+
+ const functionsDisabled = disabledFunctionIds.filter((_, index) => (results ? results[index][1] : false))
+
+ if (!functionsDisabled.length) {
+ return
+ }
+
+ // We store the history as a zset - we can then use it to determine if we should disable indefinitely
+ const historyResults = await this.runRedis(async (client) => {
+ const pipeline = client.pipeline()
+ functionsDisabled.forEach((id) => {
+ const key = `${REDIS_KEY_DISABLED_HISTORY}/${id}`
+ pipeline.zadd(key, now(), new UUIDT().toString())
+ pipeline.zrange(key, 0, -1)
+ pipeline.expire(key, this.hub.CDP_WATCHER_TTL)
+ })
+
+ return await pipeline.exec()
+ })
+
+ const functionsToDisablePermanently = functionsDisabled.filter((_, index) => {
+ const history = historyResults ? historyResults[index * 3 + 1][1] : []
+ return history.length >= this.hub.CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT
+ })
+
+ if (!functionsToDisablePermanently.length) {
+ return
+ }
+
+ await this.runRedis(async (client) => {
+ const pipeline = client.pipeline()
+ functionsToDisablePermanently.forEach((id) => {
+ const key = `${REDIS_KEY_DISABLED}/${id}`
+ pipeline.set(key, '1')
+ pipeline.del(`${REDIS_KEY_DISABLED_HISTORY}/${id}`)
+ })
+
+ return await pipeline.exec()
+ })
+ }
+ }
+}
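> Reviewer note: to make the new mechanics easier to check, here is a plain-TypeScript sketch of the same arithmetic that `LUA_TOKEN_BUCKET` and `tokensToFunctionState` perform, using the default config values introduced in this PR. The names below are illustrative only and nothing here is part of the change itself:

```ts
// Illustrative only - mirrors the token bucket refill and the token -> state mapping above.
const BUCKET_SIZE = 10000 // CDP_WATCHER_BUCKET_SIZE
const FILL_RATE = 10 // CDP_WATCHER_REFILL_RATE (tokens per second)
const THRESHOLD_DEGRADED = 0.8 // CDP_WATCHER_THRESHOLD_DEGRADED

function refillAndSpend(pool: number, elapsedSeconds: number, cost: number): number {
    // Refill proportionally to elapsed time, capped at the bucket size
    const refilled = Math.min(pool + elapsedSeconds * FILL_RATE, BUCKET_SIZE)
    // The Lua script returns -1 (rather than going negative) once the cost can no longer be paid
    return refilled - cost >= 0 ? refilled - cost : -1
}

function stateFor(tokens: number): 'healthy' | 'degraded' | 'disabledForPeriod' {
    const rating = tokens / BUCKET_SIZE
    if (rating >= THRESHOLD_DEGRADED) {
        return 'healthy'
    }
    return rating > 0 ? 'degraded' : 'disabledForPeriod'
}

// Example: an empty bucket needs BUCKET_SIZE / FILL_RATE = 1000 seconds (~17 minutes) to refill fully.
```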
diff --git a/plugin-server/src/cdp/hog-watcher/README.md b/plugin-server/src/cdp/hog-watcher/README.md
deleted file mode 100644
index 214cd41c9ec16..0000000000000
--- a/plugin-server/src/cdp/hog-watcher/README.md
+++ /dev/null
@@ -1,58 +0,0 @@
-# How this whole thing works
-
-The HogWatcher is a class that is responsible for monitoring the health of the hog functions.
-Generally we want to make "observations" about the health of a function and then based on those observations we can determine the "state" of the function.
-Observations are made per-consumer and then aggregated by the leader to determine the state of the function.
-
-Each Watcher only needs to worry about the current state of any functions it is processing. The observations are only really interesting to the leader, as it
-is the one that will be making the decisions about the state of the function.
-
-# Rating system
-
-We want to detect when a function has gone rogue and gradually stop it from running.
-We calculate its "rating" based on how many times it has succeeded and failed.
-
-- If the rating falls too low, over a period of time we want to move it to the overflow queue as a first step to ensure it doesn't hog resources.
-- If it stays too low, we eventually want to disable it for a period of time.
-- If it _still_ behaves poorly after this time period, we want to disable it indefinitely.
-
-This can be represented as a state for the function - 1. Healthy, 2. Overflowed, 3. Disabled for a period, 4. Disabled indefinitely.
-
-To be able to do this right we need to store an array of values for the functions rating over time that represent the last say 10 minutes.
-
-In addition we need to record the last N states of the function so that we can decide to disable it indefinitely if it has spent too much time in state 3
-
-- State 1:
- - If the rating average over the time period is below 0.5, move to state 2.
-- State 2:
- - If the rating average over the time period is above 0.5, move to state 1.
- - If the rating average over the time period is below 0.5 AND the function was in state 3 for more than N of the last states, move to state 4.
- - If the rating average over the time period is below 0.5, move to state 3.
-- State 3:
- - The function is disabled for a period of time (perhaps the same as the measuring period).
- - Once it is out of this masked period, move to state 2.
-- State 4:
- - The function is disabled and requires manual intervention
-
-# Leader specific work
-
-To simplify an already relatively complex concept, there is one leader who is responsible for making sure the persisted state is efficient and up-to-date.
-It is also responsible for calculating state changes, persisting them to redis and emitting to other consumers.
-
-The state is kept in one redis @hash with keys like this:
-
-```js
-{
- "states": `[["a", 1], ["b", 2]]`,
- "FUNCTION_ID:states": `[{ t: 1, s: 0.5 }]`,
- "FUNCTION_ID:ratings": `[{ t: 1, r: 0.9 }]`,
- "FUNCTION_ID:observation:observerID:periodTimestamp": `[{ s: 1, f: 2, as: 0, af: 1 }]`
-}
-```
-
-Whenever an observation is made, it is persisted to a temporary key `FUNCTION_ID:observation:observerID:timestamp` and emitted. This allows the leader to load it from redis on startup as well as react to the emitted value, whichever it does first.
-Periodically it merges all observations together and when a finally rating is calculated it persists just the rating (to save on space).
-
-At the same time as compacting the ratings, it checks for any state changes and updates the relevant state key.
-
-This is designed to keep the workers lightweight, only having to worry about their own observations and keeping a list of states in memory. Only the leader has to keep the whole object in memory.
diff --git a/plugin-server/src/cdp/hog-watcher/hog-watcher.ts b/plugin-server/src/cdp/hog-watcher/hog-watcher.ts
deleted file mode 100644
index 3b675a44d48a1..0000000000000
--- a/plugin-server/src/cdp/hog-watcher/hog-watcher.ts
+++ /dev/null
@@ -1,582 +0,0 @@
-import { randomUUID } from 'node:crypto'
-import { Counter } from 'prom-client'
-
-import { CdpConfig, Hub } from '../../types'
-import { PubSub } from '../../utils/pubsub'
-import { status } from '../../utils/status'
-import { HogFunctionInvocationAsyncResponse, HogFunctionInvocationResult, HogFunctionType } from '../types'
-import {
- EmittedHogWatcherObservations,
- EmittedHogWatcherStates,
- HogWatcherGlobalState,
- HogWatcherObservationPeriod,
- HogWatcherObservationPeriodWithInstanceId,
- HogWatcherRatingPeriod,
- HogWatcherState,
- HogWatcherStatePeriod,
- HogWatcherSummary,
-} from './types'
-import {
- BASE_REDIS_KEY,
- calculateRating,
- deriveCurrentStateFromRatings,
- last,
- periodTimestamp,
- runRedis,
- stripFalsey,
-} from './utils'
-
-const REDIS_KEY_STATE = `${BASE_REDIS_KEY}/state`
-
-const hogStateChangeCounter = new Counter({
- name: 'cdp_hog_watcher_state_change',
- help: 'An function was moved to a different state',
- labelNames: ['state'],
-})
-
-export class HogWatcherActiveObservations {
- observations: Record = {}
-
- constructor(private config: CdpConfig) {}
-
- private addObservations(
- id: HogFunctionType['id'],
- incrs: Pick<
- Partial,
- 'successes' | 'failures' | 'asyncFunctionFailures' | 'asyncFunctionSuccesses'
- >
- ): void {
- if (!this.observations[id]) {
- this.observations[id] = {
- timestamp: periodTimestamp(this.config),
- successes: 0,
- failures: 0,
- asyncFunctionFailures: 0,
- asyncFunctionSuccesses: 0,
- }
- }
-
- this.observations[id].successes += incrs.successes ?? 0
- this.observations[id].failures += incrs.failures ?? 0
- this.observations[id].asyncFunctionFailures += incrs.asyncFunctionFailures ?? 0
- this.observations[id].asyncFunctionSuccesses += incrs.asyncFunctionSuccesses ?? 0
- }
-
- observeResults(results: HogFunctionInvocationResult[]) {
- results.forEach((result) =>
- this.addObservations(result.invocation.hogFunctionId, {
- successes: result.finished ? 1 : 0,
- failures: result.error ? 1 : 0,
- })
- )
- }
-
- observeAsyncFunctionResponses(responses: HogFunctionInvocationAsyncResponse[]) {
- // NOTE: This probably wants to be done using the response status instead :thinking:
- responses.forEach((response) =>
- this.addObservations(response.hogFunctionId, {
- asyncFunctionSuccesses: response.asyncFunctionResponse.error ? 0 : 1,
- asyncFunctionFailures: response.asyncFunctionResponse.error ? 1 : 0,
- })
- )
- }
-}
-
-export class HogWatcher {
- public readonly currentObservations: HogWatcherActiveObservations
- public states: Record = {}
- private queuedManualStates: Record = {}
-
- // Only set if we are the leader
- public globalState?: HogWatcherGlobalState
- // Only the leader should be able to write to the states
- public isLeader: boolean = false
- private pubSub: PubSub
- private instanceId: string
- private syncTimer?: NodeJS.Timeout
-
- constructor(private hub: Hub) {
- this.currentObservations = new HogWatcherActiveObservations(hub)
-
- this.instanceId = randomUUID()
- this.pubSub = new PubSub(hub, {
- 'hog-watcher-states': (message) => {
- const { states }: EmittedHogWatcherStates = JSON.parse(message)
-
- this.states = {
- ...this.states,
- ...states,
- }
- },
-
- 'hog-watcher-observations': (message) => {
- // We only care about observations from other instances if we have a global state already loaded
- if (!this.globalState) {
- return
- }
-
- const { instanceId, observations }: EmittedHogWatcherObservations = JSON.parse(message)
-
- observations.forEach(({ id, observation }) => {
- const items = (this.globalState!.observations[id] = this.globalState!.observations[id] ?? [])
- items.push({
- ...observation,
- instanceId: instanceId,
- })
- })
- },
-
- 'hog-watcher-user-state-change': (message) => {
- if (!this.isLeader) {
- return
- }
-
- const { states }: EmittedHogWatcherStates = JSON.parse(message)
-
- Object.entries(states).forEach(([id, state]) => {
- this.queuedManualStates[id] = state
- })
-
- void this.syncLoop()
- },
- })
- }
-
- async start() {
- await this.pubSub.start()
-
- // Get the initial state of the watcher
- await this.syncStates()
-
- if (process.env.NODE_ENV === 'test') {
- // Not setting up loop in test mode
- return
- }
-
- await this.syncLoop()
- }
-
- async stop() {
- await this.pubSub.stop()
-
- if (this.syncTimer) {
- clearTimeout(this.syncTimer)
- }
- if (!this.isLeader) {
- return
- }
-
- await runRedis(this.hub.redisPool, 'stop', async (client) => {
- return client.del(`${BASE_REDIS_KEY}/leader`)
- })
-
- await this.flushActiveObservations()
- }
-
- public getFunctionState(id: HogFunctionType['id']): HogWatcherState {
- return this.states[id] ?? HogWatcherState.healthy
- }
-
- private async checkIsLeader() {
- const leaderId = await runRedis(this.hub.redisPool, 'getLeader', async (client) => {
- // Set the leader to this instance if it is not set and add an expiry to it of twice our observation period
- const pipeline = client.pipeline()
-
- // TODO: This can definitely be done in a single command - just need to make sure the ttl is always extended if the ID is the same
-
- pipeline.set(
- `${BASE_REDIS_KEY}/leader`,
- this.instanceId,
- 'NX',
- // @ts-expect-error - IORedis types don't allow for NX and EX in the same command
- 'EX',
- (this.hub.CDP_WATCHER_OBSERVATION_PERIOD * 3) / 1000
- )
- pipeline.get(`${BASE_REDIS_KEY}/leader`)
- const [_, res] = await pipeline.exec()
-
- // NOTE: IORedis types don't allow for NX and GET in the same command so we have to cast it to any
- return res[1] as string
- })
-
- this.isLeader = leaderId === this.instanceId
-
- if (this.isLeader) {
- status.info('👀', '[HogWatcher] I am the leader')
- }
- }
-
- public syncLoop = async () => {
- clearTimeout(this.syncTimer)
- try {
- await this.sync()
- } finally {
- this.syncTimer = setTimeout(() => this.syncLoop(), this.hub.CDP_WATCHER_OBSERVATION_PERIOD)
- }
- }
-
- public async sync() {
- await this.checkIsLeader()
- await this.flushActiveObservations()
-
- if (this.isLeader) {
- await this.syncState()
- } else {
- // Clear any states that are only relevant to the leader
- this.globalState = undefined
- this.queuedManualStates = {}
- }
- }
-
- private async flushActiveObservations() {
- const changes: EmittedHogWatcherObservations = {
- instanceId: this.instanceId,
- observations: [],
- }
-
- const period = periodTimestamp(this.hub)
-
- Object.entries(this.currentObservations.observations).forEach(([id, observation]) => {
- if (observation.timestamp !== period) {
- changes.observations.push({ id, observation })
- delete this.currentObservations.observations[id]
- }
- })
-
- if (!changes.observations.length) {
- return
- }
-
- // Write all the info to redis
- await runRedis(this.hub.redisPool, 'syncWithRedis', async (client) => {
- const pipeline = client.pipeline()
-
- changes.observations.forEach(({ id, observation }) => {
- // We key the observations by observerId and timestamp with a ttl of the max period we want to keep the data for
- const subKey = `observation:${id}:${this.instanceId}:${observation.timestamp}`
- pipeline.hset(REDIS_KEY_STATE, subKey, Serializer.serializeObservation(observation))
- })
-
- return pipeline.exec()
- })
-
- // Now we can emit to the others so they can update their state
- await this.pubSub.publish('hog-watcher-observations', JSON.stringify(changes))
- }
-
- private async syncState() {
- // Flushing states involves a couple of things and is only done by the leader to avoid clashes
-
- // 1. Prune old states that are no longer relevant (we only keep the last N states)
- // 2. Calculate the state for each function based on their existing observations and previous states
- // 3. If the state has changed, write it to redis and emit it to the others
-
- if (!this.isLeader) {
- status.warn('👀', '[HogWatcher] Only the leader can flush states')
- return
- }
-
- const globalState = (this.globalState = this.globalState ?? (await this.fetchState()))
-
- const stateChanges: EmittedHogWatcherStates = {
- instanceId: this.instanceId,
- states: {},
- }
-
- // We want to gather all observations that are at least 1 period older than the current period
- // That gives enough time for each worker to have pushed out their observations
-
- const period = periodTimestamp(this.hub)
- const keysToRemove: string[] = []
- const changedHogFunctionRatings = new Set()
- const RATINGS_PERIOD_MASK = this.hub.CDP_WATCHER_OBSERVATION_PERIOD * 2
-
- // Group the observations by functionId and timestamp and generate their rating
- Object.entries(globalState.observations).forEach(([id, observations]) => {
- const groupedByTimestamp: Record = {}
- const [oldEnoughObservations, others] = observations.reduce(
- (acc, observation) => {
- if (observation.timestamp <= period - RATINGS_PERIOD_MASK) {
- // Add the key to be removed from redis later
- keysToRemove.push(`observation:${id}:${observation.instanceId}:${observation.timestamp}`)
- acc[0].push(observation)
- } else {
- acc[1].push(observation)
- }
- return acc
- },
- [[], []] as [HogWatcherObservationPeriodWithInstanceId[], HogWatcherObservationPeriodWithInstanceId[]]
- )
-
- // Keep only the observations that aren't ready to be persisted
- if (others.length) {
- globalState.observations[id] = others
- } else {
- delete globalState.observations[id]
- }
-
- // Group them all by timestamp to generate a new rating
- oldEnoughObservations.forEach((observation) => {
- const key = `${id}:${observation.timestamp}`
- groupedByTimestamp[key] = groupedByTimestamp[key] ?? {
- timestamp: observation.timestamp,
- successes: 0,
- failures: 0,
- asyncFunctionSuccesses: 0,
- asyncFunctionFailures: 0,
- }
- groupedByTimestamp[key].successes += observation.successes
- groupedByTimestamp[key].failures += observation.failures
- groupedByTimestamp[key].asyncFunctionSuccesses += observation.asyncFunctionSuccesses
- groupedByTimestamp[key].asyncFunctionFailures += observation.asyncFunctionFailures
- })
-
- Object.entries(groupedByTimestamp).forEach(([_, observation]) => {
- const rating = calculateRating(observation)
- globalState.ratings[id] = globalState.ratings[id] ?? []
- globalState.ratings[id].push({ timestamp: observation.timestamp, rating: rating })
- globalState.ratings[id] = globalState.ratings[id].slice(-this.hub.CDP_WATCHER_MAX_RECORDED_RATINGS)
-
- changedHogFunctionRatings.add(id)
- })
- })
-
- const transitionToState = (id: HogFunctionType['id'], newState: HogWatcherState) => {
- const state: HogWatcherStatePeriod = {
- timestamp: periodTimestamp(this.hub),
- state: newState,
- }
-
- globalState.states[id] = globalState.states[id] ?? []
- globalState.states[id].push(state)
- globalState.states[id] = globalState.states[id].slice(-this.hub.CDP_WATCHER_MAX_RECORDED_STATES)
- stateChanges.states[id] = newState
- hogStateChangeCounter.inc({ state: newState })
- }
-
- changedHogFunctionRatings.forEach((id) => {
- // Build the new ratings to be written
- // Check if the state has changed and if so add it to the list of changes
- const newRatings = globalState.ratings[id]
- const currentState = last(globalState.states[id])?.state ?? HogWatcherState.healthy
- const newState = deriveCurrentStateFromRatings(this.hub, newRatings, globalState.states[id] ?? [])
-
- if (currentState !== newState) {
- transitionToState(id, newState)
- // Extra logging to help debugging:
-
- status.info('👀', `[HogWatcher] Function ${id} changed state`, {
- oldState: currentState,
- newState: newState,
- ratings: newRatings,
- })
- }
- })
-
- // In addition we need to check temporarily disabled functions and move them back to overflow if they are behaving well
- Object.entries(globalState.states).forEach(([id, states]) => {
- const currentState = last(states)?.state
- if (currentState === HogWatcherState.disabledForPeriod) {
- // Also check the state change here
- const newState = deriveCurrentStateFromRatings(this.hub, globalState.ratings[id] ?? [], states)
-
- if (newState !== currentState) {
- transitionToState(id, newState)
- }
- }
- })
-
- // Finally we make sure any manual changes that came in are applied
- Object.entries(this.queuedManualStates).forEach(([id, state]) => {
- transitionToState(id, state)
- delete this.queuedManualStates[id]
- })
-
- if (!changedHogFunctionRatings.size && !Object.keys(stateChanges.states).length) {
- // Nothing to do
- return
- }
-
- if (Object.keys(stateChanges.states).length) {
- status.info('👀', '[HogWatcher] Functions changed state', {
- changes: stateChanges,
- })
- }
-
- // Finally write the state summary
- const states: Record = Object.fromEntries(
- Object.entries(globalState.states).map(([id, states]) => [id, last(states)!.state])
- )
-
- // Finally we write the changes to redis and emit them to the others
- await runRedis(this.hub.redisPool, 'syncWithRedis', async (client) => {
- const pipeline = client.pipeline()
-
- // Remove old observations
- keysToRemove.forEach((key) => {
- pipeline.hdel(REDIS_KEY_STATE, key)
- })
-
- // Write the new ratings
- changedHogFunctionRatings.forEach((id) => {
- const ratings = globalState.ratings[id] ?? []
- pipeline.hset(REDIS_KEY_STATE, `ratings:${id}`, Serializer.serializeRatings(ratings))
- })
-
- Object.keys(stateChanges.states).forEach((id) => {
- const states = globalState.states[id] ?? []
- pipeline.hset(REDIS_KEY_STATE, `states:${id}`, Serializer.serializeStates(states))
- })
-
- // Write the new states
- pipeline.hset(REDIS_KEY_STATE, 'states', Serializer.serializeAllStates(states))
-
- return pipeline.exec()
- })
-
- // // Now we can emit to the others so they can update their state
- await this.pubSub.publish('hog-watcher-states', JSON.stringify(stateChanges))
- }
-
- async syncStates(): Promise> {
- const res = await runRedis(this.hub.redisPool, 'fetchWatcher', async (client) => {
- return client.hget(REDIS_KEY_STATE, 'states')
- })
-
- this.states = res ? Serializer.deserializeAllStates(res) : {}
-
- return this.states
- }
-
- /**
- * Fetch the summary for HogFunction (used by the UI, hence no caching)
- */
- async fetchWatcher(id: HogFunctionType['id']): Promise {
- const [statesStr, ratingsStr] = await runRedis(this.hub.redisPool, 'fetchWatcher', async (client) => {
- return client.hmget(REDIS_KEY_STATE, `states:${id}`, `ratings:${id}`)
- })
-
- const states: HogWatcherStatePeriod[] = statesStr ? Serializer.deserializeStates(statesStr) : []
- const ratings: HogWatcherRatingPeriod[] = ratingsStr ? Serializer.deserializeRatings(ratingsStr) : []
-
- return {
- state: last(states)?.state ?? HogWatcherState.healthy,
- states: states,
- ratings: ratings,
- }
- }
-
- async forceStateChange(id: HogFunctionType['id'], state: HogWatcherState): Promise {
- // Ensure someone is the leader
- await this.checkIsLeader()
- const changes: EmittedHogWatcherStates = {
- instanceId: this.instanceId,
- states: {
- [id]: state,
- },
- }
-
- await this.pubSub.publish('hog-watcher-user-state-change', JSON.stringify(changes))
- }
-
- /**
- * Fetch the entire state object parsing into a usable object
- */
- async fetchState(): Promise {
- const redisState = await runRedis(this.hub.redisPool, 'fetchWatcher', async (client) => {
- return client.hgetall(REDIS_KEY_STATE)
- })
-
- return Serializer.deserializeGlobalState(redisState)
- }
-}
-
-class Serializer {
- // Serializer to help parsing back and forth to redis - mostly focused on reducing the size of the stored values
-
- static deserializeGlobalState(redisState: Record): HogWatcherGlobalState {
- const response: HogWatcherGlobalState = {
- states: {},
- ratings: {},
- observations: {},
- }
-
- Object.entries(redisState).forEach(([key, value]) => {
- const [kind, id, ...rest] = key.split(':')
- if (kind === 'states' && id) {
- response.states[id] = this.deserializeStates(value)
- } else if (kind === 'ratings') {
- response.ratings[id] = this.deserializeRatings(value)
- } else if (kind === 'observation') {
- const [instanceId, timestamp] = rest
- const partial = this.deserializeObservation(value)
- const observations: HogWatcherObservationPeriodWithInstanceId[] = (response.observations[id] =
- response.observations[id] ?? [])
-
- observations.push({
- ...partial,
- instanceId: instanceId,
- timestamp: parseInt(timestamp),
- })
- } else if (kind === 'states') {
- // We can ignore this as it is the global state
- } else {
- status.warn('👀', `Unknown key kind ${kind} in fetchState`)
- }
- })
-
- return response
- }
-
- static serializeAllStates(val: Record): string {
- const obj = Object.entries(val).map(([id, state]) => [id, state])
- return JSON.stringify(obj)
- }
-
- static deserializeAllStates(val: string): Record {
- const obj: (string | HogWatcherState)[][] = JSON.parse(val)
- return Object.fromEntries(obj)
- }
-
- static serializeStates(val: HogWatcherStatePeriod[]): string {
- const obj = val.map((x) => ({ t: x.timestamp, s: x.state }))
- return JSON.stringify(obj)
- }
-
- static deserializeStates(val: string): HogWatcherStatePeriod[] {
- const obj = JSON.parse(val)
- return obj.map((x: { t: number; s: HogWatcherState }) => ({ timestamp: x.t, state: x.s }))
- }
-
- static serializeRatings(val: HogWatcherRatingPeriod[]): string {
- const obj = val.map((x) => ({ t: x.timestamp, r: x.rating }))
- return JSON.stringify(obj)
- }
-
- static deserializeRatings(val: string): HogWatcherRatingPeriod[] {
- const obj = JSON.parse(val)
- return obj.map((x: { t: number; r: number }) => ({ timestamp: x.t, rating: x.r }))
- }
-
- static serializeObservation(val: HogWatcherObservationPeriod): string {
- const obj = stripFalsey({
- t: val.timestamp,
- s: val.successes,
- f: val.failures,
- af: val.asyncFunctionFailures,
- as: val.asyncFunctionSuccesses,
- })
- return JSON.stringify(obj)
- }
-
- static deserializeObservation(val: string): HogWatcherObservationPeriod {
- const obj = JSON.parse(val)
- return {
- timestamp: obj.t,
- successes: obj.s ?? 0,
- failures: obj.f ?? 0,
- asyncFunctionFailures: obj.af ?? 0,
- asyncFunctionSuccesses: obj.as ?? 0,
- }
- }
-}
diff --git a/plugin-server/src/cdp/hog-watcher/types.ts b/plugin-server/src/cdp/hog-watcher/types.ts
deleted file mode 100644
index e145f0d0dff2d..0000000000000
--- a/plugin-server/src/cdp/hog-watcher/types.ts
+++ /dev/null
@@ -1,61 +0,0 @@
-import { HogFunctionType } from '../types'
-
-export enum HogWatcherState {
- healthy = 1,
- overflowed = 2,
- disabledForPeriod = 3,
- disabledIndefinitely = 4,
-}
-
-export type HogWatcherStatePeriod = {
- timestamp: number
- state: HogWatcherState
-}
-
-export type HogWatcherRatingPeriod = {
- timestamp: number
- rating: number
-}
-
-export type HogWatcherObservationPeriod = {
- timestamp: number
- successes: number
- failures: number
- asyncFunctionFailures: number
- asyncFunctionSuccesses: number
-}
-
-export type HogWatcherObservationPeriodWithInstanceId = HogWatcherObservationPeriod & {
- instanceId: string
-}
-
-export type HogWatcherSummary = {
- state: HogWatcherState
- states: HogWatcherStatePeriod[]
- ratings: HogWatcherRatingPeriod[]
-}
-
-export type EmittedHogWatcherObservations = {
- instanceId: string
- observations: {
- id: HogFunctionType['id']
- observation: HogWatcherObservationPeriod
- }[]
-}
-
-export type EmittedHogWatcherStates = {
- instanceId: string
- states: {
- [key: HogFunctionType['id']]: HogWatcherState
- }
-}
-
-// Deserialized version of what is stored in redis
-export type HogWatcherGlobalState = {
- /** Summary of all state history for every function */
- states: Record
- /** Summary of all rating history for all functions */
- ratings: Record
- /** All in progress observations that have not been serialized into ratings */
- observations: Record
-}
diff --git a/plugin-server/src/cdp/hog-watcher/utils.ts b/plugin-server/src/cdp/hog-watcher/utils.ts
deleted file mode 100644
index f5c372c39e0ea..0000000000000
--- a/plugin-server/src/cdp/hog-watcher/utils.ts
+++ /dev/null
@@ -1,129 +0,0 @@
-import { Redis } from 'ioredis'
-
-import { CdpConfig, RedisPool } from '../../types'
-import { timeoutGuard } from '../../utils/db/utils'
-import { now } from '../../utils/now'
-import { HogWatcherObservationPeriod, HogWatcherRatingPeriod, HogWatcherState, HogWatcherStatePeriod } from './types'
-
-const REDIS_TIMEOUT_SECONDS = 5
-export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-watcher' : '@posthog/hog-watcher'
-
-export const calculateRating = (observation: HogWatcherObservationPeriod): number => {
- // Rating is from 0 to 1
- // 1 - Function is working perfectly
- // 0 - Function is not working at all
-
- // NOTE: Once we have proper async function support we should likely change the rating system to penalize slow requests more
- // Also the function timing out should be penalized heavily as it indicates bad code (infinite loops etc.)
-
- const totalInvocations = observation.successes + observation.failures
- const totalAsyncInvocations = observation.asyncFunctionSuccesses + observation.asyncFunctionFailures
- const successRate = totalInvocations ? observation.successes / totalInvocations : 1
- const asyncSuccessRate = totalAsyncInvocations ? observation.asyncFunctionSuccesses / totalAsyncInvocations : 1
-
- return Math.min(1, successRate, asyncSuccessRate)
-}
-
-export const periodTimestamp = (config: CdpConfig, timestamp?: number): number => {
- // Returns the timestamp but rounded to the nearest period (e.g. 1 minute)
- const period = config.CDP_WATCHER_OBSERVATION_PERIOD
- return Math.floor((timestamp ?? now()) / period) * period
-}
-
-/**
- * Calculate what the state should be based on the previous rating and states
- */
-export const deriveCurrentStateFromRatings = (
- config: CdpConfig,
- ratings: HogWatcherRatingPeriod[],
- states: HogWatcherStatePeriod[]
-): HogWatcherState => {
- const {
- CDP_WATCHER_OBSERVATION_PERIOD,
- CDP_WATCHER_MAX_RECORDED_RATINGS,
- CDP_WATCHER_DISABLED_PERIOD,
- CDP_WATCHER_MIN_OBSERVATIONS,
- CDP_WATCHER_OVERFLOW_RATING_THRESHOLD,
- CDP_WATCHER_DISABLED_RATING_THRESHOLD,
- CDP_WATCHER_MAX_ALLOWED_TEMPORARY_DISABLED,
- } = config
- const currentState = states[states.length - 1] ?? {
- // Set the timestamp back far enough that all ratings are included
- timestamp: now() - CDP_WATCHER_OBSERVATION_PERIOD * CDP_WATCHER_MAX_RECORDED_RATINGS,
- state: HogWatcherState.healthy,
- }
-
- if (currentState.state === HogWatcherState.disabledIndefinitely) {
- return HogWatcherState.disabledIndefinitely
- }
-
- // If we are disabled for a period then we only check if it should no longer be disabled
- if (currentState.state === HogWatcherState.disabledForPeriod) {
- if (now() - currentState.timestamp > CDP_WATCHER_DISABLED_PERIOD) {
- return HogWatcherState.overflowed
- }
- }
-
- const ratingsSinceLastState = ratings.filter((x) => x.timestamp >= currentState.timestamp)
-
- if (ratingsSinceLastState.length < CDP_WATCHER_MIN_OBSERVATIONS) {
- // We need to give the function a chance to run before we can evaluate it
- return currentState.state
- }
-
- const averageRating = ratingsSinceLastState.reduce((acc, x) => acc + x.rating, 0) / ratingsSinceLastState.length
-
- if (currentState.state === HogWatcherState.overflowed) {
- if (averageRating > CDP_WATCHER_OVERFLOW_RATING_THRESHOLD) {
- // The function is behaving well again - move it to healthy
- return HogWatcherState.healthy
- }
-
- if (averageRating < CDP_WATCHER_DISABLED_RATING_THRESHOLD) {
- // The function is behaving worse than overflow can accept - disable it
- const disabledStates = states.filter((x) => x.state === HogWatcherState.disabledForPeriod)
-
- if (disabledStates.length >= CDP_WATCHER_MAX_ALLOWED_TEMPORARY_DISABLED) {
- // this function has spent half of the time in temporary disabled so we disable it indefinitely
- return HogWatcherState.disabledIndefinitely
- }
-
- return HogWatcherState.disabledForPeriod
- }
- }
-
- if (currentState.state === HogWatcherState.healthy) {
- if (averageRating < CDP_WATCHER_OVERFLOW_RATING_THRESHOLD) {
- return HogWatcherState.overflowed
- }
- }
-
- return currentState.state
-}
-
-export async function runRedis(
- redisPool: RedisPool,
- description: string,
- fn: (client: Redis) => Promise
-): Promise {
- const client = await redisPool.acquire()
- const timeout = timeoutGuard(
- `${description} delayed. Waiting over ${REDIS_TIMEOUT_SECONDS} seconds.`,
- undefined,
- REDIS_TIMEOUT_SECONDS * 1000
- )
- try {
- return await fn(client)
- } finally {
- clearTimeout(timeout)
- await redisPool.release(client)
- }
-}
-
-export function last(array?: T[]): T | undefined {
- return array?.[array?.length - 1]
-}
-
-export function stripFalsey(obj: T): Partial {
- return Object.fromEntries(Object.entries(obj).filter(([, value]) => value)) as Partial
-}
diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts
index 7057d1818bd55..bb8ac06110198 100644
--- a/plugin-server/src/config/config.ts
+++ b/plugin-server/src/config/config.ts
@@ -176,14 +176,16 @@ export function getDefaultConfig(): PluginsServerConfig {
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: 0, // 0 disables stats collection
SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // 1MB
// CDP
- CDP_WATCHER_OBSERVATION_PERIOD: 10000,
- CDP_WATCHER_DISABLED_PERIOD: 1000 * 60 * 10,
- CDP_WATCHER_MAX_RECORDED_STATES: 10,
- CDP_WATCHER_MAX_RECORDED_RATINGS: 10,
- CDP_WATCHER_MAX_ALLOWED_TEMPORARY_DISABLED: 3,
- CDP_WATCHER_MIN_OBSERVATIONS: 3,
- CDP_WATCHER_OVERFLOW_RATING_THRESHOLD: 0.8,
- CDP_WATCHER_DISABLED_RATING_THRESHOLD: 0.5,
+ CDP_WATCHER_COST_ERROR: 100,
+ CDP_WATCHER_COST_TIMING: 20,
+ CDP_WATCHER_COST_TIMING_LOWER_MS: 100,
+ CDP_WATCHER_COST_TIMING_UPPER_MS: 5000,
+ CDP_WATCHER_THRESHOLD_DEGRADED: 0.8,
+ CDP_WATCHER_BUCKET_SIZE: 10000,
+        CDP_WATCHER_DISABLED_TEMPORARY_TTL: 60 * 10, // 10 minutes
+        CDP_WATCHER_TTL: 60 * 60 * 24, // Deliberately long - it only matters that the key is eventually deleted
+ CDP_WATCHER_REFILL_RATE: 10,
+ CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3,
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
}
}
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 92ec13670deed..2af0ef52b84b4 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -96,14 +96,16 @@ export const stringToPluginServerMode = Object.fromEntries(
) as Record<string, PluginServerMode>
export type CdpConfig = {
- CDP_WATCHER_OBSERVATION_PERIOD: number
- CDP_WATCHER_DISABLED_PERIOD: number
- CDP_WATCHER_MAX_RECORDED_STATES: number
- CDP_WATCHER_MAX_RECORDED_RATINGS: number
- CDP_WATCHER_MAX_ALLOWED_TEMPORARY_DISABLED: number
- CDP_WATCHER_MIN_OBSERVATIONS: number
- CDP_WATCHER_OVERFLOW_RATING_THRESHOLD: number
- CDP_WATCHER_DISABLED_RATING_THRESHOLD: number
+    CDP_WATCHER_COST_ERROR: number // The cost added for each erroring invocation
+    CDP_WATCHER_COST_TIMING: number // The cost added for an invocation whose duration reaches the upper timing bound
+    CDP_WATCHER_COST_TIMING_LOWER_MS: number // The lower bound in ms below which no timing cost is incurred
+    CDP_WATCHER_COST_TIMING_UPPER_MS: number // The upper bound in ms at which the full timing cost is incurred
+    CDP_WATCHER_THRESHOLD_DEGRADED: number // Fraction of the bucket below which the function is considered degraded
+ CDP_WATCHER_BUCKET_SIZE: number // The total bucket size
+ CDP_WATCHER_TTL: number // The expiry for the rate limit key
+ CDP_WATCHER_REFILL_RATE: number // The number of tokens to be refilled per second
+ CDP_WATCHER_DISABLED_TEMPORARY_TTL: number // How long a function should be temporarily disabled for
+ CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string
}
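> Reviewer note: a worked example of how these defaults combine in `HogWatcher.observeResults` - the timing cost scales linearly between the lower and upper bounds and each error adds a flat cost. This is an illustrative sketch using the default values, not code from this PR:

```ts
// Hypothetical numbers using the defaults above.
const COST_TIMING = 20
const LOWER_MS = 100
const UPPER_MS = 5000
const COST_ERROR = 100
const BUCKET_SIZE = 10000

const timingCost = (durationMs: number): number =>
    Math.round(COST_TIMING * (Math.max(durationMs - LOWER_MS, 0) / (UPPER_MS - LOWER_MS)))

console.log(timingCost(100)) // 0  - at or below the lower bound there is no cost
console.log(timingCost(5000)) // 20 - the full timing cost at the upper bound
console.log(timingCost(10000)) // 40 - the ratio is not clamped, so very slow runs cost proportionally more

// An always-erroring function drains the default bucket after BUCKET_SIZE / COST_ERROR = 100 invocations.
```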
diff --git a/plugin-server/tests/cdp/hog-watcher.test.ts b/plugin-server/tests/cdp/hog-watcher.test.ts
new file mode 100644
index 0000000000000..fddbb0fc7e677
--- /dev/null
+++ b/plugin-server/tests/cdp/hog-watcher.test.ts
@@ -0,0 +1,268 @@
+jest.mock('../../src/utils/now', () => {
+ return {
+ now: jest.fn(() => Date.now()),
+ }
+})
+import { BASE_REDIS_KEY, HogWatcher, HogWatcherState } from '../../src/cdp/hog-watcher'
+import { HogFunctionInvocationResult } from '../../src/cdp/types'
+import { Hub } from '../../src/types'
+import { createHub } from '../../src/utils/db/hub'
+import { delay } from '../../src/utils/utils'
+import { deleteKeysWithPrefix } from '../helpers/redis'
+
+const mockNow: jest.Mock = require('../../src/utils/now').now as any
+
+const createResult = (options: {
+ id: string
+ duration?: number
+ finished?: boolean
+ error?: string
+}): HogFunctionInvocationResult => {
+ return {
+ invocation: {
+ id: 'invocation-id',
+ teamId: 2,
+ hogFunctionId: options.id,
+ globals: {} as any,
+ timings: [
+ {
+ kind: 'async_function',
+ duration_ms: options.duration ?? 0,
+ },
+ ],
+ },
+ finished: options.finished ?? true,
+ error: options.error,
+ logs: [],
+ }
+}
+
+describe('HogWatcher', () => {
+ describe('integration', () => {
+ let now: number
+ let hub: Hub
+        let closeHub: () => Promise<void>
+ let watcher: HogWatcher
+
+ beforeEach(async () => {
+ ;[hub, closeHub] = await createHub()
+
+ now = 1720000000000
+ mockNow.mockReturnValue(now)
+
+ await deleteKeysWithPrefix(hub.redisPool, BASE_REDIS_KEY)
+
+ watcher = new HogWatcher(hub)
+ })
+
+ const advanceTime = (ms: number) => {
+ now += ms
+ mockNow.mockReturnValue(now)
+ }
+
+ afterEach(async () => {
+ jest.useRealTimers()
+ await closeHub()
+ jest.clearAllMocks()
+ })
+
+ it('should retrieve empty state', async () => {
+ const res = await watcher.getStates(['id1', 'id2'])
+ expect(res).toMatchInlineSnapshot(`
+ Object {
+ "id1": Object {
+ "rating": 1,
+ "state": 1,
+ "tokens": 10000,
+ },
+ "id2": Object {
+ "rating": 1,
+ "state": 1,
+ "tokens": 10000,
+ },
+ }
+ `)
+ })
+
+ const cases: [{ cost: number; state: number }, HogFunctionInvocationResult[]][] = [
+ [{ cost: 0, state: 1 }, [createResult({ id: 'id1' })]],
+ [
+ { cost: 0, state: 1 },
+ [createResult({ id: 'id1' }), createResult({ id: 'id1' }), createResult({ id: 'id1' })],
+ ],
+ [
+ { cost: 0, state: 1 },
+ [
+ createResult({ id: 'id1', duration: 10 }),
+ createResult({ id: 'id1', duration: 20 }),
+ createResult({ id: 'id1', duration: 100 }),
+ ],
+ ],
+ [
+ { cost: 12, state: 1 },
+ [
+ createResult({ id: 'id1', duration: 1000 }),
+ createResult({ id: 'id1', duration: 1000 }),
+ createResult({ id: 'id1', duration: 1000 }),
+ ],
+ ],
+ [{ cost: 20, state: 1 }, [createResult({ id: 'id1', duration: 5000 })]],
+ [{ cost: 40, state: 1 }, [createResult({ id: 'id1', duration: 10000 })]],
+ [
+ { cost: 141, state: 1 },
+ [
+ createResult({ id: 'id1', duration: 5000 }),
+ createResult({ id: 'id1', duration: 10000 }),
+ createResult({ id: 'id1', duration: 20000 }),
+ ],
+ ],
+
+ [{ cost: 100, state: 1 }, [createResult({ id: 'id1', error: 'errored!' })]],
+ ]
+
+ it.each(cases)('should update tokens based on results %s %s', async (expectedScore, results) => {
+ await watcher.observeResults(results)
+ const result = await watcher.getState('id1')
+
+ expect(hub.CDP_WATCHER_BUCKET_SIZE - result.tokens).toEqual(expectedScore.cost)
+ expect(result.state).toEqual(expectedScore.state)
+ })
+
+ it('should max out scores', async () => {
+ let lotsOfResults = Array(10000).fill(createResult({ id: 'id1', error: 'error!' }))
+
+ await watcher.observeResults(lotsOfResults)
+
+ expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
+ Object {
+ "rating": -0.0001,
+ "state": 3,
+ "tokens": -1,
+ }
+ `)
+
+ lotsOfResults = Array(10000).fill(createResult({ id: 'id2' }))
+
+ await watcher.observeResults(lotsOfResults)
+
+ expect(await watcher.getState('id2')).toMatchInlineSnapshot(`
+ Object {
+ "rating": 1,
+ "state": 1,
+ "tokens": 10000,
+ }
+ `)
+ })
+
+ it('should refill over time', async () => {
+ hub.CDP_WATCHER_REFILL_RATE = 10
+ await watcher.observeResults([
+ createResult({ id: 'id1', duration: 10000 }),
+ createResult({ id: 'id1', duration: 10000 }),
+ createResult({ id: 'id1', duration: 10000 }),
+ ])
+
+ expect((await watcher.getState('id1')).tokens).toMatchInlineSnapshot(`9880`)
+ advanceTime(1000)
+ expect((await watcher.getState('id1')).tokens).toMatchInlineSnapshot(`9890`)
+ advanceTime(10000)
+ expect((await watcher.getState('id1')).tokens).toMatchInlineSnapshot(`9990`)
+ })
+
+ it('should remain disabled for period', async () => {
+ const badResults = Array(100).fill(createResult({ id: 'id1', error: 'error!' }))
+
+ await watcher.observeResults(badResults)
+
+ expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
+ Object {
+ "rating": 0,
+ "state": 3,
+ "tokens": 0,
+ }
+ `)
+
+ advanceTime(10000)
+
+ // Should still be disabled even though tokens have been refilled
+ expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
+ Object {
+ "rating": 0.01,
+ "state": 3,
+ "tokens": 100,
+ }
+ `)
+ })
+
+ describe('forceStateChange', () => {
+ it('should force healthy', async () => {
+ await watcher.forceStateChange('id1', HogWatcherState.healthy)
+ expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
+ Object {
+ "rating": 1,
+ "state": 1,
+ "tokens": 10000,
+ }
+ `)
+ })
+ it('should force degraded', async () => {
+ await watcher.forceStateChange('id1', HogWatcherState.degraded)
+ expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
+ Object {
+ "rating": 0.8,
+ "state": 1,
+ "tokens": 8000,
+ }
+ `)
+ })
+ it('should force disabledForPeriod', async () => {
+ await watcher.forceStateChange('id1', HogWatcherState.disabledForPeriod)
+ expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
+ Object {
+ "rating": 0,
+ "state": 3,
+ "tokens": 0,
+ }
+ `)
+ })
+ it('should force disabledIndefinitely', async () => {
+ await watcher.forceStateChange('id1', HogWatcherState.disabledIndefinitely)
+ expect(await watcher.getState('id1')).toMatchInlineSnapshot(`
+ Object {
+ "rating": 0,
+ "state": 4,
+ "tokens": 0,
+ }
+ `)
+ })
+ })
+
+ describe('disable logic', () => {
+ beforeEach(() => {
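+ // Shrink the bucket so that a single errored invocation (cost 100) drains it completely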
+ hub.CDP_WATCHER_BUCKET_SIZE = 100
+ hub.CDP_WATCHER_DISABLED_TEMPORARY_TTL = 1 // Shorter ttl to help with testing
+ hub.CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT = 3
+ })
+
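+ // Advance the mocked clock and also wait in real time, so the short temporary-disable
+ // TTL (presumably tracked in Redis, hence the real delay) can actually expire.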
+ const reallyAdvanceTime = async (ms: number) => {
+ advanceTime(ms)
+ await delay(ms)
+ }
+
+ it('should count the number of times it has been disabled', async () => {
+ // Trigger the temporary disabled state twice, then a third time below to exceed the max count
+ for (let i = 0; i < 2; i++) {
+ await watcher.observeResults([createResult({ id: 'id1', error: 'error!' })])
+ expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledForPeriod)
+ await reallyAdvanceTime(1000)
+ expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.degraded)
+ }
+
+ await watcher.observeResults([createResult({ id: 'id1', error: 'error!' })])
+ expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledIndefinitely)
+ await reallyAdvanceTime(1000)
+ expect((await watcher.getState('id1')).state).toEqual(HogWatcherState.disabledIndefinitely)
+ })
+ })
+ })
+})
diff --git a/plugin-server/tests/cdp/hog-watcher/hog-watcher-utils.test.ts b/plugin-server/tests/cdp/hog-watcher/hog-watcher-utils.test.ts
deleted file mode 100644
index eb0215bbcfac5..0000000000000
--- a/plugin-server/tests/cdp/hog-watcher/hog-watcher-utils.test.ts
+++ /dev/null
@@ -1,211 +0,0 @@
-jest.mock('../../../src/utils/now', () => {
- return {
- now: jest.fn(() => Date.now()),
- }
-})
-
-import {
- HogWatcherObservationPeriod,
- HogWatcherRatingPeriod,
- HogWatcherState,
- HogWatcherStatePeriod,
-} from '../../../src/cdp/hog-watcher/types'
-import { calculateRating, deriveCurrentStateFromRatings, periodTimestamp } from '../../../src/cdp/hog-watcher/utils'
-import { defaultConfig } from '../../../src/config/config'
-
-const config = defaultConfig
-
-describe('HogWatcher.utils', () => {
- describe('calculateRating', () => {
- // TODO: Change rating to account for numbers as well - low volume failures can still have a high rating as their impact is not so bad
- const cases: Array<[Partial<HogWatcherObservationPeriod>, number]> = [
- [{ successes: 9, failures: 1 }, 0.9],
- [{ successes: 1, failures: 1 }, 0.5],
- [{ successes: 0, failures: 1 }, 0],
- [{ successes: 1, failures: 0 }, 1],
- [{ asyncFunctionSuccesses: 9, asyncFunctionFailures: 1 }, 0.9],
- [{ asyncFunctionSuccesses: 1, asyncFunctionFailures: 1 }, 0.5],
- [{ asyncFunctionSuccesses: 0, asyncFunctionFailures: 1 }, 0],
- [{ asyncFunctionSuccesses: 1, asyncFunctionFailures: 0 }, 1],
-
- // Mixed results - currently whichever is worse is the rating
- [{ successes: 9, failures: 1, asyncFunctionSuccesses: 1, asyncFunctionFailures: 1 }, 0.5],
- [{ successes: 1, failures: 1, asyncFunctionSuccesses: 9, asyncFunctionFailures: 1 }, 0.5],
- [{ successes: 1, failures: 1, asyncFunctionSuccesses: 1, asyncFunctionFailures: 1 }, 0.5],
- [{ successes: 0, failures: 0, asyncFunctionSuccesses: 9, asyncFunctionFailures: 1 }, 0.9],
- ]
-
- it.each(cases)('should calculate the rating %s of %s', (vals, rating) => {
- const observation: HogWatcherObservationPeriod = {
- timestamp: Date.now(),
- successes: 0,
- failures: 0,
- asyncFunctionFailures: 0,
- asyncFunctionSuccesses: 0,
- ...vals,
- }
- expect(calculateRating(observation)).toBe(rating)
- })
- })
-
- describe('deriveCurrentStateFromRatings', () => {
- let now: number
- let ratings: HogWatcherRatingPeriod[]
- let states: HogWatcherStatePeriod[]
-
- beforeEach(() => {
- now = periodTimestamp(config)
- ratings = []
- states = []
-
- jest.useFakeTimers()
- jest.setSystemTime(now)
- })
-
- afterEach(() => {
- jest.useRealTimers()
- })
-
- const advanceTime = (ms: number) => {
- jest.advanceTimersByTime(ms)
- }
-
- const updateState = (newRatings: number[], newStates: HogWatcherState[]) => {
- for (let i = 0; i < Math.max(newRatings.length, newStates.length); i++) {
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
-
- if (newStates[i]) {
- states.push({
- timestamp: periodTimestamp(config),
- state: newStates[i],
- })
- }
-
- if (typeof newRatings[i] === 'number') {
- ratings.push({
- timestamp: Date.now(),
- rating: newRatings[i],
- })
- }
- }
- }
-
- const currentState = () => deriveCurrentStateFromRatings(config, ratings, states)
- const getAverageRating = () =>
- ratings.length ? ratings.reduce((acc, x) => acc + x.rating, 0) / ratings.length : 0
-
- describe('1 - healthy', () => {
- it('should be healthy with no ratings or previous states', () => {
- expect(currentState()).toBe(HogWatcherState.healthy)
- })
-
- it.each(Object.values(HogWatcherState))(
- 'should be whatever the last state is (%s) if no ratings',
- (lastState) => {
- updateState([], [lastState as any])
- expect(currentState()).toBe(lastState)
- }
- )
-
- it('should not change if too few ratings', () => {
- updateState([0, 0], [])
- expect(getAverageRating()).toEqual(0)
- expect(currentState()).toBe(HogWatcherState.healthy)
- })
-
- it('should move to overflow if enough ratings are unhealthy', () => {
- updateState([1, 1, 0.8, 0.6, 0.6, 0.6, 0.6], [])
- expect(states).toMatchObject([])
- expect(getAverageRating()).toBeLessThan(config.CDP_WATCHER_OVERFLOW_RATING_THRESHOLD)
- expect(currentState()).toBe(HogWatcherState.overflowed)
- })
- })
-
- describe('2 - overflow', () => {
- it('should stay in overflow if the rating does not change ', () => {
- updateState([1, 1, 0.8, 0.6, 0.6, 0.6, 0.6], [])
- expect(currentState()).toBe(HogWatcherState.overflowed)
- expect(getAverageRating()).toBeLessThan(config.CDP_WATCHER_OVERFLOW_RATING_THRESHOLD)
- expect(getAverageRating()).toBeGreaterThan(config.CDP_WATCHER_DISABLED_RATING_THRESHOLD)
-
- updateState([0.5, 0.5, 0.6, 0.7, 0.8, 1, 0.8], [])
- expect(getAverageRating()).toBeLessThan(config.CDP_WATCHER_OVERFLOW_RATING_THRESHOLD)
- expect(getAverageRating()).toBeGreaterThan(config.CDP_WATCHER_DISABLED_RATING_THRESHOLD)
- expect(currentState()).toBe(HogWatcherState.overflowed)
- })
-
- it('should move back to healthy with enough healthy activity ', () => {
- updateState([], [HogWatcherState.overflowed])
- expect(currentState()).toBe(HogWatcherState.overflowed)
- updateState([0.5, 0.8, 0.9, 0.9, 1, 0.9, 1], [])
- expect(getAverageRating()).toBeGreaterThan(config.CDP_WATCHER_OVERFLOW_RATING_THRESHOLD)
- expect(currentState()).toBe(HogWatcherState.healthy)
- })
-
- it('should move to overflow if enough observations are unhealthy', () => {
- updateState([1, 1, 0.8, 0.6, 0.6, 0.6, 0.6], [])
- expect(states).toMatchObject([])
- expect(getAverageRating()).toBeLessThan(config.CDP_WATCHER_OVERFLOW_RATING_THRESHOLD)
- expect(currentState()).toBe(HogWatcherState.overflowed)
- })
-
- it('should move to disabledForPeriod if sustained lower', () => {
- updateState([0.5, 0.4, 0.4], [])
- expect(currentState()).toBe(HogWatcherState.overflowed)
- updateState([], [HogWatcherState.overflowed]) // Add the new state
- expect(currentState()).toBe(HogWatcherState.overflowed) // Should still be the same
- updateState([0.5, 0.4], []) // Add nearly enough ratings for next evaluation
- expect(currentState()).toBe(HogWatcherState.overflowed) // Should still be the same
- updateState([0.4], []) // One more rating and it can be evaluated
- expect(getAverageRating()).toBeLessThan(config.CDP_WATCHER_DISABLED_RATING_THRESHOLD)
- expect(currentState()).toBe(HogWatcherState.disabledForPeriod)
- })
-
- it('should go to disabledIndefinitely with enough bad states', () => {
- updateState(
- [],
- [
- HogWatcherState.disabledForPeriod,
- HogWatcherState.overflowed,
- HogWatcherState.disabledForPeriod,
- HogWatcherState.overflowed,
- HogWatcherState.disabledForPeriod,
- HogWatcherState.overflowed,
- HogWatcherState.disabledForPeriod,
- HogWatcherState.overflowed,
- HogWatcherState.disabledForPeriod,
- HogWatcherState.overflowed,
- ]
- )
- expect(currentState()).toBe(HogWatcherState.overflowed)
- updateState([0.2, 0.2, 0.2, 0.2], [])
- expect(currentState()).toBe(HogWatcherState.disabledIndefinitely)
- })
- })
-
- describe('3 - disabledForPeriod', () => {
- it('should stay disabled for period until the period has passed ', () => {
- updateState([], [HogWatcherState.disabledForPeriod])
- expect(currentState()).toBe(HogWatcherState.disabledForPeriod)
- expect(states).toEqual([
- { state: HogWatcherState.disabledForPeriod, timestamp: periodTimestamp(config) },
- ])
- advanceTime(config.CDP_WATCHER_DISABLED_PERIOD - 1)
- expect(currentState()).toBe(HogWatcherState.disabledForPeriod)
- advanceTime(2)
- expect(currentState()).toBe(HogWatcherState.overflowed)
- })
- })
-
- describe('4 - disabledIndefinitely', () => {
- it('should stay in disabledIndefinitely no matter what', () => {
- updateState([], [HogWatcherState.disabledIndefinitely])
-
- expect(currentState()).toBe(HogWatcherState.disabledIndefinitely)
- // Technically this wouldn't be possible but still good to test
- updateState([1, 1, 1, 1, 1, 1, 1], [])
- expect(currentState()).toBe(HogWatcherState.disabledIndefinitely)
- })
- })
- })
-})
diff --git a/plugin-server/tests/cdp/hog-watcher/hog-watcher.test.ts b/plugin-server/tests/cdp/hog-watcher/hog-watcher.test.ts
deleted file mode 100644
index a36d72794ce99..0000000000000
--- a/plugin-server/tests/cdp/hog-watcher/hog-watcher.test.ts
+++ /dev/null
@@ -1,442 +0,0 @@
-jest.mock('../../../src/utils/now', () => {
- return {
- now: jest.fn(() => Date.now()),
- }
-})
-
-import { HogWatcher, HogWatcherActiveObservations } from '../../../src/cdp/hog-watcher/hog-watcher'
-import { BASE_REDIS_KEY, runRedis } from '../../../src/cdp/hog-watcher/utils'
-import { HogFunctionInvocationAsyncResponse, HogFunctionInvocationResult } from '../../../src/cdp/types'
-import { defaultConfig } from '../../../src/config/config'
-import { Hub } from '../../../src/types'
-import { createHub } from '../../../src/utils/db/hub'
-import { delay } from '../../../src/utils/utils'
-import { deleteKeysWithPrefix } from '../../helpers/redis'
-
-const mockNow: jest.Mock = require('../../../src/utils/now').now as any
-
-const createResult = (id: string, finished = true, error?: string): HogFunctionInvocationResult => {
- return {
- invocation: {
- id: 'invocation-id',
- teamId: 2,
- hogFunctionId: id,
- globals: {} as any,
- timings: [],
- },
- finished,
- error,
- logs: [],
- }
-}
-
-const createAsyncResponse = (id: string, success = true): HogFunctionInvocationAsyncResponse => {
- return {
- state: '',
- teamId: 2,
- hogFunctionId: id,
- asyncFunctionResponse: {
- error: !success ? 'error' : null,
- response: {},
- },
- }
-}
-
-const config = defaultConfig
-
-describe('HogWatcher', () => {
- describe('HogWatcherActiveObservations', () => {
- let observer: HogWatcherActiveObservations
-
- beforeEach(() => {
- observer = new HogWatcherActiveObservations(config)
- jest.useFakeTimers()
- jest.setSystemTime(1719229670000)
- })
-
- afterEach(() => {
- jest.useRealTimers()
- })
-
- it('should update the observation', () => {
- expect(observer.observations).toEqual({})
-
- observer.observeResults([createResult('id1'), createResult('id1', false, 'error')])
- observer.observeAsyncFunctionResponses([createAsyncResponse('id1'), createAsyncResponse('id2', false)])
-
- expect(observer.observations).toMatchInlineSnapshot(`
- Object {
- "id1": Object {
- "asyncFunctionFailures": 0,
- "asyncFunctionSuccesses": 1,
- "failures": 1,
- "successes": 1,
- "timestamp": 1719229670000,
- },
- "id2": Object {
- "asyncFunctionFailures": 1,
- "asyncFunctionSuccesses": 0,
- "failures": 0,
- "successes": 0,
- "timestamp": 1719229670000,
- },
- }
- `)
-
- observer.observeAsyncFunctionResponses([createAsyncResponse('id2'), createAsyncResponse('id2')])
-
- expect(observer.observations).toMatchInlineSnapshot(`
- Object {
- "id1": Object {
- "asyncFunctionFailures": 0,
- "asyncFunctionSuccesses": 1,
- "failures": 1,
- "successes": 1,
- "timestamp": 1719229670000,
- },
- "id2": Object {
- "asyncFunctionFailures": 1,
- "asyncFunctionSuccesses": 2,
- "failures": 0,
- "successes": 0,
- "timestamp": 1719229670000,
- },
- }
- `)
- })
- })
-
- describe('integration', () => {
- let now: number
- let hub: Hub
- let closeHub: () => Promise<void>
-
- let watcher1: HogWatcher
- let watcher2: HogWatcher
-
- const advanceTime = (ms: number) => {
- now += ms
- mockNow.mockReturnValue(now)
- }
-
- beforeEach(async () => {
- ;[hub, closeHub] = await createHub()
-
- now = 1720000000000
- mockNow.mockReturnValue(now)
-
- await deleteKeysWithPrefix(hub.redisPool, BASE_REDIS_KEY)
-
- watcher1 = new HogWatcher(hub)
- watcher2 = new HogWatcher(hub)
- await watcher1.start()
- await watcher2.start()
- })
-
- afterEach(async () => {
- await Promise.all([watcher1, watcher2].map((watcher) => watcher.stop()))
- jest.useRealTimers()
- await closeHub()
- jest.clearAllMocks()
- })
-
- it('should retrieve empty state', async () => {
- const res = await watcher1.fetchWatcher('id1')
- expect(res).toEqual({
- ratings: [],
- state: 1,
- states: [],
- })
- })
-
- it('should store observations', () => {
- watcher1.currentObservations.observeResults([createResult('id1'), createResult('id1', false, 'error')])
- watcher1.currentObservations.observeResults([createResult('id2'), createResult('id1')])
- watcher1.currentObservations.observeResults([createResult('id1')])
-
- expect(watcher1.currentObservations.observations).toMatchObject({
- id1: {
- failures: 1,
- successes: 3,
- timestamp: now,
- },
- id2: {
- failures: 0,
- successes: 1,
- timestamp: now,
- },
- })
-
- expect(watcher2.currentObservations.observations).toEqual({})
- })
-
- it('should sync nothing if still in period', async () => {
- watcher1.currentObservations.observeResults([createResult('id2'), createResult('id1')])
- expect(watcher1.currentObservations.observations).not.toEqual({})
- await watcher1.sync()
- expect(watcher1.currentObservations.observations).not.toEqual({})
- expect(await watcher2.fetchState()).toEqual({
- observations: {},
- ratings: {},
- states: {},
- })
- })
-
- it('should persist the in flight observations to redis', async () => {
- watcher1.currentObservations.observeResults([createResult('id2'), createResult('id1')])
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
- await watcher1.sync()
- expect(watcher1.currentObservations.observations).toEqual({})
- const persistedState = await watcher2.fetchState()
- expect(persistedState).toMatchObject({
- observations: {
- id1: [
- {
- failures: 0,
- successes: 1,
- timestamp: 1720000000000,
- },
- ],
- id2: [
- {
- failures: 0,
- successes: 1,
- timestamp: 1720000000000,
- },
- ],
- },
- })
- })
-
- it('should save the states and ratings to redis if enough periods passed', async () => {
- watcher1.currentObservations.observeResults([createResult('id2'), createResult('id1')])
- watcher2.currentObservations.observeResults([
- createResult('id2', false, 'error'),
- createResult('id1', true),
- ])
-
- let expectation: any = {
- observations: {
- id1: [expect.any(Object), expect.any(Object)],
- id2: [expect.any(Object), expect.any(Object)],
- },
- ratings: {},
- states: {},
- }
-
- // Move forward one period - this passes the masking period, ensuring that the observations are persisted
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
- await watcher1.sync()
- await watcher2.sync()
- await delay(100) // Allow pubsub to happen
- expect(watcher2.states).toEqual({})
- // Watcher1 should be leader and have the globalState
- expect(watcher1.globalState).toEqual(expectation)
- expect(watcher2.globalState).toEqual(undefined)
- expect(await watcher2.fetchState()).toEqual(expectation)
-
- // Move forward one final period and the initial observations should now be ratings
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
- await watcher1.sync()
- await watcher2.sync()
- await delay(100) // Allow pubsub to happen
-
- expectation = {
- observations: {},
- ratings: {
- id1: [{ rating: 1, timestamp: 1720000000000 }],
- id2: [{ rating: 0.5, timestamp: 1720000000000 }],
- },
- states: {},
- }
-
- expect(watcher2.states).toEqual({}) // No states yet as everything is healthy
- expect(watcher1.globalState).toEqual(expectation)
- // Persisted state should match the global state
- expect(await watcher2.fetchState()).toEqual(expectation)
- })
-
- it('should move the function into a bad state after enough periods', async () => {
- // We need to move N times forward to get past the masking period and have enough ratings to make a decision
- // 2 for the persistence of the ratings, 3 more for the evaluation, 3 more for the subsequent evaluation
- for (let i = 0; i < 2 + 3 + 3; i++) {
- watcher1.currentObservations.observeResults([createResult('id1', false, 'error')])
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
- await watcher1.sync()
- }
- await delay(100)
-
- expect(watcher1.globalState).toMatchObject({
- observations: {},
- ratings: {
- id1: Array(7)
- .fill(0)
- .map((_, i) => ({
- rating: 0,
- timestamp: 1720000000000 + i * config.CDP_WATCHER_OBSERVATION_PERIOD,
- })),
- },
- states: {
- id1: [
- {
- state: 2,
- timestamp: 1720000040000,
- },
- {
- state: 3,
- timestamp: 1720000080000,
- },
- ],
- },
- })
-
- expect(watcher2.states['id1']).toEqual(3)
-
- advanceTime(config.CDP_WATCHER_DISABLED_PERIOD + 1)
- await watcher1.sync()
- await delay(100)
- expect(watcher2.states['id1']).toEqual(2)
- })
-
- it('should save the states to redis so another watcher can grab it', async () => {
- for (let i = 0; i < 5; i++) {
- watcher1.currentObservations.observeResults([createResult('id1', false, 'error')])
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
- await watcher1.sync()
- }
- await delay(100)
-
- expect(await watcher2.fetchWatcher('id1')).toMatchObject({
- state: 2,
- states: [
- {
- state: 2,
- timestamp: 1720000040000,
- },
- ],
- })
- })
-
- it('should load existing states from redis', async () => {
- for (let i = 0; i < 5; i++) {
- watcher1.currentObservations.observeResults([createResult('id1', false, 'error')])
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
- await watcher1.sync()
- }
-
- const newWatcher = new HogWatcher(hub)
- await newWatcher.start()
- expect(newWatcher.states).toEqual({
- id1: 2,
- })
- })
-
- it('should react to becoming or losing leader status', async () => {
- watcher1.currentObservations.observeResults([createResult('id1', false, 'error')])
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
- await watcher1.sync()
- const stateExpectation = {
- observations: { id1: [expect.any(Object)] },
- ratings: {},
- states: {},
- }
- expect(watcher1.isLeader).toEqual(true)
- expect(watcher1.globalState).toEqual(stateExpectation)
- expect(watcher2.isLeader).toEqual(false)
- expect(watcher2.globalState).toEqual(undefined)
-
- // Simulate the ttl running out
- await runRedis(hub.redisPool, 'test', (client) => client.del(BASE_REDIS_KEY + '/leader'))
-
- // Watcher 2 goes first so will grab leadership
- await Promise.all([watcher2.sync(), watcher1.sync()])
- expect(watcher1.isLeader).toEqual(false)
- expect(watcher1.globalState).toEqual(undefined)
- expect(watcher2.isLeader).toEqual(true)
- expect(watcher2.globalState).toEqual(stateExpectation)
- })
-
- it('should move a problematic function in and out of overflow until eventually disabled', async () => {
- // NOTE: The length here just happens to be the right loop count to
-
- let maxLoops = 100
- while (watcher1.getFunctionState('id1') !== 4 && maxLoops > 0) {
- maxLoops--
- if (watcher1.getFunctionState('id1') < 3) {
- // If we are anything other than disabled, simulate a bad invocation
- watcher1.currentObservations.observeResults([createResult('id1', false, 'error')])
- advanceTime(config.CDP_WATCHER_OBSERVATION_PERIOD)
- } else {
- // Skip ahead if the function is disabled
- advanceTime(config.CDP_WATCHER_DISABLED_PERIOD)
- }
- await watcher1.sync()
- await delay(5)
- }
-
- const states = watcher1.globalState?.states['id1'] ?? []
- const duration = Math.round((states[states.length - 1]!.timestamp - states[0]!.timestamp) / 1000 / 60)
- // Little helper check to remind us the total time for a bad function to get to be permanently disabled
- expect(`Time to fully disable: ${duration}mins`).toMatchInlineSnapshot(`"Time to fully disable: 63mins"`)
-
- expect(states).toMatchInlineSnapshot(`
- Array [
- Object {
- "state": 2,
- "timestamp": 1720000040000,
- },
- Object {
- "state": 3,
- "timestamp": 1720000080000,
- },
- Object {
- "state": 2,
- "timestamp": 1720001280000,
- },
- Object {
- "state": 3,
- "timestamp": 1720001320000,
- },
- Object {
- "state": 2,
- "timestamp": 1720002520000,
- },
- Object {
- "state": 3,
- "timestamp": 1720002560000,
- },
- Object {
- "state": 2,
- "timestamp": 1720003760000,
- },
- Object {
- "state": 4,
- "timestamp": 1720003800000,
- },
- ]
- `)
- })
-
- it('should react to incoming manual state changes', async () => {
- await watcher1.forceStateChange('id1', 2)
- await delay(100)
-
- const stateExpectation = {
- observations: {},
- ratings: {},
- states: {
- id1: [
- {
- state: 2,
- timestamp: 1720000000000,
- },
- ],
- },
- }
- expect(watcher1.isLeader).toEqual(true)
- expect(watcher1.globalState).toEqual(stateExpectation)
- expect(watcher2.isLeader).toEqual(false)
- expect(watcher2.globalState).toEqual(undefined)
- })
- })
-})
diff --git a/posthog/api/hog_function.py b/posthog/api/hog_function.py
index a9eddc7b1fae0..be9ee0fd475e2 100644
--- a/posthog/api/hog_function.py
+++ b/posthog/api/hog_function.py
@@ -31,8 +31,8 @@
class HogFunctionStatusSerializer(serializers.Serializer):
state = serializers.ChoiceField(choices=[state.value for state in HogFunctionState])
- states: serializers.ListField = serializers.ListField(child=serializers.DictField())
- ratings: serializers.ListField = serializers.ListField(child=serializers.DictField())
+ rating: serializers.FloatField = serializers.FloatField()
+ tokens: serializers.IntegerField = serializers.IntegerField()
class HogFunctionMinimalSerializer(serializers.ModelSerializer):
@@ -58,7 +58,7 @@ class Meta:
class HogFunctionSerializer(HogFunctionMinimalSerializer):
template = HogFunctionTemplateSerializer(read_only=True)
- status = HogFunctionStatusSerializer(read_only=True)
+ status = HogFunctionStatusSerializer(read_only=True, required=False, allow_null=True)
class Meta:
model = HogFunction
@@ -181,7 +181,7 @@ def update(self, instance: HogFunction, validated_data: dict, *args, **kwargs) -
res: HogFunction = super().update(instance, validated_data)
if res.enabled and res.status.get("state", 0) >= HogFunctionState.DISABLED_TEMPORARILY.value:
- res.set_function_status(HogFunctionState.OVERFLOWED.value)
+ res.set_function_status(HogFunctionState.DEGRADED.value)
return res
diff --git a/posthog/api/test/test_hog_function.py b/posthog/api/test/test_hog_function.py
index 57f7290cd5fc5..77bfdfd965e46 100644
--- a/posthog/api/test/test_hog_function.py
+++ b/posthog/api/test/test_hog_function.py
@@ -6,7 +6,7 @@
from posthog.constants import AvailableFeature
from posthog.models.action.action import Action
-from posthog.models.hog_functions.hog_function import HogFunction
+from posthog.models.hog_functions.hog_function import DEFAULT_STATE, HogFunction
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, QueryMatchingTest
from posthog.cdp.templates.webhook.template_webhook import template as template_webhook
from posthog.cdp.templates.slack.template_slack import template as template_slack
@@ -167,7 +167,7 @@ def test_create_hog_function(self, *args):
"filters": {"bytecode": ["_h", 29]},
"icon_url": None,
"template": None,
- "status": {"ratings": [], "state": 0, "states": []},
+ "status": {"rating": 0, "state": 0, "tokens": 0},
}
@patch("posthog.permissions.posthoganalytics.feature_enabled", return_value=True)
@@ -484,7 +484,7 @@ def test_generates_filters_bytecode(self, *args):
def test_loads_status_when_enabled_and_available(self, *args):
with patch("posthog.plugins.plugin_server_api.requests.get") as mock_get:
mock_get.return_value.status_code = status.HTTP_200_OK
- mock_get.return_value.json.return_value = {"state": 1, "states": [], "ratings": []}
+ mock_get.return_value.json.return_value = {"state": 1, "tokens": 0, "rating": 0}
response = self.client.post(
f"/api/projects/{self.team.id}/hog_functions/",
@@ -499,7 +499,7 @@ def test_loads_status_when_enabled_and_available(self, *args):
assert response.status_code == status.HTTP_201_CREATED, response.json()
response = self.client.get(f"/api/projects/{self.team.id}/hog_functions/{response.json()['id']}")
- assert response.json()["status"] == {"state": 1, "states": [], "ratings": []}
+ assert response.json()["status"] == {"state": 1, "tokens": 0, "rating": 0}
@patch("posthog.permissions.posthoganalytics.feature_enabled", return_value=True)
def test_does_not_crash_when_status_not_available(self, *args):
@@ -519,14 +519,14 @@ def test_does_not_crash_when_status_not_available(self, *args):
)
assert response.status_code == status.HTTP_201_CREATED, response.json()
response = self.client.get(f"/api/projects/{self.team.id}/hog_functions/{response.json()['id']}")
- assert response.json()["status"] == {"ratings": [], "state": 0, "states": []}
+ assert response.json()["status"] == DEFAULT_STATE
@patch("posthog.permissions.posthoganalytics.feature_enabled", return_value=True)
def test_patches_status_on_enabled_update(self, *args):
with patch("posthog.plugins.plugin_server_api.requests.get") as mock_get:
with patch("posthog.plugins.plugin_server_api.requests.patch") as mock_patch:
mock_get.return_value.status_code = status.HTTP_200_OK
- mock_get.return_value.json.return_value = {"state": 4, "states": [], "ratings": []}
+ mock_get.return_value.json.return_value = {"state": 4, "tokens": 0, "rating": 0}
response = self.client.post(
f"/api/projects/{self.team.id}/hog_functions/",
diff --git a/posthog/models/hog_functions/hog_function.py b/posthog/models/hog_functions/hog_function.py
index 5d5de9ec73960..45151dcad6986 100644
--- a/posthog/models/hog_functions/hog_function.py
+++ b/posthog/models/hog_functions/hog_function.py
@@ -16,11 +16,8 @@
reload_hog_functions_on_workers,
)
-DEFAULT_STATE = {
- "state": 0,
- "ratings": [],
- "states": [],
-}
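+# Fallback status returned when the plugin server has not reported anything for the function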
+DEFAULT_STATE = {"state": 0, "tokens": 0, "rating": 0}
+
logger = structlog.get_logger(__name__)
@@ -28,7 +25,7 @@
class HogFunctionState(enum.Enum):
UNKNOWN = 0
HEALTHY = 1
- OVERFLOWED = 2
+ DEGRADED = 2
DISABLED_TEMPORARILY = 3
DISABLED_PERMANENTLY = 4