diff --git a/kong/clustering/data_plane.lua b/kong/clustering/data_plane.lua index 95dff28f0d67..3364a331ddb6 100644 --- a/kong/clustering/data_plane.lua +++ b/kong/clustering/data_plane.lua @@ -88,6 +88,9 @@ function _M:init_worker(basic_info) -- does not config rpc sync if not kong.sync then + -- start communicate() + self.run_communicate = true + start_communicate() return end @@ -108,15 +111,13 @@ function _M:init_worker(basic_info) -- cp supports kong.sync.v2 if has_sync_v2 then + -- notify communicate() to exit + self.run_communicate = false return end - -- we only check once - if self.inited then - return - end - - self.inited = true + -- start communicate() + self.run_communicate = true ngx_log(ngx_WARN, "sync v1 is enabled due to rpc sync can not work.") @@ -262,7 +263,8 @@ function _M:communicate(premature) local config_err_t local config_thread = ngx.thread.spawn(function() - while not exiting() and not config_exit do + -- outside flag will stop the communicate() loop + while not exiting() and not config_exit and self.run_communicate do local ok, err = config_semaphore:wait(1) if not ok then @@ -416,7 +418,7 @@ function _M:communicate(premature) ngx_log(ngx_ERR, _log_prefix, perr, log_suffix) end - if not exiting() then + if not exiting() and self.run_communicate then assert(ngx.timer.at(reconnection_delay, function(premature) self:communicate(premature) end)) diff --git a/kong/clustering/services/sync/init.lua b/kong/clustering/services/sync/init.lua index c901914b6230..1ee55254e4b3 100644 --- a/kong/clustering/services/sync/init.lua +++ b/kong/clustering/services/sync/init.lua @@ -76,19 +76,13 @@ function _M:init_worker() -- cp does not support kong.sync.v2 if not has_sync_v2 then ngx.log(ngx.WARN, "rpc sync is disabled in CP.") + assert(self.rpc:sync_every(EACH_SYNC_DELAY), true) -- stop timer return end -- sync to CP ASAP assert(self.rpc:sync_once(FIRST_SYNC_DELAY)) - -- we only check once - if self.inited then - return - end - - self.inited = true - assert(self.rpc:sync_every(EACH_SYNC_DELAY)) end, "clustering:jsonrpc", "connected") diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index c84a10ecd76e..108c54d7fdd8 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -413,8 +413,24 @@ function _M:sync_once(delay) end -function _M:sync_every(delay) - return ngx.timer.every(delay, sync_handler) +function _M:sync_every(delay, stop) + local name = "rpc_sync_v2_every" + local is_managed = kong.timer:is_managed(name) + + -- we only start or stop once + + if stop then + if is_managed then + assert(kong.timer:cancel(name)) + end + return true + end + + if is_managed then + return true + end + + return kong.timer:named_every(name, delay, sync_handler) end