Skip to content

Commit

Permalink
fix(plugin): RL instances sync to the same DB at same rate (#12003)
Browse files Browse the repository at this point in the history
All rate-limiting plugin instances sync with the same plugin config — that is, the very first config that got hit by a request — and they all sync at the same rate. Even a config update won't change which DB is synced.

The timer will sync not just the same instance's counters but all counters in the same DB. This is a compromise given the emergency and we prefer simplicity over correctness for this behavior.

Full changelog
- The counter table is split with DB;
- Timers are created when a request hits;
- The sync_rate is guaranteed by limiting the number of running timers and by the timer delay;
- Cover the case in the integration test by "with_sync_rate"

Fix KAG-2904

Co-authored-by: samugi <[email protected]>
  • Loading branch information
StarlightIbuki and samugi authored Nov 17, 2023
1 parent a382576 commit a355d01
Show file tree
Hide file tree
Showing 3 changed files with 323 additions and 246 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/kong/rl-shared-sync-timer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: "**Rate Limiting**: fix an issue where all counters are synced to the same DB at the same rate."
type: bugfix
scope: Plugin
162 changes: 116 additions & 46 deletions kong/plugins/rate-limiting/policies/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,32 @@ local SYNC_RATE_REALTIME = -1

local EMPTY_UUID = "00000000-0000-0000-0000-000000000000"

-- for `conf.sync_rate > 0`
local auto_sync_timer
local EMPTY = {}

local cur_usage = {
--[[
[cache_key] = <integer>
[db_key][cache_key] = <integer>
--]]
}

local cur_usage_expire_at = {
--[[
[cache_key] = <integer>
[db_key][cache_key] = <integer>
--]]
}

local cur_delta = {
--[[
[cache_key] = <integer>
[db_key][cache_key] = <integer>
--]]
}

-- Ensure the per-DB counter namespaces exist for the given db_key.
-- Each counter table is keyed first by db_key, then by cache_key,
-- so counters for different Redis databases never mix.
local function init_tables(db_key)
  if cur_usage[db_key] == nil then
    cur_usage[db_key] = {}
  end
  if cur_usage_expire_at[db_key] == nil then
    cur_usage_expire_at[db_key] = {}
  end
  if cur_delta[db_key] == nil then
    cur_delta[db_key] = {}
  end
end


local function is_present(str)
return str and str ~= "" and str ~= null
Expand Down Expand Up @@ -73,6 +78,13 @@ local sock_opts = {}

local EXPIRATION = require "kong.plugins.rate-limiting.expiration"

-- Build the identity key of the Redis logical database a config points at:
-- "<host>:<port>;<database>". Used both as the connection-pool name and
-- as the first-level key of the local counter tables.
local function get_db_key(conf)
  local host = conf.redis_host
  local port = conf.redis_port
  local database = conf.redis_database
  return fmt("%s:%d;%d", host, port, database)
end


local function get_redis_connection(conf)
local red = redis:new()
Expand All @@ -82,26 +94,25 @@ local function get_redis_connection(conf)
sock_opts.ssl_verify = conf.redis_ssl_verify
sock_opts.server_name = conf.redis_server_name

local db_key = get_db_key(conf)

-- use a special pool name only if redis_database is set to non-zero
-- otherwise use the default pool name host:port
if conf.redis_database ~= 0 then
sock_opts.pool = fmt( "%s:%d;%d",
conf.redis_host,
conf.redis_port,
conf.redis_database)
sock_opts.pool = db_key
end

local ok, err = red:connect(conf.redis_host, conf.redis_port,
sock_opts)
if not ok then
kong.log.err("failed to connect to Redis: ", err)
return nil, err
return nil, db_key, err
end

local times, err = red:get_reused_times()
if err then
kong.log.err("failed to get connect reused times: ", err)
return nil, err
return nil, db_key, err
end

if times == 0 then
Expand All @@ -118,7 +129,7 @@ local function get_redis_connection(conf)
end
if not ok then
kong.log.err("failed to auth Redis: ", err)
return nil, err
return nil, db_key, err
end
end

Expand All @@ -129,86 +140,143 @@ local function get_redis_connection(conf)
local ok, err = red:select(conf.redis_database)
if not ok then
kong.log.err("failed to change Redis database: ", err)
return nil, err
return nil, db_key, err
end
end
end

return red
return red, db_key, err
end

local function clear_local_counter()
table_clear(cur_usage)
table_clear(cur_usage_expire_at)
table_clear(cur_delta)
-- Reset the local counters for one Redis DB after (or instead of) a sync.
-- @param db_key string identity of the Redis DB ("host:port;database")
local function clear_local_counter(db_key)
  -- for config updates a db may no longer be used but this happens rarely
  -- and unlikely there will be a lot of them. So we choose to not remove the table
  -- but just clear it, as recreating the table will be more expensive

  -- Guard against a db_key whose sub-tables were never created:
  -- table_clear (LuaJIT extension) raises on a non-table argument, and the
  -- error paths of the sync call this even when connection setup failed.
  if cur_usage[db_key] then
    table_clear(cur_usage[db_key])
  end
  if cur_usage_expire_at[db_key] then
    table_clear(cur_usage_expire_at[db_key])
  end
  if cur_delta[db_key] then
    table_clear(cur_delta[db_key])
  end
end

local function sync_to_redis(premature, conf)
if premature then
return
end

local red, err = get_redis_connection(conf)
local red, db_key, err = get_redis_connection(conf)
if not red then
kong.log.err("[rate-limiting] failed to connect to Redis: ", err)
clear_local_counter()
clear_local_counter(db_key)
return
end

red:init_pipeline()

for cache_key, delta in pairs(cur_delta) do
for cache_key, delta in pairs(cur_delta[db_key] or EMPTY) do
red:eval([[
local key, value, expiration = KEYS[1], tonumber(ARGV[1]), ARGV[2]
local exists = redis.call("exists", key)
redis.call("incrby", key, value)
if not exists or exists == 0 then
redis.call("expireat", key, expiration)
end
]], 1, cache_key, delta, cur_usage_expire_at[cache_key])
]], 1, cache_key, delta, cur_usage_expire_at[db_key][cache_key])
end

