Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(plugin): RL instances sync to the same DB at same rate #12003

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/unreleased/kong/rl-shared-sync-timer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: "**Rate Limiting**: fix an issue where all counters are synced to the same DB at the same rate."
type: bugfix
scope: Plugin
162 changes: 116 additions & 46 deletions kong/plugins/rate-limiting/policies/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,32 @@ local SYNC_RATE_REALTIME = -1

local EMPTY_UUID = "00000000-0000-0000-0000-000000000000"

-- for `conf.sync_rate > 0`
local auto_sync_timer
local EMPTY = {}

local cur_usage = {
--[[
[cache_key] = <integer>
[db_key][cache_key] = <integer>
--]]
}

local cur_usage_expire_at = {
--[[
[cache_key] = <integer>
[db_key][cache_key] = <integer>
--]]
}

local cur_delta = {
--[[
[cache_key] = <integer>
[db_key][cache_key] = <integer>
--]]
}

-- Ensures each of the three per-database counter tables (usage, expiry,
-- delta) has a sub-table for `db_key`. Existing sub-tables are left
-- untouched so counters that were already recorded are preserved.
local function init_tables(db_key)
  if not cur_usage[db_key] then
    cur_usage[db_key] = {}
  end

  if not cur_usage_expire_at[db_key] then
    cur_usage_expire_at[db_key] = {}
  end

  if not cur_delta[db_key] then
    cur_delta[db_key] = {}
  end
end


local function is_present(str)
return str and str ~= "" and str ~= null
Expand Down Expand Up @@ -73,6 +78,13 @@ local sock_opts = {}

local EXPIRATION = require "kong.plugins.rate-limiting.expiration"

-- Builds the string key "<host>:<port>;<database>" that identifies the
-- Redis database described by `conf`.
-- @param conf plugin configuration; reads `redis_host`, `redis_port`
--             and `redis_database`
-- @return the formatted key string
local function get_db_key(conf)
  local host = conf.redis_host
  local port = conf.redis_port
  local database = conf.redis_database

  return fmt("%s:%d;%d", host, port, database)
end


local function get_redis_connection(conf)
local red = redis:new()
Expand All @@ -82,26 +94,25 @@ local function get_redis_connection(conf)
sock_opts.ssl_verify = conf.redis_ssl_verify
sock_opts.server_name = conf.redis_server_name

local db_key = get_db_key(conf)

-- use a special pool name only if redis_database is set to non-zero
-- otherwise use the default pool name host:port
if conf.redis_database ~= 0 then
sock_opts.pool = fmt( "%s:%d;%d",
conf.redis_host,
conf.redis_port,
conf.redis_database)
sock_opts.pool = db_key
end

local ok, err = red:connect(conf.redis_host, conf.redis_port,
sock_opts)
if not ok then
kong.log.err("failed to connect to Redis: ", err)
return nil, err
return nil, db_key, err
end

local times, err = red:get_reused_times()
if err then
kong.log.err("failed to get connect reused times: ", err)
return nil, err
return nil, db_key, err
end

if times == 0 then
Expand All @@ -118,7 +129,7 @@ local function get_redis_connection(conf)
end
if not ok then
kong.log.err("failed to auth Redis: ", err)
return nil, err
return nil, db_key, err
end
end

Expand All @@ -129,86 +140,143 @@ local function get_redis_connection(conf)
local ok, err = red:select(conf.redis_database)
if not ok then
kong.log.err("failed to change Redis database: ", err)
return nil, err
return nil, db_key, err
end
end
end

return red
return red, db_key, err
end

local function clear_local_counter()
table_clear(cur_usage)
table_clear(cur_usage_expire_at)
table_clear(cur_delta)
local function clear_local_counter(db_key)
-- for config updates a db may no longer be used but this happens rarely
-- and unlikely there will be a lot of them. So we choose to not remove the table
-- but just clear it, as recreating the table will be more expensive
table_clear(cur_usage[db_key])
table_clear(cur_usage_expire_at[db_key])
table_clear(cur_delta[db_key])
end

local function sync_to_redis(premature, conf)
if premature then
return
end

local red, err = get_redis_connection(conf)
local red, db_key, err = get_redis_connection(conf)
if not red then
kong.log.err("[rate-limiting] failed to connect to Redis: ", err)
clear_local_counter()
clear_local_counter(db_key)
return
end

red:init_pipeline()

for cache_key, delta in pairs(cur_delta) do
for cache_key, delta in pairs(cur_delta[db_key] or EMPTY) do
red:eval([[
local key, value, expiration = KEYS[1], tonumber(ARGV[1]), ARGV[2]
local exists = redis.call("exists", key)
redis.call("incrby", key, value)
if not exists or exists == 0 then
redis.call("expireat", key, expiration)
end
]], 1, cache_key, delta, cur_usage_expire_at[cache_key])
]], 1, cache_key, delta, cur_usage_expire_at[db_key][cache_key])
end

local _, err = red:commit_pipeline()
if err then
kong.log.err("[rate-limiting] failed to commit increment pipeline in Redis: ", err)
clear_local_counter()
clear_local_counter(db_key)
samugi marked this conversation as resolved.
Show resolved Hide resolved
return
end

local ok, err = red:set_keepalive(10000, 100)
if not ok then
kong.log.err("[rate-limiting] failed to set Redis keepalive: ", err)
clear_local_counter()
clear_local_counter(db_key)
return
end

