Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Aug 7, 2024
1 parent a98dea4 commit 8b5d0f1
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 179 deletions.
163 changes: 71 additions & 92 deletions plugin-server/src/cdp/hog-watcher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { captureException } from '@sentry/node'
import { readFileSync } from 'fs'
import { Redis } from 'ioredis'
import { Pipeline, Redis } from 'ioredis'
import path from 'path'

import { Hub } from '../types'
Expand All @@ -9,9 +9,11 @@ import { status } from '../utils/status'
import { HogFunctionInvocationResult, HogFunctionType } from './types'

export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-watcher' : '@posthog/hog-watcher'
const REDIS_KEY_SCORES = `${BASE_REDIS_KEY}/scores`
const REDIS_KEY_HEALTH = `${BASE_REDIS_KEY}/health`
const REDIS_TIMEOUT_SECONDS = 5

const LUA_TOKEN_BUCKET = readFileSync(path.join(__dirname, 'lua', 'token-bucket.lua')).toString()

export enum HogWatcherState {
healthy = 1,
degraded = 2,
Expand All @@ -27,34 +29,43 @@ export enum HogWatcherState {

export type HogWatcherFunctionState = {
state: HogWatcherState
score: number
tokens: number
rating: number
}

type WithCheckRateLimit<T> = {
checkRateLimit: (key: string, now: number, cost: number, poolMax: number, fillRate: number, expiry: number) => T
}

type HogWatcherRedisClientPipeline = Pipeline & WithCheckRateLimit<number>

type HogWatcherRedisClient = Omit<Redis, 'pipeline'> &
WithCheckRateLimit<Promise<number>> & {
pipeline: () => HogWatcherRedisClientPipeline
}

export class HogWatcher {
constructor(private hub: Hub) {}

private async runRedis<T>(fn: (client: Redis) => Promise<T>): Promise<T | null> {
private rateLimitArgs(id: HogFunctionType['id'], cost: number) {
const now = Date.now()
return [
`${REDIS_KEY_HEALTH}/${id}`,
now,
cost,
this.hub.CDP_WATCHER_BUCKET_SIZE,
this.hub.CDP_WATCHER_REFILL_RATE,
this.hub.CDP_WATCHER_TTL,
] as const
}

private async runRedis<T>(fn: (client: HogWatcherRedisClient) => Promise<T>): Promise<T | null> {
// We want all of this to fail open in the issue of redis being unavailable - we'd rather have the function continue
const client = await this.hub.redisPool.acquire()

client.defineCommand('consumeTokenBucket', {
numberOfKeys: 2,
lua: `
local key = KEYS[1]
local bucket_size = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local cost = tonumber(ARGV[3])
local current = tonumber(redis.call('get', key) or 0)
local refill_amount = math.max(0, (current - refill_rate) + cost)
if refill_amount > bucket_size then
return 0
end
redis.call('set', key, refill_amount)
return 1
`,
client.defineCommand('checkRateLimit', {
numberOfKeys: 1,
lua: LUA_TOKEN_BUCKET,
})

const timeout = timeoutGuard(
Expand All @@ -63,7 +74,7 @@ export class HogWatcher {
REDIS_TIMEOUT_SECONDS * 1000
)
try {
return await fn(client)
return await fn(client as HogWatcherRedisClient)
} catch (e) {
status.error('HogWatcher Redis error', e)
captureException(e)
Expand All @@ -74,55 +85,46 @@ export class HogWatcher {
}
}

public scoreToState(score: number): HogWatcherState {
if (score <= this.hub.CDP_WATCHER_THRESHOLD_DISABLED) {
return HogWatcherState.disabledForPeriod
} else if (score <= this.hub.CDP_WATCHER_THRESHOLD_DEGRADED) {
return HogWatcherState.degraded
}
public tokensToFunctionState(tokens?: number | null): HogWatcherFunctionState {
tokens = tokens ?? this.hub.CDP_WATCHER_BUCKET_SIZE
const rating = tokens / this.hub.CDP_WATCHER_BUCKET_SIZE

return HogWatcherState.healthy
const state =
rating >= this.hub.CDP_WATCHER_THRESHOLD_DEGRADED
? HogWatcherState.healthy
: rating > 0
? HogWatcherState.degraded
: HogWatcherState.disabledForPeriod

return { state, tokens, rating }
}

public async getStates(
ids: HogFunctionType['id'][]
): Promise<Record<HogFunctionType['id'], HogWatcherFunctionState>> {
const idsSet = new Set(ids)

const states = await this.runRedis(async (client) => {
const buckets = await this.runRedis(async (client) => {
const pipeline = client.pipeline()

for (const id of idsSet) {
pipeline.get(`${REDIS_KEY_SCORES}/${id}`)
pipeline.checkRateLimit(...this.rateLimitArgs(id, 0))
}

return pipeline.exec()
})

return Array.from(idsSet).reduce((acc, id, index) => {
const score = states ? Number(states[index][1]) : 0
return {
...acc,
[id]: {
state: this.scoreToState(score),
score,
},
[id]: this.tokensToFunctionState(buckets ? buckets[index][1] : undefined),
}
}, {} as Record<HogFunctionType['id'], HogWatcherFunctionState>)
}

public async getState(id: HogFunctionType['id']): Promise<HogWatcherFunctionState> {
const res = await this.runRedis(async (client) => {
const score = await client.get(`${REDIS_KEY_SCORES}/${id}`)
return score
})

const score = Number(res ?? 0)

return {
state: this.scoreToState(score),
score,
}
const res = await this.runRedis(async (client) => client.checkRateLimit(...this.rateLimitArgs(id, 0)))
return this.tokensToFunctionState(res)
}

public async forceStateChange(id: HogFunctionType['id'], state: HogWatcherState): Promise<void> {
Expand All @@ -131,77 +133,54 @@ export class HogWatcher {

const newScore =
state === HogWatcherState.healthy
? 0
? this.hub.CDP_WATCHER_BUCKET_SIZE
: state === HogWatcherState.degraded
? this.hub.CDP_WATCHER_THRESHOLD_DEGRADED
: this.hub.CDP_WATCHER_THRESHOLD_DISABLED
? this.hub.CDP_WATCHER_BUCKET_SIZE * this.hub.CDP_WATCHER_THRESHOLD_DEGRADED
: 0

pipeline.set(`${REDIS_KEY_SCORES}/${id}`, newScore)
pipeline.expire(`${REDIS_KEY_SCORES}/${id}`, this.hub.CDP_WATCHER_TTL)
pipeline.hset(`${REDIS_KEY_HEALTH}/${id}`, 'pool', newScore)

await pipeline.exec()
})
}

public async observeResults(results: HogFunctionInvocationResult[]): Promise<void> {
const changes: Record<HogFunctionType['id'], number> = {}
const costs: Record<HogFunctionType['id'], number> = {}

results.forEach((result) => {
let change = (changes[result.invocation.hogFunctionId] = changes[result.invocation.hogFunctionId] || 0)
let cost = (costs[result.invocation.hogFunctionId] = costs[result.invocation.hogFunctionId] || 0)

if (result.finished) {
// If it is finished we can calculate the score based off of the timings

const totalDurationMs = result.invocation.timings.reduce((acc, timing) => acc + timing.duration_ms, 0)

const lowerBound = this.hub.CDP_WATCHER_SCORE_TIMING_LOWER
const upperBound = this.hub.CDP_WATCHER_SCORE_TIMING_UPPER
const scoreSuccess = this.hub.CDP_WATCHER_SCORE_SUCCESS
const lowerBound = this.hub.CDP_WATCHER_COST_TIMING_LOWER_MS
const upperBound = this.hub.CDP_WATCHER_COST_TIMING_UPPER_MS
const costTiming = this.hub.CDP_WATCHER_COST_TIMING
const ratio = Math.max(totalDurationMs - lowerBound, 0) / (upperBound - lowerBound)

change += Math.round(scoreSuccess - scoreSuccess * ratio)
cost += Math.round(costTiming * ratio)
}

if (result.error) {
change += this.hub.CDP_WATCHER_SCORE_ERROR // Errors incur medium penalties
cost += this.hub.CDP_WATCHER_COST_ERROR
}

changes[result.invocation.hogFunctionId] = change
})

await this.runRedis(async (client) => {
let pipeline = client.pipeline()

const changeEntries = Object.entries(changes)

changeEntries.forEach(([id, change]) => {
pipeline.incrby(`${REDIS_KEY_SCORES}/${id}`, change)
pipeline.expire(`${REDIS_KEY_SCORES}/${id}`, this.hub.CDP_WATCHER_TTL)
})

const results = await pipeline.exec()
console.log('COST', cost)

pipeline = client.pipeline()

changeEntries.forEach(([id], index) => {
const [err, value] = results[index * 2]
let override: number | null = null
costs[result.invocation.hogFunctionId] = cost
})

if (err) {
// If there was an error, we can just skip
} else if (value < this.hub.CDP_WATCHER_THRESHOLD_DISABLED) {
override = this.hub.CDP_WATCHER_THRESHOLD_DISABLED
} else if (value > this.hub.CDP_WATCHER_THRESHOLD_HEALTHY) {
override = this.hub.CDP_WATCHER_THRESHOLD_HEALTHY
}
const res = await this.runRedis(async (client) => {
const pipeline = client.pipeline()

if (override !== null) {
pipeline.set(`${REDIS_KEY_SCORES}/${id}`, override)
pipeline.expire(`${REDIS_KEY_SCORES}/${id}`, this.hub.CDP_WATCHER_TTL)
}
Object.entries(costs).forEach(([id, change]) => {
pipeline.checkRateLimit(...this.rateLimitArgs(id, change))
})

await pipeline.exec()
return await pipeline.exec()
})

console.log('Observe results', res)
}
}
48 changes: 48 additions & 0 deletions plugin-server/src/cdp/lua/token-bucket.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
local key = KEYS[1]
local now = ARGV[1]
local cost = ARGV[2]
local poolMax = ARGV[3]
local fillRate = ARGV[4]
local expiry = ARGV[5]
local before = redis.call('hget', key, 'ts')

-- If we don't have a timestamp then we set it to now and fill up the bucket
if before == false then
local ret = poolMax - cost
redis.call('hset', key, 'ts', now)
redis.call('hset', key, 'pool', ret)
-- redis.call('expire', key, expiry)
return ret
end

-- We update the timestamp if it has changed
local timediff = now - before

if timediff > 0 then
redis.call('hset', key, 'ts', now)
else
timediff = 0
end

-- Calculate how much should be refilled in the bucket and add it
local owedTokens = timediff / fillRate
local currentTokens = redis.call('hget', key, 'pool')

if currentTokens == false then
currentTokens = poolMax
end

currentTokens = math.min(currentTokens + owedTokens, poolMax)

-- Remove the cost and return the new number of tokens
if currentTokens - cost >= 0 then
currentTokens = currentTokens - cost
else
currentTokens = -1
end

redis.call('hset', key, 'pool', currentTokens)
redis.call('expire', key, expiry)

-- Finally return the value - if it's negative then we've hit the limit
return currentTokens
47 changes: 0 additions & 47 deletions plugin-server/src/cdp/redis/token-bucket.lua

This file was deleted.

16 changes: 8 additions & 8 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ export function getDefaultConfig(): PluginsServerConfig {
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: 0, // 0 disables stats collection
SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // 1MB
// CDP
CDP_WATCHER_SCORE_ERROR: -20,
CDP_WATCHER_SCORE_SUCCESS: 10,
CDP_WATCHER_SCORE_TIMING_UPPER: 5000,
CDP_WATCHER_SCORE_TIMING_LOWER: 100,
CDP_WATCHER_THRESHOLD_HEALTHY: 10000,
CDP_WATCHER_THRESHOLD_DEGRADED: -1000,
CDP_WATCHER_THRESHOLD_DISABLED: -10000,
CDP_WATCHER_COST_ERROR: 20,
CDP_WATCHER_COST_TIMING: 100,
CDP_WATCHER_COST_TIMING_LOWER_MS: 100,
CDP_WATCHER_COST_TIMING_UPPER_MS: 5000,
CDP_WATCHER_THRESHOLD_DEGRADED: 0.8,
CDP_WATCHER_BUCKET_SIZE: 10000,
CDP_WATCHER_DISABLED_TTL: 60 * 10,
CDP_WATCHER_TTL: 60 * 60,
CDP_WATCHER_TTL: 60 * 10,
CDP_WATCHER_REFILL_RATE: 10,
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3,
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
}
Expand Down
Loading

0 comments on commit 8b5d0f1

Please sign in to comment.