local _, err = red:commit_pipeline()
if err then
kong.log.err("[rate-limiting] failed to commit increment pipeline in Redis: ", err)
clear_local_counter()
clear_local_counter(db_key)
return
end

local ok, err = red:set_keepalive(10000, 100)
if not ok then
kong.log.err("[rate-limiting] failed to set Redis keepalive: ", err)
clear_local_counter()
clear_local_counter(db_key)
return
end

-- just clear these tables and avoid creating three new tables
clear_local_counter()
clear_local_counter(db_key)
end

local function periodical_sync(conf, sync_func)
if not auto_sync_timer then
local err
-- timer may be initialized after the module's loaded so we need to update the reference
auto_sync_timer, err = kong.timer:named_every("rate-limiting-auto-sync", conf.sync_rate, sync_func, conf)
local plugin_sync_pending = {}
local plugin_sync_running = {}

-- It's called "rate_limited_sync" because the sync timer itself
-- is rate-limited by the sync_rate.
-- It should be easy to prove that:
-- 1. There will be at most 2 timers per worker for a plugin instance
-- at any given time, 1 syncing and 1 pending (guaranteed by the locks)
-- 2. 2 timers will at least start with a sync_rate interval apart
-- 3. A change is always picked up by a pending timer and
-- will be sync to Redis at most sync_rate interval
local function rate_limited_sync(conf, sync_func)
local cache_key = conf.__key__ or conf.__plugin_id or "rate-limiting"
-- a timer is pending. The change will be picked up by the pending timer
if plugin_sync_pending[cache_key] then
return true
end

if not auto_sync_timer then
kong.log.err("failed to create timer: ", err)
return nil, err
-- The change may or may not be picked up by a running timer
-- let's start a pending timer to make sure the change is picked up
plugin_sync_pending[cache_key] = true
return kong.timer:at(conf.sync_rate, function(premature)
if premature then
-- we do not clear the pending flag to prevent more timers to be started
-- as they will also exit prematurely
return
end
end

return true
-- a "pending" state is never touched before the timer is started
assert(plugin_sync_pending[cache_key])


local tries = 0
-- a timer is already running.
-- the sleep time is picked to a seemingly reasonable value
while plugin_sync_running[cache_key] do
-- we should wait for at most 2 runs even if the connection times out
-- when this happens, we should not clear the "running" state as it would
-- cause a race condition;
-- we don't want to clear the "pending" state and exit the timer either as
-- it's equivalent to waiting for more runs
if tries > 4 then
kong.log.emerg("A Redis sync is blocked by a previous try. " ..
"The previous try should have timed out but it didn't for unknown reasons.")
end

ngx.sleep(conf.redis_timeout / 2)
tries = tries + 1
end

plugin_sync_running[cache_key] = true

plugin_sync_pending[cache_key] = nil

-- given the condition, the counters will never be empty so no need to
-- check for empty tables and skip the sync
local ok, err = pcall(sync_func, premature, conf)
if not ok then
kong.log.err("[rate-limiting] error when syncing counters to Redis: ", err)
end

plugin_sync_running[cache_key] = nil
end)
end

