refactor(concurrency): consistent node-level locks
Several places in the gateway need a node-level lock; some of them used
slightly different implementations. This refactor makes node-level
locking consistent by using the same implementation
(concurrency.with_worker_mutex) everywhere.
samugi committed Jul 11, 2024
1 parent bc27ffd commit 7da959c
Showing 3 changed files with 42 additions and 75 deletions.
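
For context on the API this commit standardizes on: every call site below uses the same call shape. Here is a minimal usage sketch, assuming only what the diffs themselves show (the name/timeout/exptime options and the ok, err returns); the lock name and critical section are hypothetical:

local concurrency = require "kong.concurrency"

-- Run fn() on at most one worker of this node at a time.
-- timeout = 0 means: give up immediately if another worker holds the lock.
-- exptime auto-releases the lock after that many seconds, so a crashed
-- holder cannot block other workers forever.
local ok, err = concurrency.with_worker_mutex({
  name    = "example:lock",  -- hypothetical lock name
  timeout = 0,
  exptime = 10,
}, function()
  -- critical section: runs under the lock
  return true
end)

if not ok then
  ngx.log(ngx.ERR, "could not enter critical section: ", err)
end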
kong/cluster_events/init.lua (29 additions, 63 deletions)
@@ -13,6 +13,7 @@ local timer_at = ngx.timer.at
 local ngx_update_time = ngx.update_time
 
 local knode = kong and kong.node or require "kong.pdk.node".new()
+local concurrency = require "kong.concurrency"
 
 local POLL_INTERVAL_LOCK_KEY = "cluster_events:poll_interval"
 local POLL_RUNNING_LOCK_KEY = "cluster_events:poll_running"
@@ -326,80 +327,45 @@ if ngx_debug then
 end
 
 
-local function get_lock(self)
-  -- check if a poll is not currently running, to ensure we don't start
-  -- another poll while a worker is still stuck in its own polling (in
-  -- case it is being slow)
-  -- we still add an exptime to this lock in case something goes horribly
-  -- wrong, to ensure other workers can poll new events
-  -- a poll cannot take more than max(poll_interval * 5, 10) -- 10s min
-  local ok, err = self.shm:safe_add(POLL_RUNNING_LOCK_KEY, true,
-                                    max(self.poll_interval * 5, 10))
-  if not ok then
-    if err ~= "exists" then
-      log(ERR, "failed to acquire poll_running lock: ", err)
-    end
-    -- else
-    --   log(DEBUG, "failed to acquire poll_running lock: ",
-    --              "a worker still holds the lock")
-
-    return false
-  end
-
-  if self.poll_interval > 0.001 then
-    -- check if interval of `poll_interval` has elapsed already, to ensure
-    -- we do not run the poll when a previous poll was quickly executed, but
-    -- another worker got the timer trigger a bit too late.
-    ok, err = self.shm:safe_add(POLL_INTERVAL_LOCK_KEY, true,
-                                self.poll_interval - 0.001)
-    if not ok then
-      if err ~= "exists" then
-        log(ERR, "failed to acquire poll_interval lock: ", err)
-      end
-      -- else
-      --   log(DEBUG, "failed to acquire poll_interval lock: ",
-      --              "not enough time elapsed since last poll")
-
-      self.shm:delete(POLL_RUNNING_LOCK_KEY)
-
-      return false
-    end
-  end
-
-  return true
-end
-
-
 poll_handler = function(premature, self)
   if premature or not self.polling then
     -- set self.polling to false to stop a polling loop
     return
   end
 
-  if not get_lock(self) then
-    local ok, err = timer_at(self.poll_interval, poll_handler, self)
-    if not ok then
-      log(CRIT, "failed to start recurring polling timer: ", err)
+  -- check if a poll is not currently running, to ensure we don't start
+  -- another poll while a worker is still stuck in its own polling (in
+  -- case it is being slow)
+  -- we still add an exptime to this lock in case something goes horribly
+  -- wrong, to ensure other workers can poll new events
+  -- a poll cannot take more than max(poll_interval * 5, 10) -- 10s min
+  local ok, err = concurrency.with_worker_mutex({
+    name = POLL_RUNNING_LOCK_KEY,
+    timeout = 0,
+    exptime = max(self.poll_interval * 5, 10),
+  }, function()
+    if self.poll_interval > 0.001 then
+      -- check if interval of `poll_interval` has elapsed already, to ensure
+      -- we do not run the poll when a previous poll was quickly executed, but
+      -- another worker got the timer trigger a bit too late.
+      return concurrency.with_worker_mutex({
+        name = POLL_INTERVAL_LOCK_KEY,
+        timeout = 0,
+        exptime = self.poll_interval - 0.001,
+      }, function()
+        return poll(self)
+      end)
     end
 
-    return
-  end
+    return poll(self)
+  end)
 
-  -- single worker
-
-  local pok, perr, err = pcall(poll, self)
-  if not pok then
-    log(ERR, "poll() threw an error: ", perr)
-
-  elseif not perr then
-    log(ERR, "failed to poll: ", err)
+  if not ok and err ~= "exists" then
+    log(ERR, err)
   end
 
-  -- unlock
-
-  self.shm:delete(POLL_RUNNING_LOCK_KEY)
-
-  local ok, err = timer_at(self.poll_interval, poll_handler, self)
+  -- schedule next polling timer
+  ok, err = timer_at(self.poll_interval, poll_handler, self)
   if not ok then
     log(CRIT, "failed to start recurring polling timer: ", err)
   end
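
Note how the two locks compose here: the outer POLL_RUNNING_LOCK_KEY mutex guarantees a single poller per node, while the inner POLL_INTERVAL_LOCK_KEY mutex, whose exptime is just under poll_interval, throttles polls to one per interval even when several workers' timers fire close together. The same expiring-lock idiom can be sketched standalone (a simplified sketch assuming an OpenResty shared dict named "kong"; the key name and helper are illustrative):

-- Run work() at most once per `interval` seconds across all workers.
-- safe_add succeeds only for the first caller; the key then expires on
-- its own, opening the next interval to whichever worker fires first.
local function once_per_interval(interval, work)
  local ok, err = ngx.shared.kong:safe_add("example:interval_lock", true,
                                           interval - 0.001)
  if not ok then
    if err ~= "exists" then
      return nil, err
    end
    return true  -- another worker already covered this interval
  end
  return work()
end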
kong/concurrency.lua (2 additions, 1 deletion)
@@ -51,7 +51,8 @@ function concurrency.with_worker_mutex(opts, fn)
   local elapsed, err = rlock:lock(opts_name)
   if not elapsed then
     if err == "timeout" then
-      return nil, err
+      local ttl = rlock.dict and rlock.dict:ttl(opts_name)
+      return nil, err, ttl
     end
     return nil, "failed to acquire worker lock: " .. err
   end
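
The new third return value lets a caller that passed timeout = 0 learn how long the current holder's lock has left, which is a natural retry delay. A hypothetical caller (lock name, exptime, and retry body are illustrative):

local concurrency = require "kong.concurrency"

local ok, err, ttl = concurrency.with_worker_mutex({
  name = "example:lock",  -- hypothetical
  timeout = 0,
  exptime = 60,
}, function()
  -- critical section
  return true
end)

if not ok and err == "timeout" then
  -- another worker holds the lock; ttl (possibly nil) is its remaining
  -- lifetime, so schedule a retry instead of spinning
  ngx.timer.at(ttl or 1, function(premature)
    if premature then return end
    -- hypothetical retry logic goes here
  end)
end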
kong/db/declarative/import.lua (11 additions, 11 deletions)
@@ -4,6 +4,7 @@ local constants = require("kong.constants")
 local workspaces = require("kong.workspaces")
 local cycle_aware_deep_copy = require("kong.tools.table").cycle_aware_deep_copy
 local declarative_config = require("kong.db.schema.others.declarative_config")
+local concurrency = require("kong.concurrency")
 
 
 local yield = require("kong.tools.yield").yield
@@ -571,25 +572,24 @@ do
   local DECLARATIVE_RETRY_TTL_MAX = 10
   local DECLARATIVE_LOCK_KEY = "declarative:lock"
 
-  -- make sure no matter which path it exits, we released the lock.
   load_into_cache_with_events = function(entities, meta, hash, hashes)
-    local kong_shm = ngx.shared.kong
+    local ok, err, ttl = concurrency.with_worker_mutex({
+      name = DECLARATIVE_LOCK_KEY,
+      timeout = 0,
+      exptime = DECLARATIVE_LOCK_TTL,
+    }, function()
+      return load_into_cache_with_events_no_lock(entities, meta, hash, hashes)
+    end)
 
-    local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
     if not ok then
-      if err == "exists" then
-        local ttl = min(kong_shm:ttl(DECLARATIVE_LOCK_KEY), DECLARATIVE_RETRY_TTL_MAX)
-        return nil, "busy", ttl
+      if err == "timeout" and ttl then
+        local retry_after = min(ttl, DECLARATIVE_RETRY_TTL_MAX)
+        return nil, "busy", retry_after
       end
 
-      kong_shm:delete(DECLARATIVE_LOCK_KEY)
       return nil, err
     end
 
-    ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)
-
-    kong_shm:delete(DECLARATIVE_LOCK_KEY)
-
    return ok, err
   end
 end
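
The function keeps its previous contract: on contention it returns nil, "busy", retry_after, with retry_after capped at DECLARATIVE_RETRY_TTL_MAX seconds; only the locking primitive underneath changed. A hypothetical caller surfacing that hint over HTTP (an illustrative sketch, not Kong's actual /config endpoint code):

local ok, err, retry_after = load_into_cache_with_events(entities, meta,
                                                         hash, hashes)
if not ok then
  if err == "busy" then
    -- another worker is mid-reload; tell the client when to try again
    ngx.header["Retry-After"] = math.ceil(retry_after)
    return ngx.exit(429)
  end
  return ngx.exit(500)
end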

1 comment on commit 7da959c

@github-actions (Contributor)

Bazel Build

Docker image available: kong/kong:7da959ca4e2ec87c53afe9f0b48057c0e8b22f0e
Artifacts available: https://github.com/Kong/kong/actions/runs/9895757257
