From 30b4af70ddf2243fa4d4f66b08d20d3165a058f9 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 3 Jan 2025 06:28:32 +0800 Subject: [PATCH 1/2] feat(clustering/rpc): generate rpc disconnected event rpc disconnected --- kong/clustering/rpc/manager.lua | 11 +++++++++++ kong/clustering/services/sync/init.lua | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 88dfa6a8594d..f1918398c761 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -469,6 +469,17 @@ function _M:handle_websocket() local res, err = s:join() self:_remove_socket(s) + -- tell outside that rpc disconnected + do + local worker_events = assert(kong.worker_events) + + -- notify this worker + local ok, err = worker_events.post_local("clustering:jsonrpc", "disconnected") + if not ok then + ngx_log(ngx_ERR, _log_prefix, "unable to post rpc disconnected event: ", err) + end + end + if not res then ngx_log(ngx_ERR, _log_prefix, "RPC connection broken: ", err, " node_id: ", node_id) return ngx_exit(ngx.ERROR) diff --git a/kong/clustering/services/sync/init.lua b/kong/clustering/services/sync/init.lua index 1ee55254e4b3..ba431420d0da 100644 --- a/kong/clustering/services/sync/init.lua +++ b/kong/clustering/services/sync/init.lua @@ -86,6 +86,11 @@ function _M:init_worker() assert(self.rpc:sync_every(EACH_SYNC_DELAY)) end, "clustering:jsonrpc", "connected") + + -- if rpc is down we will also stop to sync + worker_events.register(function() + assert(self.rpc:sync_every(EACH_SYNC_DELAY), true) -- stop timer + end, "clustering:jsonrpc", "disconnected") end From d90ef35967cd8f66b1f98b19396a2c1c17313366 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 6 Jan 2025 13:12:09 +0800 Subject: [PATCH 2/2] fix(clustering/rpc): fix mistake of stop timer --- kong/clustering/services/sync/init.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kong/clustering/services/sync/init.lua b/kong/clustering/services/sync/init.lua index ba431420d0da..73864a17f092 100644 --- a/kong/clustering/services/sync/init.lua +++ b/kong/clustering/services/sync/init.lua @@ -76,7 +76,7 @@ function _M:init_worker() -- cp does not support kong.sync.v2 if not has_sync_v2 then ngx.log(ngx.WARN, "rpc sync is disabled in CP.") - assert(self.rpc:sync_every(EACH_SYNC_DELAY), true) -- stop timer + assert(self.rpc:sync_every(EACH_SYNC_DELAY, true)) -- stop timer return end @@ -89,7 +89,7 @@ function _M:init_worker() -- if rpc is down we will also stop to sync worker_events.register(function() - assert(self.rpc:sync_every(EACH_SYNC_DELAY), true) -- stop timer + assert(self.rpc:sync_every(EACH_SYNC_DELAY, true)) -- stop timer end, "clustering:jsonrpc", "disconnected") end