diff --git a/kong/cluster_events/init.lua b/kong/cluster_events/init.lua index 6566277477fe..ca30f522eba1 100644 --- a/kong/cluster_events/init.lua +++ b/kong/cluster_events/init.lua @@ -13,6 +13,7 @@ local timer_at = ngx.timer.at local ngx_update_time = ngx.update_time local knode = kong and kong.node or require "kong.pdk.node".new() +local concurrency = require "kong.concurrency" local POLL_INTERVAL_LOCK_KEY = "cluster_events:poll_interval" local POLL_RUNNING_LOCK_KEY = "cluster_events:poll_running" @@ -326,80 +327,45 @@ if ngx_debug then end -local function get_lock(self) - -- check if a poll is not currently running, to ensure we don't start - -- another poll while a worker is still stuck in its own polling (in - -- case it is being slow) - -- we still add an exptime to this lock in case something goes horribly - -- wrong, to ensure other workers can poll new events - -- a poll cannot take more than max(poll_interval * 5, 10) -- 10s min - local ok, err = self.shm:safe_add(POLL_RUNNING_LOCK_KEY, true, - max(self.poll_interval * 5, 10)) - if not ok then - if err ~= "exists" then - log(ERR, "failed to acquire poll_running lock: ", err) - end - -- else - -- log(DEBUG, "failed to acquire poll_running lock: ", - -- "a worker still holds the lock") - - return false - end - - if self.poll_interval > 0.001 then - -- check if interval of `poll_interval` has elapsed already, to ensure - -- we do not run the poll when a previous poll was quickly executed, but - -- another worker got the timer trigger a bit too late. - ok, err = self.shm:safe_add(POLL_INTERVAL_LOCK_KEY, true, - self.poll_interval - 0.001) - if not ok then - if err ~= "exists" then - log(ERR, "failed to acquire poll_interval lock: ", err) - end - -- else - -- log(DEBUG, "failed to acquire poll_interval lock: ", - -- "not enough time elapsed since last poll") - - self.shm:delete(POLL_RUNNING_LOCK_KEY) - - return false - end - end - - return true -end - - poll_handler = function(premature, self) if premature or not self.polling then -- set self.polling to false to stop a polling loop return end - if not get_lock(self) then - local ok, err = timer_at(self.poll_interval, poll_handler, self) - if not ok then - log(CRIT, "failed to start recurring polling timer: ", err) + -- check if a poll is not currently running, to ensure we don't start + -- another poll while a worker is still stuck in its own polling (in + -- case it is being slow) + -- we still add an exptime to this lock in case something goes horribly + -- wrong, to ensure other workers can poll new events + -- a poll cannot take more than max(poll_interval * 5, 10) -- 10s min + local ok, err = concurrency.with_worker_mutex({ + name = POLL_RUNNING_LOCK_KEY, + timeout = 0, + exptime = max(self.poll_interval * 5, 10), + }, function() + if self.poll_interval > 0.001 then + -- check if interval of `poll_interval` has elapsed already, to ensure + -- we do not run the poll when a previous poll was quickly executed, but + -- another worker got the timer trigger a bit too late. + return concurrency.with_worker_mutex({ + name = POLL_INTERVAL_LOCK_KEY, + timeout = 0, + exptime = self.poll_interval - 0.001, + }, function() + return poll(self) + end) end - return - end + return poll(self) + end) - -- single worker - - local pok, perr, err = pcall(poll, self) - if not pok then - log(ERR, "poll() threw an error: ", perr) - - elseif not perr then - log(ERR, "failed to poll: ", err) + if not ok and err ~= "exists" then + log(ERR, err) end - -- unlock - - self.shm:delete(POLL_RUNNING_LOCK_KEY) - - local ok, err = timer_at(self.poll_interval, poll_handler, self) + -- schedule next polling timer + ok, err = timer_at(self.poll_interval, poll_handler, self) if not ok then log(CRIT, "failed to start recurring polling timer: ", err) end diff --git a/kong/concurrency.lua b/kong/concurrency.lua index beef26d76aea..2b03b2b7cad3 100644 --- a/kong/concurrency.lua +++ b/kong/concurrency.lua @@ -51,7 +51,8 @@ function concurrency.with_worker_mutex(opts, fn) local elapsed, err = rlock:lock(opts_name) if not elapsed then if err == "timeout" then - return nil, err + local ttl = rlock.dict and rlock.dict:ttl(opts_name) + return nil, err, ttl end return nil, "failed to acquire worker lock: " .. err end diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index 2b8411160545..b2d5bd33caa6 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -4,6 +4,7 @@ local constants = require("kong.constants") local workspaces = require("kong.workspaces") local cycle_aware_deep_copy = require("kong.tools.table").cycle_aware_deep_copy local declarative_config = require("kong.db.schema.others.declarative_config") +local concurrency = require("kong.concurrency") local yield = require("kong.tools.yield").yield @@ -571,25 +572,24 @@ do local DECLARATIVE_RETRY_TTL_MAX = 10 local DECLARATIVE_LOCK_KEY = "declarative:lock" - -- make sure no matter which path it exits, we released the lock. load_into_cache_with_events = function(entities, meta, hash, hashes) - local kong_shm = ngx.shared.kong + local ok, err, ttl = concurrency.with_worker_mutex({ + name = DECLARATIVE_LOCK_KEY, + timeout = 0, + exptime = DECLARATIVE_LOCK_TTL, + }, function() + return load_into_cache_with_events_no_lock(entities, meta, hash, hashes) + end) - local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL) if not ok then - if err == "exists" then - local ttl = min(kong_shm:ttl(DECLARATIVE_LOCK_KEY), DECLARATIVE_RETRY_TTL_MAX) - return nil, "busy", ttl + if err == "timeout" and ttl then + local retry_after = min(ttl, DECLARATIVE_RETRY_TTL_MAX) + return nil, "busy", retry_after end - kong_shm:delete(DECLARATIVE_LOCK_KEY) return nil, err end - ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes) - - kong_shm:delete(DECLARATIVE_LOCK_KEY) - return ok, err end end