Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Aug 7, 2024
1 parent eb7b167 commit 3e791e0
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions plugin-server/src/cdp/hog-watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import { status } from '../utils/status'
import { HogFunctionInvocationResult, HogFunctionType } from './types'

export const BASE_REDIS_KEY = process.env.NODE_ENV == 'test' ? '@posthog-test/hog-watcher' : '@posthog/hog-watcher'
const REDIS_KEY_HEALTH = `${BASE_REDIS_KEY}/health`
const REDIS_KEY_TOKENS = `${BASE_REDIS_KEY}/tokens`
const REDIS_KEY_DISABLED = `${BASE_REDIS_KEY}/disabled`
const REDIS_TIMEOUT_SECONDS = 5

const LUA_TOKEN_BUCKET = readFileSync(path.join(__dirname, 'lua', 'token-bucket.lua')).toString()
Expand Down Expand Up @@ -51,7 +52,7 @@ export class HogWatcher {
private rateLimitArgs(id: HogFunctionType['id'], cost: number) {
const nowSeconds = Math.round(now() / 1000)
return [
`${REDIS_KEY_HEALTH}/${id}`,
`${REDIS_KEY_TOKENS}/${id}`,
nowSeconds,
cost,
this.hub.CDP_WATCHER_BUCKET_SIZE,
Expand Down Expand Up @@ -110,6 +111,8 @@ export class HogWatcher {

for (const id of idsSet) {
pipeline.checkRateLimit(...this.rateLimitArgs(id, 0))
// Also check if it is actively disabled
pipeline.get(`${REDIS_KEY_DISABLED}/${id}`, 0, 'EX', this.hub.CDP_WATCHER_TTL)
}

return pipeline.exec()
Expand All @@ -124,8 +127,8 @@ export class HogWatcher {
}

public async getState(id: HogFunctionType['id']): Promise<HogWatcherFunctionState> {
const res = await this.runRedis(async (client) => client.checkRateLimit(...this.rateLimitArgs(id, 0)))
return this.tokensToFunctionState(res)
const res = await this.getStates([id])
return res[id]
}

public async forceStateChange(id: HogFunctionType['id'], state: HogWatcherState): Promise<void> {
Expand All @@ -139,7 +142,7 @@ export class HogWatcher {
? this.hub.CDP_WATCHER_BUCKET_SIZE * this.hub.CDP_WATCHER_THRESHOLD_DEGRADED
: 0

pipeline.hset(`${REDIS_KEY_HEALTH}/${id}`, 'pool', newScore)
pipeline.hset(`${REDIS_KEY_TOKENS}/${id}`, 'pool', newScore)

await pipeline.exec()
})
Expand Down

0 comments on commit 3e791e0

Please sign in to comment.