-- Accumulate a rate-limiting delta into the worker-local counters for the
-- Redis DB this config points at; the delta is pushed to Redis later by the
-- rate-limited sync timer.
-- @param conf       plugin configuration (provides the Redis DB identity)
-- @param periods    map of period name -> period start timestamp
-- @param limits     map of period name -> configured limit (only limited
--                   periods get a counter)
-- @param identifier consumer/credential/ip identifier for the cache key
-- @param value      amount to add (typically 1)
local function update_local_counters(conf, periods, limits, identifier, value)
local db_key = get_db_key(conf)
init_tables(db_key)

for period, period_date in pairs(periods) do
if limits[period] then
local cache_key = get_local_key(conf, identifier, period, period_date)

-- NOTE(review): the next flat-keyed line looks like pre-change diff residue
-- left alongside its db_key-nested replacement below — confirm against the
-- actual file; only the nested form matches the per-DB table layout.
cur_delta[cache_key] = (cur_delta[cache_key] or 0) + value
cur_delta[db_key][cache_key] = (cur_delta[db_key][cache_key] or 0) + value
end
end

end

return {
Expand Down Expand Up @@ -286,23 +354,25 @@ return {

else
update_local_counters(conf, periods, limits, identifier, value)
return periodical_sync(conf, sync_to_redis)
return rate_limited_sync(conf, sync_to_redis)
end
end,
usage = function(conf, identifier, period, current_timestamp)
local periods = timestamp.get_timestamps(current_timestamp)
local cache_key = get_local_key(conf, identifier, period, periods[period])
local db_key = get_db_key(conf)
init_tables(db_key)

-- use local cache to reduce the number of redis calls
-- also by pass the logic of incrementing the counter
if conf.sync_rate ~= SYNC_RATE_REALTIME and cur_usage[cache_key] then
if cur_usage_expire_at[cache_key] > ngx_time() then
return cur_usage[cache_key] + (cur_delta[cache_key] or 0)
if conf.sync_rate ~= SYNC_RATE_REALTIME and cur_usage[db_key][cache_key] then
if cur_usage_expire_at[db_key][cache_key] > ngx_time() then
return cur_usage[db_key][cache_key] + (cur_delta[db_key][cache_key] or 0)
end

cur_usage[cache_key] = 0
cur_usage_expire_at[cache_key] = periods[period] + EXPIRATION[period]
cur_delta[cache_key] = 0
cur_usage[db_key][cache_key] = 0
cur_usage_expire_at[db_key][cache_key] = periods[period] + EXPIRATION[period]
cur_delta[db_key][cache_key] = 0

return 0
end
Expand Down Expand Up @@ -339,11 +409,11 @@ return {
end

if conf.sync_rate ~= SYNC_RATE_REALTIME then
cur_usage[cache_key] = current_metric or 0
cur_usage_expire_at[cache_key] = periods[period] + EXPIRATION[period]
cur_usage[db_key][cache_key] = current_metric or 0
cur_usage_expire_at[db_key][cache_key] = periods[period] + EXPIRATION[period]
-- The key was just read from Redis using `incr`, which incremented it
-- by 1. Adjust the value to account for the prior increment.
cur_delta[cache_key] = -1
cur_delta[db_key][cache_key] = -1
end

return current_metric or 0
Expand Down
Loading

1 comment on commit a355d01

@khcp-gha-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:a355d01cfdab7ab98f74a0230d57184ffeb86d92
Artifacts available https://github.com/Kong/kong/actions/runs/6899600370

Please sign in to comment.