Skip to content

Commit

Permalink
no-config-thread
Browse files Browse the repository at this point in the history
Signed-off-by: Aapo Talvensaari <[email protected]>
  • Loading branch information
bungle committed Oct 24, 2023
1 parent 28bf625 commit 0e39c53
Showing 1 changed file with 25 additions and 53 deletions.
78 changes: 25 additions & 53 deletions kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ local _M = {}
local _MT = { __index = _M, }


local semaphore = require("ngx.semaphore")
local cjson = require("cjson.safe")
local constants = require("kong.constants")
local protocol = require("kong.clustering.protocol")
Expand All @@ -13,6 +12,7 @@ local get_updated_monotonic_ms = require("kong.tools.utils").get_updated_monoton
local type = type
local pcall = pcall
local assert = assert
local collectgarbage = collectgarbage
local setmetatable = setmetatable
local ipairs = ipairs
local math = math
Expand Down Expand Up @@ -137,8 +137,6 @@ function _M:communicate(premature)
return
end

local config_semaphore = semaphore.new(0)

-- How DP connection management works:
--
-- Three threads are spawned, when any of these threads exits,
Expand All @@ -159,47 +157,11 @@ function _M:communicate(premature)
-- applied on the local Kong DP

local ping_immediately
local config_exit
local next_data

local config_thread = ngx.thread.spawn(function()
while not (config_exit or exiting()) do
local ok, err = config_semaphore:wait(1)
if not ok then
if err ~= "timeout" then
ngx.log(ngx.ERR, LOG_PREFIX, "semaphore wait error: ", err)
ngx.sleep(0)
end

else
local data = next_data
if data then
local start = get_updated_monotonic_ms()
local pok, res, err = pcall(protocol.reconfigure_end, data)
if pok then
ngx.log(ngx.DEBUG, LOG_PREFIX, "importing configuration took: ",
get_updated_monotonic_ms() - start, " ms", log_suffix)
ping_immediately = true
end

if not pok or not res then
ngx.log(ngx.ERR, LOG_PREFIX, "unable to update running config: ",
(not pok and res) or err)
end

if next_data == data then
next_data = nil
end
end

ngx.sleep(0)
end
end
end)
local thread_exit

local write_thread = ngx.thread.spawn(function()
local counter = 0 -- count down to ping
while not (config_exit or exiting()) do
while not (thread_exit or exiting()) do
if ping_immediately or counter <= 0 then
ping_immediately = nil
counter = PING_INTERVAL
Expand All @@ -211,12 +173,14 @@ function _M:communicate(premature)

ngx.sleep(1)
end

thread_exit = true
end)

local read_thread = ngx.thread.spawn(function()
local receive_start
local last_seen = ngx.time()
while not (config_exit or exiting()) do
while not (thread_exit or exiting()) do
local data, typ, err = wb:recv_frame()
if err then
if not is_timeout(err) then
Expand Down Expand Up @@ -257,12 +221,17 @@ function _M:communicate(premature)
ngx.log(ngx.DEBUG, LOG_PREFIX, "received updated configuration from control plane: ",
get_updated_monotonic_ms() - receive_start, " ms", log_suffix)

next_data = msg
if config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
-- count is guaranteed to not exceed 1
config_semaphore:post()
local start = get_updated_monotonic_ms()
local pok, res, err = pcall(protocol.reconfigure_end, msg)
if pok then
ngx.log(ngx.INFO, LOG_PREFIX, "importing configuration took: ",
get_updated_monotonic_ms() - start, " ms", log_suffix)
ping_immediately = true
end

if not pok or not res then
ngx.log(ngx.ERR, LOG_PREFIX, "unable to update running config: ",
(not pok and res) or err)
end

else
Expand All @@ -275,11 +244,14 @@ function _M:communicate(premature)

ngx.sleep(0)
end

collectgarbage()
end

thread_exit = true
end)

local ok, err, perr = ngx.thread.wait(read_thread, write_thread, config_thread)
ngx.thread.kill(read_thread)
local ok, err, perr = ngx.thread.wait(write_thread)
ngx.thread.kill(write_thread)
wb:close()

Expand All @@ -294,17 +266,17 @@ function _M:communicate(premature)

-- the config thread might be holding a lock if it's in the middle of an
-- update, so we need to give it a chance to terminate gracefully
config_exit = true
thread_exit = true

ok, err, perr = ngx.thread.wait(config_thread)
ok, err, perr = ngx.thread.wait(read_thread)
if not ok then
ngx.log(ngx.ERR, LOG_PREFIX, err, log_suffix)

elseif perr then
ngx.log(ngx.ERR, LOG_PREFIX, perr, log_suffix)
end

ngx.thread.kill(config_thread)
ngx.thread.kill(read_thread)

if not exiting() then
assert(ngx.timer.at(reconnection_delay, function(premature)
Expand Down

0 comments on commit 0e39c53

Please sign in to comment.