diff --git a/kong/cluster_events/init.lua b/kong/cluster_events/init.lua index ca30f522eba1..6566277477fe 100644 --- a/kong/cluster_events/init.lua +++ b/kong/cluster_events/init.lua @@ -13,7 +13,6 @@ local timer_at = ngx.timer.at local ngx_update_time = ngx.update_time local knode = kong and kong.node or require "kong.pdk.node".new() -local concurrency = require "kong.concurrency" local POLL_INTERVAL_LOCK_KEY = "cluster_events:poll_interval" local POLL_RUNNING_LOCK_KEY = "cluster_events:poll_running" @@ -327,45 +326,80 @@ if ngx_debug then end -poll_handler = function(premature, self) - if premature or not self.polling then - -- set self.polling to false to stop a polling loop - return - end - +local function get_lock(self) -- check if a poll is not currently running, to ensure we don't start -- another poll while a worker is still stuck in its own polling (in -- case it is being slow) -- we still add an exptime to this lock in case something goes horribly -- wrong, to ensure other workers can poll new events -- a poll cannot take more than max(poll_interval * 5, 10) -- 10s min - local ok, err = concurrency.with_worker_mutex({ - name = POLL_RUNNING_LOCK_KEY, - timeout = 0, - exptime = max(self.poll_interval * 5, 10), - }, function() - if self.poll_interval > 0.001 then - -- check if interval of `poll_interval` has elapsed already, to ensure - -- we do not run the poll when a previous poll was quickly executed, but - -- another worker got the timer trigger a bit too late. - return concurrency.with_worker_mutex({ - name = POLL_INTERVAL_LOCK_KEY, - timeout = 0, - exptime = self.poll_interval - 0.001, - }, function() - return poll(self) - end) + local ok, err = self.shm:safe_add(POLL_RUNNING_LOCK_KEY, true, + max(self.poll_interval * 5, 10)) + if not ok then + if err ~= "exists" then + log(ERR, "failed to acquire poll_running lock: ", err) + end + -- else + -- log(DEBUG, "failed to acquire poll_running lock: ", + -- "a worker still holds the lock") + + return false + end + + if self.poll_interval > 0.001 then + -- check if interval of `poll_interval` has elapsed already, to ensure + -- we do not run the poll when a previous poll was quickly executed, but + -- another worker got the timer trigger a bit too late. + ok, err = self.shm:safe_add(POLL_INTERVAL_LOCK_KEY, true, + self.poll_interval - 0.001) + if not ok then + if err ~= "exists" then + log(ERR, "failed to acquire poll_interval lock: ", err) + end + -- else + -- log(DEBUG, "failed to acquire poll_interval lock: ", + -- "not enough time elapsed since last poll") + + self.shm:delete(POLL_RUNNING_LOCK_KEY) + + return false + end + end + + return true +end + + +poll_handler = function(premature, self) + if premature or not self.polling then + -- set self.polling to false to stop a polling loop + return + end + + if not get_lock(self) then + local ok, err = timer_at(self.poll_interval, poll_handler, self) + if not ok then + log(CRIT, "failed to start recurring polling timer: ", err) end - return poll(self) - end) + return + end - if not ok and err ~= "exists" then - log(ERR, err) + -- single worker + + local pok, perr, err = pcall(poll, self) + if not pok then + log(ERR, "poll() threw an error: ", perr) + + elseif not perr then + log(ERR, "failed to poll: ", err) end - -- schedule next polling timer - ok, err = timer_at(self.poll_interval, poll_handler, self) + -- unlock + + self.shm:delete(POLL_RUNNING_LOCK_KEY) + + local ok, err = timer_at(self.poll_interval, poll_handler, self) if not ok then log(CRIT, "failed to start recurring polling timer: ", err) end diff --git a/kong/concurrency.lua b/kong/concurrency.lua index 2b03b2b7cad3..beef26d76aea 100644 --- a/kong/concurrency.lua +++ b/kong/concurrency.lua @@ -51,8 +51,7 @@ function concurrency.with_worker_mutex(opts, fn) local elapsed, err = rlock:lock(opts_name) if not elapsed then if err == "timeout" then - local ttl = rlock.dict and rlock.dict:ttl(opts_name) - return nil, err, ttl + return nil, err end return nil, "failed to acquire worker lock: " .. err end diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index b2d5bd33caa6..2b8411160545 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -4,7 +4,6 @@ local constants = require("kong.constants") local workspaces = require("kong.workspaces") local cycle_aware_deep_copy = require("kong.tools.table").cycle_aware_deep_copy local declarative_config = require("kong.db.schema.others.declarative_config") -local concurrency = require("kong.concurrency") local yield = require("kong.tools.yield").yield @@ -572,24 +571,25 @@ do local DECLARATIVE_RETRY_TTL_MAX = 10 local DECLARATIVE_LOCK_KEY = "declarative:lock" + -- make sure no matter which path it exits, we released the lock. load_into_cache_with_events = function(entities, meta, hash, hashes) - local ok, err, ttl = concurrency.with_worker_mutex({ - name = DECLARATIVE_LOCK_KEY, - timeout = 0, - exptime = DECLARATIVE_LOCK_TTL, - }, function() - return load_into_cache_with_events_no_lock(entities, meta, hash, hashes) - end) + local kong_shm = ngx.shared.kong + local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL) if not ok then - if err == "timeout" and ttl then - local retry_after = min(ttl, DECLARATIVE_RETRY_TTL_MAX) - return nil, "busy", retry_after + if err == "exists" then + local ttl = min(kong_shm:ttl(DECLARATIVE_LOCK_KEY), DECLARATIVE_RETRY_TTL_MAX) + return nil, "busy", ttl end + kong_shm:delete(DECLARATIVE_LOCK_KEY) return nil, err end + ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes) + + kong_shm:delete(DECLARATIVE_LOCK_KEY) + return ok, err end end