diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 88dfa6a8594..f1918398c76 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -469,6 +469,17 @@ function _M:handle_websocket() local res, err = s:join() self:_remove_socket(s) + -- tell outside that rpc disconnected + do + local worker_events = assert(kong.worker_events) + + -- notify this worker + local ok, err = worker_events.post_local("clustering:jsonrpc", "disconnected") + if not ok then + ngx_log(ngx_ERR, _log_prefix, "unable to post rpc disconnected event: ", err) + end + end + if not res then ngx_log(ngx_ERR, _log_prefix, "RPC connection broken: ", err, " node_id: ", node_id) return ngx_exit(ngx.ERROR) diff --git a/kong/clustering/services/sync/init.lua b/kong/clustering/services/sync/init.lua index 1ee55254e4b..73864a17f09 100644 --- a/kong/clustering/services/sync/init.lua +++ b/kong/clustering/services/sync/init.lua @@ -76,7 +76,7 @@ function _M:init_worker() -- cp does not support kong.sync.v2 if not has_sync_v2 then ngx.log(ngx.WARN, "rpc sync is disabled in CP.") - assert(self.rpc:sync_every(EACH_SYNC_DELAY), true) -- stop timer + assert(self.rpc:sync_every(EACH_SYNC_DELAY, true)) -- stop timer return end @@ -86,6 +86,11 @@ function _M:init_worker() assert(self.rpc:sync_every(EACH_SYNC_DELAY)) end, "clustering:jsonrpc", "connected") + + -- if rpc is down we will also stop to sync + worker_events.register(function() + assert(self.rpc:sync_every(EACH_SYNC_DELAY, true)) -- stop timer + end, "clustering:jsonrpc", "disconnected") end