Revert "refactor(concurrency): consistent node-level locks"
This reverts commit 7da959c.
samugi committed Jul 12, 2024
1 parent f908184 commit b65e2fb
Showing 3 changed files with 75 additions and 42 deletions.
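In short, the revert swaps the shared concurrency.with_worker_mutex helper back out for direct lua_shared_dict operations at both call sites below. A minimal sketch of the two locking styles involved, assuming an OpenResty worker with a kong shared dict; the demo key name and do_work are illustrative placeholders, not code from this commit:

-- Style being reverted: the resty.lock-based helper from kong/concurrency.lua.
local concurrency = require "kong.concurrency"

local function do_work()
  return true
end

local ok, err = concurrency.with_worker_mutex({
  name = "demo:lock",   -- illustrative key name
  timeout = 0,          -- do not wait if another worker holds the lock
  exptime = 10,         -- lock auto-expires after 10s as a safety net
}, do_work)

-- Style being restored: a non-blocking try-lock directly on the shared dict.
local shm = ngx.shared.kong
local locked, lerr = shm:safe_add("demo:lock", true, 10)  -- lerr == "exists" if held
if locked then
  do_work()
  shm:delete("demo:lock")  -- release explicitly; the exptime is only a fallback
end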
92 changes: 63 additions & 29 deletions kong/cluster_events/init.lua
@@ -13,7 +13,6 @@ local timer_at = ngx.timer.at
 local ngx_update_time = ngx.update_time
 
 local knode = kong and kong.node or require "kong.pdk.node".new()
-local concurrency = require "kong.concurrency"
 
 local POLL_INTERVAL_LOCK_KEY = "cluster_events:poll_interval"
 local POLL_RUNNING_LOCK_KEY = "cluster_events:poll_running"
@@ -327,45 +326,80 @@ if ngx_debug then
 end
 
 
-poll_handler = function(premature, self)
-  if premature or not self.polling then
-    -- set self.polling to false to stop a polling loop
-    return
-  end
-
+local function get_lock(self)
   -- check if a poll is not currently running, to ensure we don't start
   -- another poll while a worker is still stuck in its own polling (in
   -- case it is being slow)
   -- we still add an exptime to this lock in case something goes horribly
   -- wrong, to ensure other workers can poll new events
   -- a poll cannot take more than max(poll_interval * 5, 10) -- 10s min
-  local ok, err = concurrency.with_worker_mutex({
-    name = POLL_RUNNING_LOCK_KEY,
-    timeout = 0,
-    exptime = max(self.poll_interval * 5, 10),
-  }, function()
-    if self.poll_interval > 0.001 then
-      -- check if interval of `poll_interval` has elapsed already, to ensure
-      -- we do not run the poll when a previous poll was quickly executed, but
-      -- another worker got the timer trigger a bit too late.
-      return concurrency.with_worker_mutex({
-        name = POLL_INTERVAL_LOCK_KEY,
-        timeout = 0,
-        exptime = self.poll_interval - 0.001,
-      }, function()
-        return poll(self)
-      end)
+  local ok, err = self.shm:safe_add(POLL_RUNNING_LOCK_KEY, true,
+                                    max(self.poll_interval * 5, 10))
+  if not ok then
+    if err ~= "exists" then
+      log(ERR, "failed to acquire poll_running lock: ", err)
+    end
+    -- else
+    --   log(DEBUG, "failed to acquire poll_running lock: ",
+    --              "a worker still holds the lock")
+
+    return false
+  end
+
+  if self.poll_interval > 0.001 then
+    -- check if interval of `poll_interval` has elapsed already, to ensure
+    -- we do not run the poll when a previous poll was quickly executed, but
+    -- another worker got the timer trigger a bit too late.
+    ok, err = self.shm:safe_add(POLL_INTERVAL_LOCK_KEY, true,
+                                self.poll_interval - 0.001)
+    if not ok then
+      if err ~= "exists" then
+        log(ERR, "failed to acquire poll_interval lock: ", err)
+      end
+      -- else
+      --   log(DEBUG, "failed to acquire poll_interval lock: ",
+      --              "not enough time elapsed since last poll")
+
+      self.shm:delete(POLL_RUNNING_LOCK_KEY)
+
+      return false
     end
+  end
+
+  return true
+end
+
+
+poll_handler = function(premature, self)
+  if premature or not self.polling then
+    -- set self.polling to false to stop a polling loop
+    return
+  end
+
+  if not get_lock(self) then
+    local ok, err = timer_at(self.poll_interval, poll_handler, self)
+    if not ok then
+      log(CRIT, "failed to start recurring polling timer: ", err)
+    end
 
-    return poll(self)
-  end)
+    return
+  end
 
-  if not ok and err ~= "exists" then
-    log(ERR, err)
+  -- single worker
+
+  local pok, perr, err = pcall(poll, self)
+  if not pok then
+    log(ERR, "poll() threw an error: ", perr)
+
+  elseif not perr then
+    log(ERR, "failed to poll: ", err)
   end
 
-  -- schedule next polling timer
-  ok, err = timer_at(self.poll_interval, poll_handler, self)
+  -- unlock
+
+  self.shm:delete(POLL_RUNNING_LOCK_KEY)
+
+  local ok, err = timer_at(self.poll_interval, poll_handler, self)
   if not ok then
     log(CRIT, "failed to start recurring polling timer: ", err)
   end
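The restored cluster_events code gates polling with two shared-dict keys: cluster_events:poll_running acts as a try-lock whose exptime guarantees recovery if the holder dies, and cluster_events:poll_interval uses its TTL as an "enough time has elapsed since the last poll" check. A minimal sketch of that idiom in isolation, assuming a kong lua_shared_dict; the demo key names and interval are illustrative:

local shm = ngx.shared.kong
local interval = 5  -- illustrative poll interval, in seconds

local function try_acquire()
  -- gate 1: only one worker polls at a time; the TTL releases the lock
  -- even if the holding worker never reaches the delete in release()
  local ok, err = shm:safe_add("demo:poll_running", true, math.max(interval * 5, 10))
  if not ok then
    return false, err          -- "exists" means another worker is polling
  end

  -- gate 2: the key's own TTL encodes "the interval has not elapsed yet"
  ok, err = shm:safe_add("demo:poll_interval", true, interval - 0.001)
  if not ok then
    shm:delete("demo:poll_running")
    return false, err
  end

  return true
end

local function release()
  shm:delete("demo:poll_running")
end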
3 changes: 1 addition & 2 deletions kong/concurrency.lua
@@ -51,8 +51,7 @@ function concurrency.with_worker_mutex(opts, fn)
   local elapsed, err = rlock:lock(opts_name)
   if not elapsed then
     if err == "timeout" then
-      local ttl = rlock.dict and rlock.dict:ttl(opts_name)
-      return nil, err, ttl
+      return nil, err
     end
     return nil, "failed to acquire worker lock: " .. err
   end
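After this change, with_worker_mutex once again reports a held lock as a plain (nil, "timeout") pair, without the remaining TTL of the underlying resty.lock key. A hedged sketch of the caller-side difference, with build_cache and the key name as placeholders:

local concurrency = require "kong.concurrency"

local function build_cache()
  return true
end

local ok, err, ttl = concurrency.with_worker_mutex({
  name = "demo:lock",  -- illustrative key name
  timeout = 0,         -- fail fast instead of waiting for the holder
  exptime = 30,
}, build_cache)

if not ok and err == "timeout" then
  -- before the revert, ttl carried the lock's remaining lifetime so callers
  -- could derive a retry hint; after it, ttl is nil and callers only know
  -- that another worker currently holds the lock
  ngx.log(ngx.NOTICE, "lock busy; remaining ttl: ", ttl or "unknown")
end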
22 changes: 11 additions & 11 deletions kong/db/declarative/import.lua
@@ -4,7 +4,6 @@ local constants = require("kong.constants")
 local workspaces = require("kong.workspaces")
 local cycle_aware_deep_copy = require("kong.tools.table").cycle_aware_deep_copy
 local declarative_config = require("kong.db.schema.others.declarative_config")
-local concurrency = require("kong.concurrency")
 
 
 local yield = require("kong.tools.yield").yield
@@ -572,24 +571,25 @@ do
   local DECLARATIVE_RETRY_TTL_MAX = 10
   local DECLARATIVE_LOCK_KEY = "declarative:lock"
 
   -- make sure no matter which path it exits, we released the lock.
   load_into_cache_with_events = function(entities, meta, hash, hashes)
-    local ok, err, ttl = concurrency.with_worker_mutex({
-      name = DECLARATIVE_LOCK_KEY,
-      timeout = 0,
-      exptime = DECLARATIVE_LOCK_TTL,
-    }, function()
-      return load_into_cache_with_events_no_lock(entities, meta, hash, hashes)
-    end)
+    local kong_shm = ngx.shared.kong
+
+    local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
     if not ok then
-      if err == "timeout" and ttl then
-        local retry_after = min(ttl, DECLARATIVE_RETRY_TTL_MAX)
-        return nil, "busy", retry_after
+      if err == "exists" then
+        local ttl = min(kong_shm:ttl(DECLARATIVE_LOCK_KEY), DECLARATIVE_RETRY_TTL_MAX)
+        return nil, "busy", ttl
       end
 
+      kong_shm:delete(DECLARATIVE_LOCK_KEY)
       return nil, err
     end
 
+    ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)
+
+    kong_shm:delete(DECLARATIVE_LOCK_KEY)
+
     return ok, err
   end
 end
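load_into_cache_with_events keeps its three-value contract under contention, (nil, "busy", retry_after), with retry_after now computed from the shared-dict key's TTL and capped at DECLARATIVE_RETRY_TTL_MAX. A hedged sketch of a caller turning that into an HTTP response; the handler shape and the load_config parameter are illustrative, not Kong's actual /config endpoint:

-- load_config stands in for whatever wraps load_into_cache_with_events.
local function respond_to_reload(load_config, entities, meta, hash, hashes)
  local ok, err, retry_after = load_config(entities, meta, hash, hashes)
  if ok then
    return ngx.exit(ngx.HTTP_CREATED)
  end

  if err == "busy" then
    -- another worker is loading config; retry_after is at most 10 seconds
    ngx.header["Retry-After"] = retry_after
    return ngx.exit(429)
  end

  ngx.log(ngx.ERR, "declarative reload failed: ", err)
  return ngx.exit(500)
end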

1 comment on commit b65e2fb

@github-actions
Contributor


Bazel Build

Docker image available kong/kong:b65e2fb1048217e61d03b848e9f9bd07fd72e544
Artifacts available https://github.com/Kong/kong/actions/runs/9906659442