-- just clear these tables and avoid creating three new tables
clear_local_counter()
clear_local_counter(db_key)
end

local function periodical_sync(conf, sync_func)
if not auto_sync_timer then
local err
-- timer may be initialized after the module's loaded so we need to update the reference
auto_sync_timer, err = kong.timer:named_every("rate-limiting-auto-sync", conf.sync_rate, sync_func, conf)
local plugin_sync_pending = {}
local plugin_sync_running = {}

-- It's called "rate_limited_sync" because the sync timer itself
-- is rate-limited by the sync_rate.
-- It should be easy to prove that:
-- 1. There will be at most 2 timers per worker for a plugin instance
-- at any given time, 1 syncing and 1 pending (guaranteed by the locks)
-- 2. The 2 timers start at least one sync_rate interval apart
-- 3. A change is always picked up by a pending timer and
-- will be synced to Redis within at most one sync_rate interval
local function rate_limited_sync(conf, sync_func)
local cache_key = conf.__key__ or conf.__plugin_id or "rate-limiting"
-- a timer is pending. The change will be picked up by the pending timer
if plugin_sync_pending[cache_key] then
return true
end

if not auto_sync_timer then
kong.log.err("failed to create timer: ", err)
return nil, err
-- The change may or may not be picked up by a running timer
-- let's start a pending timer to make sure the change is picked up
plugin_sync_pending[cache_key] = true
return kong.timer:at(conf.sync_rate, function(premature)
if premature then
-- we do not clear the pending flag to prevent more timers to be started
-- as they will also exit prematurely
return
samugi marked this conversation as resolved.
Show resolved Hide resolved
end
end

return true
-- a "pending" state is never touched before the timer is started
assert(plugin_sync_pending[cache_key])


local tries = 0
-- a timer is already running.
-- the sleep time is picked to a seemingly reasonable value
while plugin_sync_running[cache_key] do
-- we should wait for at most 2 runs even if the connection times out
-- when this happens, we should not clear the "running" state as it would
-- cause a race condition;
-- we don't want to clear the "pending" state and exit the timer either as
-- it's equivalent to waiting for more runs
if tries > 4 then
kong.log.emerg("A Redis sync is blocked by a previous try. " ..
"The previous try should have timed out but it didn't for unknown reasons.")
end

ngx.sleep(conf.redis_timeout / 2)
tries = tries + 1
end

plugin_sync_running[cache_key] = true

plugin_sync_pending[cache_key] = nil

-- given the condition, the counters will never be empty so no need to
-- check for empty tables and skip the sync
local ok, err = pcall(sync_func, premature, conf)
if not ok then
kong.log.err("[rate-limiting] error when syncing counters to Redis: ", err)
end

plugin_sync_running[cache_key] = nil
end)
end

local function update_local_counters(conf, periods, limits, identifier, value)
local db_key = get_db_key(conf)
init_tables(db_key)

for period, period_date in pairs(periods) do
if limits[period] then
local cache_key = get_local_key(conf, identifier, period, period_date)

cur_delta[cache_key] = (cur_delta[cache_key] or 0) + value
cur_delta[db_key][cache_key] = (cur_delta[db_key][cache_key] or 0) + value
end
end

end

return {
Expand Down Expand Up @@ -286,23 +354,25 @@ return {

else
update_local_counters(conf, periods, limits, identifier, value)
return periodical_sync(conf, sync_to_redis)
return rate_limited_sync(conf, sync_to_redis)
end
end,
usage = function(conf, identifier, period, current_timestamp)
local periods = timestamp.get_timestamps(current_timestamp)
local cache_key = get_local_key(conf, identifier, period, periods[period])
local db_key = get_db_key(conf)
init_tables(db_key)

-- use local cache to reduce the number of redis calls
-- also by pass the logic of incrementing the counter
if conf.sync_rate ~= SYNC_RATE_REALTIME and cur_usage[cache_key] then
if cur_usage_expire_at[cache_key] > ngx_time() then
return cur_usage[cache_key] + (cur_delta[cache_key] or 0)
if conf.sync_rate ~= SYNC_RATE_REALTIME and cur_usage[db_key][cache_key] then
if cur_usage_expire_at[db_key][cache_key] > ngx_time() then
return cur_usage[db_key][cache_key] + (cur_delta[db_key][cache_key] or 0)
end

cur_usage[cache_key] = 0
cur_usage_expire_at[cache_key] = periods[period] + EXPIRATION[period]
cur_delta[cache_key] = 0
cur_usage[db_key][cache_key] = 0
cur_usage_expire_at[db_key][cache_key] = periods[period] + EXPIRATION[period]
cur_delta[db_key][cache_key] = 0

return 0
end
Expand Down Expand Up @@ -339,11 +409,11 @@ return {
end

if conf.sync_rate ~= SYNC_RATE_REALTIME then
cur_usage[cache_key] = current_metric or 0
cur_usage_expire_at[cache_key] = periods[period] + EXPIRATION[period]
cur_usage[db_key][cache_key] = current_metric or 0
cur_usage_expire_at[db_key][cache_key] = periods[period] + EXPIRATION[period]
-- The key was just read from Redis using `incr`, which incremented it
-- by 1. Adjust the value to account for the prior increment.
cur_delta[cache_key] = -1
cur_delta[db_key][cache_key] = -1
end

return current_metric or 0
Expand Down
Loading