diff --git a/plugin-server/src/cdp/hog-watcher.ts b/plugin-server/src/cdp/hog-watcher.ts index ff8c1d102a0c4..f191989170469 100644 --- a/plugin-server/src/cdp/hog-watcher.ts +++ b/plugin-server/src/cdp/hog-watcher.ts @@ -10,7 +10,8 @@ import { status } from '../utils/status' import { HogFunctionInvocationResult, HogFunctionType } from './types' export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-watcher' : '@posthog/hog-watcher' -const REDIS_KEY_HEALTH = `${BASE_REDIS_KEY}/health` +const REDIS_KEY_TOKENS = `${BASE_REDIS_KEY}/tokens` +const REDIS_KEY_DISABLED = `${BASE_REDIS_KEY}/disabled` const REDIS_TIMEOUT_SECONDS = 5 const LUA_TOKEN_BUCKET = readFileSync(path.join(__dirname, 'lua', 'token-bucket.lua')).toString() @@ -51,7 +52,7 @@ export class HogWatcher { private rateLimitArgs(id: HogFunctionType['id'], cost: number) { const nowSeconds = Math.round(now() / 1000) return [ - `${REDIS_KEY_HEALTH}/${id}`, + `${REDIS_KEY_TOKENS}/${id}`, nowSeconds, cost, this.hub.CDP_WATCHER_BUCKET_SIZE, @@ -110,6 +111,8 @@ export class HogWatcher { for (const id of idsSet) { pipeline.checkRateLimit(...this.rateLimitArgs(id, 0)) + // Also check if it is actively disabled + pipeline.get(`${REDIS_KEY_DISABLED}/${id}`, 0, 'EX', this.hub.CDP_WATCHER_TTL) } return pipeline.exec() @@ -124,8 +127,8 @@ export class HogWatcher { } public async getState(id: HogFunctionType['id']): Promise { - const res = await this.runRedis(async (client) => client.checkRateLimit(...this.rateLimitArgs(id, 0))) - return this.tokensToFunctionState(res) + const res = await this.getStates([id]) + return res[id] } public async forceStateChange(id: HogFunctionType['id'], state: HogWatcherState): Promise { @@ -139,7 +142,7 @@ export class HogWatcher { ? this.hub.CDP_WATCHER_BUCKET_SIZE * this.hub.CDP_WATCHER_THRESHOLD_DEGRADED : 0 - pipeline.hset(`${REDIS_KEY_HEALTH}/${id}`, 'pool', newScore) + pipeline.hset(`${REDIS_KEY_TOKENS}/${id}`, 'pool', newScore) await pipeline.exec() })