Skip to content

Commit

Permalink
fixup! refactor(concurrency): consistent node-level locks
Browse files Browse the repository at this point in the history
There was an issue of log noise in the previous refactor because a
conditional log was checking for the returned value "exists" from the
lock which was not returned as expected by `with_worker_mutex`. This
commit addresses that issue by checking for the correct value `timeout`.
  • Loading branch information
samugi committed Jul 17, 2024
1 parent 4fe62a6 commit 49e7450
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 3 deletions.
2 changes: 1 addition & 1 deletion kong/cluster_events/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ poll_handler = function(premature, self)
return poll(self)
end)

if not ok and err ~= "exists" then
if not ok and err ~= "timeout" then
log(ERR, err)
end

Expand Down
3 changes: 2 additions & 1 deletion kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,8 @@ do
end)

if not ok then
if err == "timeout" and ttl then
if err == "timeout" then
ttl = ttl or DECLARATIVE_RETRY_TTL_MAX
local retry_after = min(ttl, DECLARATIVE_RETRY_TTL_MAX)
return nil, "busy", retry_after
end
Expand Down
19 changes: 18 additions & 1 deletion spec/02-integration/06-invalidations/01-cluster_events_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,28 @@ _G.ngx.config.debug = true

local helpers = require "spec.helpers"
local kong_cluster_events = require "kong.cluster_events"
local match = require "luassert.match"


for _, strategy in helpers.each_strategy() do
describe("cluster_events with db [#" .. strategy .. "]", function()
local db
local db, log_spy, orig_ngx_log

lazy_setup(function()
local _
_, db = helpers.get_db_utils(strategy, {})

orig_ngx_log = ngx.log
local logged = { level = function() end }
log_spy = spy.on(logged, "level")
_G.ngx.log = function(l) logged.level(l) end -- luacheck: ignore
end)

lazy_teardown(function()
local cluster_events = assert(kong_cluster_events.new { db = db })
cluster_events.strategy:truncate_events()

_G.ngx.log = orig_ngx_log -- luacheck: ignore
end)

before_each(function()
Expand Down Expand Up @@ -121,6 +129,7 @@ for _, strategy in helpers.each_strategy() do

assert(cluster_events_1:poll())
assert.spy(spy_func).was_called(3)
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)

it("broadcasts data to subscribers", function()
Expand All @@ -144,6 +153,7 @@ for _, strategy in helpers.each_strategy() do
assert(cluster_events_1:poll())
assert.spy(spy_func).was_called(1)
assert.spy(spy_func).was_called_with("hello world")
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)

it("does not broadcast events on the same node", function()
Expand All @@ -165,6 +175,7 @@ for _, strategy in helpers.each_strategy() do

assert(cluster_events_1:poll())
assert.spy(spy_func).was_not_called()
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)

it("starts interval polling when subscribing", function()
Expand Down Expand Up @@ -199,6 +210,7 @@ for _, strategy in helpers.each_strategy() do
helpers.wait_until(function()
return called == 2
end, 10)
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)

it("applies a poll_offset to lookback potentially missed events", function()
Expand Down Expand Up @@ -240,6 +252,7 @@ for _, strategy in helpers.each_strategy() do

assert(cluster_events_1:poll())
assert.spy(spy_func).was_called(2) -- not called again this time
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)

it("handles more than <PAGE_SIZE> events at once", function()
Expand All @@ -263,6 +276,7 @@ for _, strategy in helpers.each_strategy() do

assert(cluster_events_1:poll())
assert.spy(spy_func).was_called(201)
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)

it("runs callbacks in protected mode", function()
Expand All @@ -285,6 +299,7 @@ for _, strategy in helpers.each_strategy() do
assert.has_no_error(function()
cluster_events_1:poll()
end)
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)

it("broadcasts an event with a delay", function()
Expand Down Expand Up @@ -319,6 +334,7 @@ for _, strategy in helpers.each_strategy() do
assert(cluster_events_1:poll())
return pcall(assert.spy(spy_func).was_called, 1) -- called
end, 1) -- note that we have already waited for `delay` seconds
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)

it("broadcasts an event with a polling delay for subscribers", function()
Expand Down Expand Up @@ -356,6 +372,7 @@ for _, strategy in helpers.each_strategy() do
assert(cluster_events_1:poll())
return pcall(assert.spy(spy_func).was_called, 1) -- called
end, 1) -- note that we have already waited for `delay` seconds
assert.spy(log_spy).was_not_called_with(match.is_not.gt(ngx.ERR))
end)
end)
end)
Expand Down
16 changes: 16 additions & 0 deletions spec/helpers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2825,6 +2825,22 @@ luassert:register("assertion", "gt", is_gt,
"assertion.gt.negative",
"assertion.gt.positive")



---
-- Matcher to ensure a value is greater than a base value.
-- @function is_gt_matcher
-- @param base the base value to compare against
-- @param value the value that must be greater than the base value
local function is_gt_matcher(state, arguments)
local expected = arguments[1]
return function(value)
return value > expected
end
end
luassert:register("matcher", "gt", is_gt_matcher)


--- Generic modifier "certificate".
-- Will set a "certificate" value in the assertion state, so following
-- assertions will operate on the value set.
Expand Down

0 comments on commit 49e7450

Please sign in to comment.