diff --git a/plugin-server/src/cdp/hog-watcher.ts b/plugin-server/src/cdp/hog-watcher.ts index 0b02eb6a61460..d04701a353585 100644 --- a/plugin-server/src/cdp/hog-watcher.ts +++ b/plugin-server/src/cdp/hog-watcher.ts @@ -1,6 +1,6 @@ import { captureException } from '@sentry/node' import { readFileSync } from 'fs' -import { Redis } from 'ioredis' +import { Pipeline, Redis } from 'ioredis' import path from 'path' import { Hub } from '../types' @@ -9,9 +9,11 @@ import { status } from '../utils/status' import { HogFunctionInvocationResult, HogFunctionType } from './types' export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-watcher' : '@posthog/hog-watcher' -const REDIS_KEY_SCORES = `${BASE_REDIS_KEY}/scores` +const REDIS_KEY_HEALTH = `${BASE_REDIS_KEY}/health` const REDIS_TIMEOUT_SECONDS = 5 +const LUA_TOKEN_BUCKET = readFileSync(path.join(__dirname, 'lua', 'token-bucket.lua')).toString() + export enum HogWatcherState { healthy = 1, degraded = 2, @@ -27,34 +29,43 @@ export enum HogWatcherState { export type HogWatcherFunctionState = { state: HogWatcherState - score: number + tokens: number + rating: number +} + +type WithCheckRateLimit<T> = { + checkRateLimit: (key: string, now: number, cost: number, poolMax: number, fillRate: number, expiry: number) => T } +type HogWatcherRedisClientPipeline = Pipeline & WithCheckRateLimit<number> + +type HogWatcherRedisClient = Omit<Redis, 'pipeline'> & + WithCheckRateLimit<Promise<number>> & { + pipeline: () => HogWatcherRedisClientPipeline + } + export class HogWatcher { constructor(private hub: Hub) {} - private async runRedis<T>(fn: (client: Redis) => Promise<T>): Promise<T | null> { + private rateLimitArgs(id: HogFunctionType['id'], cost: number) { + const now = Date.now() + return [ + `${REDIS_KEY_HEALTH}/${id}`, + now, + cost, + this.hub.CDP_WATCHER_BUCKET_SIZE, + this.hub.CDP_WATCHER_REFILL_RATE, + this.hub.CDP_WATCHER_TTL, + ] as const + } + + private async runRedis<T>(fn: (client: HogWatcherRedisClient) => Promise<T>): Promise<T | null> { // We want all of this to fail 
open in the issue of redis being unavailable - we'd rather have the function continue const client = await this.hub.redisPool.acquire() - client.defineCommand('consumeTokenBucket', { - numberOfKeys: 2, - lua: ` - local key = KEYS[1] - local bucket_size = tonumber(ARGV[1]) - local refill_rate = tonumber(ARGV[2]) - local cost = tonumber(ARGV[3]) - - local current = tonumber(redis.call('get', key) or 0) - local refill_amount = math.max(0, (current - refill_rate) + cost) - - if refill_amount > bucket_size then - return 0 - end - - redis.call('set', key, refill_amount) - return 1 - `, + client.defineCommand('checkRateLimit', { + numberOfKeys: 1, + lua: LUA_TOKEN_BUCKET, }) const timeout = timeoutGuard( @@ -63,7 +74,7 @@ export class HogWatcher { REDIS_TIMEOUT_SECONDS * 1000 ) try { - return await fn(client) + return await fn(client as HogWatcherRedisClient) } catch (e) { status.error('HogWatcher Redis error', e) captureException(e) @@ -74,14 +85,18 @@ export class HogWatcher { } } - public scoreToState(score: number): HogWatcherState { - if (score <= this.hub.CDP_WATCHER_THRESHOLD_DISABLED) { - return HogWatcherState.disabledForPeriod - } else if (score <= this.hub.CDP_WATCHER_THRESHOLD_DEGRADED) { - return HogWatcherState.degraded - } + public tokensToFunctionState(tokens?: number | null): HogWatcherFunctionState { + tokens = tokens ?? this.hub.CDP_WATCHER_BUCKET_SIZE + const rating = tokens / this.hub.CDP_WATCHER_BUCKET_SIZE - return HogWatcherState.healthy + const state = + rating >= this.hub.CDP_WATCHER_THRESHOLD_DEGRADED + ? HogWatcherState.healthy + : rating > 0 + ? 
HogWatcherState.degraded + : HogWatcherState.disabledForPeriod + + return { state, tokens, rating } } public async getStates( @@ -89,40 +104,27 @@ export class HogWatcher { ): Promise> { const idsSet = new Set(ids) - const states = await this.runRedis(async (client) => { + const buckets = await this.runRedis(async (client) => { const pipeline = client.pipeline() for (const id of idsSet) { - pipeline.get(`${REDIS_KEY_SCORES}/${id}`) + pipeline.checkRateLimit(...this.rateLimitArgs(id, 0)) } return pipeline.exec() }) return Array.from(idsSet).reduce((acc, id, index) => { - const score = states ? Number(states[index][1]) : 0 return { ...acc, - [id]: { - state: this.scoreToState(score), - score, - }, + [id]: this.tokensToFunctionState(buckets ? buckets[index][1] : undefined), } }, {} as Record) } public async getState(id: HogFunctionType['id']): Promise { - const res = await this.runRedis(async (client) => { - const score = await client.get(`${REDIS_KEY_SCORES}/${id}`) - return score - }) - - const score = Number(res ?? 0) - - return { - state: this.scoreToState(score), - score, - } + const res = await this.runRedis(async (client) => client.checkRateLimit(...this.rateLimitArgs(id, 0))) + return this.tokensToFunctionState(res) } public async forceStateChange(id: HogFunctionType['id'], state: HogWatcherState): Promise { @@ -131,77 +133,54 @@ export class HogWatcher { const newScore = state === HogWatcherState.healthy - ? 0 + ? this.hub.CDP_WATCHER_BUCKET_SIZE : state === HogWatcherState.degraded - ? this.hub.CDP_WATCHER_THRESHOLD_DEGRADED - : this.hub.CDP_WATCHER_THRESHOLD_DISABLED + ? 
this.hub.CDP_WATCHER_BUCKET_SIZE * this.hub.CDP_WATCHER_THRESHOLD_DEGRADED + : 0 - pipeline.set(`${REDIS_KEY_SCORES}/${id}`, newScore) - pipeline.expire(`${REDIS_KEY_SCORES}/${id}`, this.hub.CDP_WATCHER_TTL) + pipeline.hset(`${REDIS_KEY_HEALTH}/${id}`, 'pool', newScore) await pipeline.exec() }) } public async observeResults(results: HogFunctionInvocationResult[]): Promise<void> { - const changes: Record<HogFunctionType['id'], number> = {} + const costs: Record<HogFunctionType['id'], number> = {} results.forEach((result) => { - let change = (changes[result.invocation.hogFunctionId] = changes[result.invocation.hogFunctionId] || 0) + let cost = (costs[result.invocation.hogFunctionId] = costs[result.invocation.hogFunctionId] || 0) if (result.finished) { // If it is finished we can calculate the score based off of the timings const totalDurationMs = result.invocation.timings.reduce((acc, timing) => acc + timing.duration_ms, 0) - - const lowerBound = this.hub.CDP_WATCHER_SCORE_TIMING_LOWER - const upperBound = this.hub.CDP_WATCHER_SCORE_TIMING_UPPER - const scoreSuccess = this.hub.CDP_WATCHER_SCORE_SUCCESS + const lowerBound = this.hub.CDP_WATCHER_COST_TIMING_LOWER_MS + const upperBound = this.hub.CDP_WATCHER_COST_TIMING_UPPER_MS + const costTiming = this.hub.CDP_WATCHER_COST_TIMING const ratio = Math.max(totalDurationMs - lowerBound, 0) / (upperBound - lowerBound) - change += Math.round(scoreSuccess - scoreSuccess * ratio) + cost += Math.round(costTiming * ratio) } if (result.error) { - change += this.hub.CDP_WATCHER_SCORE_ERROR // Errors incur medium penalties + cost += this.hub.CDP_WATCHER_COST_ERROR } - changes[result.invocation.hogFunctionId] = change - }) - - await this.runRedis(async (client) => { - let pipeline = client.pipeline() - - const changeEntries = Object.entries(changes) - - changeEntries.forEach(([id, change]) => { - pipeline.incrby(`${REDIS_KEY_SCORES}/${id}`, change) - pipeline.expire(`${REDIS_KEY_SCORES}/${id}`, this.hub.CDP_WATCHER_TTL) - }) - - const results = await pipeline.exec() + console.log('COST', cost) - 
pipeline = client.pipeline() - - changeEntries.forEach(([id], index) => { - const [err, value] = results[index * 2] - let override: number | null = null + costs[result.invocation.hogFunctionId] = cost + }) - if (err) { - // If there was an error, we can just skip - } else if (value < this.hub.CDP_WATCHER_THRESHOLD_DISABLED) { - override = this.hub.CDP_WATCHER_THRESHOLD_DISABLED - } else if (value > this.hub.CDP_WATCHER_THRESHOLD_HEALTHY) { - override = this.hub.CDP_WATCHER_THRESHOLD_HEALTHY - } + const res = await this.runRedis(async (client) => { + const pipeline = client.pipeline() - if (override !== null) { - pipeline.set(`${REDIS_KEY_SCORES}/${id}`, override) - pipeline.expire(`${REDIS_KEY_SCORES}/${id}`, this.hub.CDP_WATCHER_TTL) - } + Object.entries(costs).forEach(([id, change]) => { + pipeline.checkRateLimit(...this.rateLimitArgs(id, change)) }) - await pipeline.exec() + return await pipeline.exec() }) + + console.log('Observe results', res) } } diff --git a/plugin-server/src/cdp/lua/token-bucket.lua b/plugin-server/src/cdp/lua/token-bucket.lua new file mode 100644 index 0000000000000..6d0f1d91a71b8 --- /dev/null +++ b/plugin-server/src/cdp/lua/token-bucket.lua @@ -0,0 +1,48 @@ +local key = KEYS[1] +local now = ARGV[1] +local cost = ARGV[2] +local poolMax = ARGV[3] +local fillRate = ARGV[4] +local expiry = ARGV[5] +local before = redis.call('hget', key, 'ts') + +-- If we don't have a timestamp then we set it to now and fill up the bucket +if before == false then + local ret = poolMax - cost + redis.call('hset', key, 'ts', now) + redis.call('hset', key, 'pool', ret) + -- redis.call('expire', key, expiry) + return ret +end + +-- We update the timestamp if it has changed +local timediff = now - before + +if timediff > 0 then + redis.call('hset', key, 'ts', now) +else + timediff = 0 +end + +-- Calculate how much should be refilled in the bucket and add it +local owedTokens = timediff / fillRate +local currentTokens = redis.call('hget', key, 'pool') + +if 
currentTokens == false then + currentTokens = poolMax +end + +currentTokens = math.min(currentTokens + owedTokens, poolMax) + +-- Remove the cost and return the new number of tokens +if currentTokens - cost >= 0 then + currentTokens = currentTokens - cost +else + currentTokens = -1 +end + +redis.call('hset', key, 'pool', currentTokens) +redis.call('expire', key, expiry) + +-- Finally return the value - if it's negative then we've hit the limit +return currentTokens \ No newline at end of file diff --git a/plugin-server/src/cdp/redis/token-bucket.lua b/plugin-server/src/cdp/redis/token-bucket.lua deleted file mode 100644 index 7b62d5ee84869..0000000000000 --- a/plugin-server/src/cdp/redis/token-bucket.lua +++ /dev/null @@ -1,47 +0,0 @@ -local key = KEYS[1] -local now = ARGV[1] -local cost = ARGV[2] -local poolMax = tonumber(ARGV[3]) -local fillRate = ARGV[4] -local expiry = ARGV[5] - -local timestampKey = key..'timestamp' -local poolKey = key..'pool' - -local before = redis.call('get', timestampKey) - -if before == false then - redis.call('set', timestampKey, now, 'ex', expiry) - - local ret = poolMax - cost - redis.call('set', poolKey, ret, 'ex', expiry) - return tostring(ret) -end - -local timediff = now - before - -if timediff > 0 then - redis.call('set', timestampKey, now, 'ex', expiry) -else - timediff = 0 -end - -local owed = timediff / fillRate -local r = redis.call('get', poolKey) - -if r == false then - r = poolMax -end - -r = math.min(r + owed, poolMax) - -local limit = 1 -if r - cost >= 0 then - r = r - cost -else - limit = -1 -end - -redis.call('set', poolKey, r, 'ex', expiry) - -return tostring(r * limit) \ No newline at end of file diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 591ee7e2fef8a..d91e1e3f22240 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -176,15 +176,15 @@ export function getDefaultConfig(): PluginsServerConfig { 
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: 0, // 0 disables stats collection SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // 1MB // CDP - CDP_WATCHER_SCORE_ERROR: -20, - CDP_WATCHER_SCORE_SUCCESS: 10, - CDP_WATCHER_SCORE_TIMING_UPPER: 5000, - CDP_WATCHER_SCORE_TIMING_LOWER: 100, - CDP_WATCHER_THRESHOLD_HEALTHY: 10000, - CDP_WATCHER_THRESHOLD_DEGRADED: -1000, - CDP_WATCHER_THRESHOLD_DISABLED: -10000, + CDP_WATCHER_COST_ERROR: 20, + CDP_WATCHER_COST_TIMING: 100, + CDP_WATCHER_COST_TIMING_LOWER_MS: 100, + CDP_WATCHER_COST_TIMING_UPPER_MS: 5000, + CDP_WATCHER_THRESHOLD_DEGRADED: 0.8, + CDP_WATCHER_BUCKET_SIZE: 10000, CDP_WATCHER_DISABLED_TTL: 60 * 10, - CDP_WATCHER_TTL: 60 * 60, + CDP_WATCHER_TTL: 60 * 10, + CDP_WATCHER_REFILL_RATE: 10, CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3, CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '', } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 4fb947d66e0cc..226c167e10dc3 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -96,15 +96,15 @@ export const stringToPluginServerMode = Object.fromEntries( ) as Record export type CdpConfig = { - CDP_WATCHER_SCORE_ERROR: number - CDP_WATCHER_SCORE_SUCCESS: number - CDP_WATCHER_SCORE_TIMING_UPPER: number - CDP_WATCHER_SCORE_TIMING_LOWER: number - CDP_WATCHER_THRESHOLD_HEALTHY: number // The max score a function can have - CDP_WATCHER_THRESHOLD_DEGRADED: number // The score below which a function is moved to degraded - CDP_WATCHER_THRESHOLD_DISABLED: number // The score below which a function is moved to disabled + CDP_WATCHER_COST_ERROR: number // The max cost of an erroring function + CDP_WATCHER_COST_TIMING: number // The max cost of a slow function + CDP_WATCHER_COST_TIMING_LOWER_MS: number // The lower bound in ms where the timing cost is not incurred + CDP_WATCHER_COST_TIMING_UPPER_MS: number // The upper bound in ms where the timing cost is fully incurred + CDP_WATCHER_THRESHOLD_DEGRADED: number // Percentage of the 
bucket where we count it as degraded + CDP_WATCHER_BUCKET_SIZE: number // The total bucket size CDP_WATCHER_DISABLED_TTL: number // How long a function should be temporarily disabled for - CDP_WATCHER_TTL: number // How long a function's observation should be valid for before resetting + CDP_WATCHER_TTL: number // The expiry for the rate limit key + CDP_WATCHER_REFILL_RATE: number // The number of tokens to be refilled per second CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string } diff --git a/plugin-server/tests/cdp/hog-watcher.test.ts b/plugin-server/tests/cdp/hog-watcher.test.ts index 696e74bd9c240..29070c1df65f2 100644 --- a/plugin-server/tests/cdp/hog-watcher.test.ts +++ b/plugin-server/tests/cdp/hog-watcher.test.ts @@ -55,19 +55,19 @@ describe('HogWatcher', () => { it('should retrieve empty state', async () => { const res = await watcher.getStates(['id1', 'id2']) expect(res).toEqual({ - id1: { score: 0, state: 1 }, - id2: { score: 0, state: 1 }, + id1: { rating: 0, state: 1 }, + id2: { rating: 0, state: 1 }, }) }) - const cases: [{ score: number; state: number }, HogFunctionInvocationResult[]][] = [ - [{ score: 10, state: 1 }, [createResult({ id: 'id1' })]], + const cases: [{ cost: number; state: number }, HogFunctionInvocationResult[]][] = [ + [{ cost: 0, state: 1 }, [createResult({ id: 'id1' })]], [ - { score: 30, state: 1 }, + { cost: 0, state: 1 }, [createResult({ id: 'id1' }), createResult({ id: 'id1' }), createResult({ id: 'id1' })], ], [ - { score: 30, state: 1 }, + { cost: 0, state: 1 }, [ createResult({ id: 'id1', duration: 10 }), createResult({ id: 'id1', duration: 20 }), @@ -75,31 +75,34 @@ describe('HogWatcher', () => { ], ], [ - { score: 24, state: 1 }, + { cost: 24, state: 1 }, [ createResult({ id: 'id1', duration: 1000 }), createResult({ id: 'id1', duration: 1000 }), createResult({ id: 'id1', duration: 1000 }), ], ], - [{ 
score: 0, state: 1 }, [createResult({ id: 'id1', duration: 5000 })]], - [{ score: -10, state: 1 }, [createResult({ id: 'id1', duration: 10000 })]], - [ - { score: -41, state: 1 }, - [ - createResult({ id: 'id1', duration: 5000 }), - createResult({ id: 'id1', duration: 10000 }), - createResult({ id: 'id1', duration: 20000 }), - ], - ], - - [{ score: -10, state: 1 }, [createResult({ id: 'id1', error: 'errored!' })]], + // [{ tokens: 0, state: 1 }, [createResult({ id: 'id1', duration: 5000 })]], + // [{ tokens: -10, state: 1 }, [createResult({ id: 'id1', duration: 10000 })]], + // [ + // { tokens: -41, state: 1 }, + // [ + // createResult({ id: 'id1', duration: 5000 }), + // createResult({ id: 'id1', duration: 10000 }), + // createResult({ id: 'id1', duration: 20000 }), + // ], + // ], + + // [{ tokens: -10, state: 1 }, [createResult({ id: 'id1', error: 'errored!' })]], ] - it.each(cases)('should update scores based on results %s %s', async (expectedScore, results) => { + it.each(cases)('should update tokens based on results %s %s', async (expectedScore, results) => { await watcher.observeResults(results) expect(await watcher.getStates(['id1'])).toMatchObject({ - id1: expectedScore, + id1: { + tokens: config.CDP_WATCHER_BUCKET_SIZE - expectedScore.cost, + state: expectedScore.state, + }, }) }) @@ -109,7 +112,7 @@ describe('HogWatcher', () => { await watcher.observeResults(lotsOfResults) expect(await watcher.getStates(['id1'])).toMatchObject({ - id1: { score: config.CDP_WATCHER_THRESHOLD_DISABLED, state: 3 }, + id1: { tokens: 0, state: 3 }, }) lotsOfResults = Array(10000).fill(createResult({ id: 'id2' })) @@ -117,12 +120,12 @@ describe('HogWatcher', () => { await watcher.observeResults(lotsOfResults) expect(await watcher.getStates(['id2'])).toMatchObject({ - id2: { score: config.CDP_WATCHER_THRESHOLD_HEALTHY, state: 1 }, + id2: { tokens: 0, state: 1 }, }) }) // it('should move the function into a bad state after enough periods', async () => { - // // We need to move N 
times forward to get past the masking period and have enough tokens to make a decision // // 2 for the persistance of the ratings, 3 more for the evaluation, 3 more for the subsequent evaluation // for (let i = 0; i < 2 + 3 + 3; i++) { // watcher1.currentObservations.observeResults([createResult('id1', false, 'error')])