From 2cd6c2d7d00ca04cff3d767902473392bf7dc9b0 Mon Sep 17 00:00:00 2001 From: Chrono Date: Mon, 11 Nov 2024 15:48:25 +0800 Subject: [PATCH] refactor(clustering/rpc): rework on rpc initial connection (#13824) https://konghq.atlassian.net/browse/KAG-5728 --- kong/clustering/rpc/manager.lua | 38 +++++++++++++++++++++++++++------ kong/init.lua | 38 +++++++++++---------------------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 5d614997567a..94a7c7982550 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -58,14 +58,16 @@ end function _M:_add_socket(socket, capabilities_list) - local sockets = self.clients[socket.node_id] + local node_id = socket.node_id + + local sockets = self.clients[node_id] if not sockets then - assert(self.concentrator:_enqueue_subscribe(socket.node_id)) + assert(self.concentrator:_enqueue_subscribe(node_id)) sockets = setmetatable({}, { __mode = "k", }) - self.clients[socket.node_id] = sockets + self.clients[node_id] = sockets end - self.client_capabilities[socket.node_id] = { + self.client_capabilities[node_id] = { set = pl_tablex_makeset(capabilities_list), list = capabilities_list, } @@ -292,6 +294,30 @@ function _M:handle_websocket() end +function _M:try_connect(reconnection_delay) + ngx.timer.at(reconnection_delay or 0, function(premature) + self:connect(premature, + "control_plane", -- node_id + self.conf.cluster_control_plane, -- host + "/v2/outlet", -- path + self.cluster_cert.cdata, + self.cluster_cert_key) + end) +end + + +function _M:init_worker() + if self.conf.role == "data_plane" then + -- data_plane will try to connect to cp + self:try_connect() + + else + -- control_plane + self.concentrator:start() + end +end + + function _M:connect(premature, node_id, host, path, cert, key) if premature then return @@ -368,9 +394,7 @@ function _M:connect(premature, node_id, host, path, cert, key) ::err:: if not exiting() then - ngx.timer.at(reconnection_delay, function(premature) - self:connect(premature, node_id, host, path, cert, key) - end) + self:try_connect(reconnection_delay) end end diff --git a/kong/init.lua b/kong/init.lua index 57441b6e4347..e11a383b5e1d 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -882,7 +882,7 @@ function Kong.init_worker() if kong.clustering then -- full sync dp - + local is_dp_full_sync_agent = process.type() == "privileged agent" and not kong.sync if is_control_plane(kong.configuration) or -- CP needs to support both full and incremental sync @@ -890,7 +890,7 @@ function Kong.init_worker() then kong.clustering:init_worker() end - + -- DP full sync agent skips the rest of the init_worker if is_dp_full_sync_agent then return @@ -991,30 +991,18 @@ function Kong.init_worker() plugin_servers.start() end - if kong.clustering then - -- rpc and incremental sync - if kong.rpc and is_http_module then - - -- only available in http subsystem - local cluster_tls = require("kong.clustering.tls") - - if is_data_plane(kong.configuration) then - ngx.timer.at(0, function(premature) - kong.rpc:connect(premature, - "control_plane", kong.configuration.cluster_control_plane, - "/v2/outlet", - cluster_tls.get_cluster_cert(kong.configuration).cdata, - cluster_tls.get_cluster_cert_key(kong.configuration)) - end) - - else -- control_plane - kong.rpc.concentrator:start() - end + -- rpc and incremental sync + if is_http_module then - -- init incremental sync - if kong.sync then - kong.sync:init_worker() - end + -- init rpc connection + if kong.rpc then + kong.rpc:init_worker() + end + + -- init incremental sync + -- should run after rpc init successfully + if kong.sync then + kong.sync:init_worker() end end