diff --git a/kong/clustering/data_plane.lua b/kong/clustering/data_plane.lua index d5acbcaeb04a..f36fd25db4bc 100644 --- a/kong/clustering/data_plane.lua +++ b/kong/clustering/data_plane.lua @@ -2,7 +2,6 @@ local _M = {} local _MT = { __index = _M, } -local semaphore = require("ngx.semaphore") local cjson = require("cjson.safe") local constants = require("kong.constants") local protocol = require("kong.clustering.protocol") @@ -13,6 +12,7 @@ local get_updated_monotonic_ms = require("kong.tools.utils").get_updated_monoton local type = type local pcall = pcall local assert = assert +local collectgarbage = collectgarbage local setmetatable = setmetatable local ipairs = ipairs local math = math @@ -137,8 +137,6 @@ function _M:communicate(premature) return end - local config_semaphore = semaphore.new(0) - -- How DP connection management works: -- -- Three threads are spawned, when any of these threads exits, @@ -159,47 +157,11 @@ function _M:communicate(premature) -- applied on the local Kong DP local ping_immediately - local config_exit - local next_data - - local config_thread = ngx.thread.spawn(function() - while not (config_exit or exiting()) do - local ok, err = config_semaphore:wait(1) - if not ok then - if err ~= "timeout" then - ngx.log(ngx.ERR, LOG_PREFIX, "semaphore wait error: ", err) - ngx.sleep(0) - end - - else - local data = next_data - if data then - local start = get_updated_monotonic_ms() - local pok, res, err = pcall(protocol.reconfigure_end, data) - if pok then - ngx.log(ngx.DEBUG, LOG_PREFIX, "importing configuration took: ", - get_updated_monotonic_ms() - start, " ms", log_suffix) - ping_immediately = true - end - - if not pok or not res then - ngx.log(ngx.ERR, LOG_PREFIX, "unable to update running config: ", - (not pok and res) or err) - end - - if next_data == data then - next_data = nil - end - end - - ngx.sleep(0) - end - end - end) + local thread_exit local write_thread = ngx.thread.spawn(function() local counter = 0 -- count down to ping - while not (config_exit or exiting()) do + while not (thread_exit or exiting()) do if ping_immediately or counter <= 0 then ping_immediately = nil counter = PING_INTERVAL @@ -211,12 +173,14 @@ function _M:communicate(premature) ngx.sleep(1) end + + thread_exit = true end) local read_thread = ngx.thread.spawn(function() local receive_start local last_seen = ngx.time() - while not (config_exit or exiting()) do + while not (thread_exit or exiting()) do local data, typ, err = wb:recv_frame() if err then if not is_timeout(err) then @@ -257,12 +221,17 @@ function _M:communicate(premature) ngx.log(ngx.DEBUG, LOG_PREFIX, "received updated configuration from control plane: ", get_updated_monotonic_ms() - receive_start, " ms", log_suffix) - next_data = msg - if config_semaphore:count() <= 0 then - -- the following line always executes immediately after the `if` check - -- because `:count` will never yield, end result is that the semaphore - -- count is guaranteed to not exceed 1 - config_semaphore:post() + local start = get_updated_monotonic_ms() + local pok, res, err = pcall(protocol.reconfigure_end, msg) + if pok then + ngx.log(ngx.INFO, LOG_PREFIX, "importing configuration took: ", + get_updated_monotonic_ms() - start, " ms", log_suffix) + ping_immediately = true + end + + if not pok or not res then + ngx.log(ngx.ERR, LOG_PREFIX, "unable to update running config: ", + (not pok and res) or err) end else @@ -275,11 +244,14 @@ function _M:communicate(premature) ngx.sleep(0) end + + collectgarbage() end + + thread_exit = true end) - local ok, err, perr = ngx.thread.wait(read_thread, write_thread, config_thread) - ngx.thread.kill(read_thread) + local ok, err, perr = ngx.thread.wait(write_thread) ngx.thread.kill(write_thread) wb:close() @@ -294,9 +266,9 @@ function _M:communicate(premature) -- the config thread might be holding a lock if it's in the middle of an -- update, so we need to give it a chance to terminate gracefully - config_exit = true + thread_exit = true - ok, err, perr = ngx.thread.wait(config_thread) + ok, err, perr = ngx.thread.wait(read_thread) if not ok then ngx.log(ngx.ERR, LOG_PREFIX, err, log_suffix) @@ -304,7 +276,7 @@ function _M:communicate(premature) ngx.log(ngx.ERR, LOG_PREFIX, perr, log_suffix) end - ngx.thread.kill(config_thread) + ngx.thread.kill(read_thread) if not exiting() then assert(ngx.timer.at(reconnection_delay, function(premature)