Skip to content

Commit

Permalink
refactor(clustering/rpc): rework on rpc initial connection (#13824)
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw authored Nov 11, 2024
1 parent 7e9dbf9 commit 2cd6c2d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 32 deletions.
38 changes: 31 additions & 7 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ end


function _M:_add_socket(socket, capabilities_list)
local sockets = self.clients[socket.node_id]
local node_id = socket.node_id

local sockets = self.clients[node_id]
if not sockets then
assert(self.concentrator:_enqueue_subscribe(socket.node_id))
assert(self.concentrator:_enqueue_subscribe(node_id))
sockets = setmetatable({}, { __mode = "k", })
self.clients[socket.node_id] = sockets
self.clients[node_id] = sockets
end

self.client_capabilities[socket.node_id] = {
self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}
Expand Down Expand Up @@ -292,6 +294,30 @@ function _M:handle_websocket()
end


function _M:try_connect(reconnection_delay)
ngx.timer.at(reconnection_delay or 0, function(premature)
self:connect(premature,
"control_plane", -- node_id
self.conf.cluster_control_plane, -- host
"/v2/outlet", -- path
self.cluster_cert.cdata,
self.cluster_cert_key)
end)
end


function _M:init_worker()
if self.conf.role == "data_plane" then
-- data_plane will try to connect to cp
self:try_connect()

else
-- control_plane
self.concentrator:start()
end
end


function _M:connect(premature, node_id, host, path, cert, key)
if premature then
return
Expand Down Expand Up @@ -368,9 +394,7 @@ function _M:connect(premature, node_id, host, path, cert, key)
::err::

if not exiting() then
ngx.timer.at(reconnection_delay, function(premature)
self:connect(premature, node_id, host, path, cert, key)
end)
self:try_connect(reconnection_delay)
end
end

Expand Down
38 changes: 13 additions & 25 deletions kong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -882,15 +882,15 @@ function Kong.init_worker()

if kong.clustering then
-- full sync dp

local is_dp_full_sync_agent = process.type() == "privileged agent" and not kong.sync

if is_control_plane(kong.configuration) or -- CP needs to support both full and incremental sync
is_dp_full_sync_agent -- full sync is only enabled for DP if incremental sync is disabled
then
kong.clustering:init_worker()
end

-- DP full sync agent skips the rest of the init_worker
if is_dp_full_sync_agent then
return
Expand Down Expand Up @@ -991,30 +991,18 @@ function Kong.init_worker()
plugin_servers.start()
end

if kong.clustering then
-- rpc and incremental sync
if kong.rpc and is_http_module then

-- only available in http subsystem
local cluster_tls = require("kong.clustering.tls")

if is_data_plane(kong.configuration) then
ngx.timer.at(0, function(premature)
kong.rpc:connect(premature,
"control_plane", kong.configuration.cluster_control_plane,
"/v2/outlet",
cluster_tls.get_cluster_cert(kong.configuration).cdata,
cluster_tls.get_cluster_cert_key(kong.configuration))
end)

else -- control_plane
kong.rpc.concentrator:start()
end
-- rpc and incremental sync
if is_http_module then

-- init incremental sync
if kong.sync then
kong.sync:init_worker()
end
-- init rpc connection
if kong.rpc then
kong.rpc:init_worker()
end

-- init incremental sync
-- should run after rpc init successfully
if kong.sync then
kong.sync:init_worker()
end
end

Expand Down

1 comment on commit 2cd6c2d

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:2cd6c2d7d00ca04cff3d767902473392bf7dc9b0
Artifacts available https://github.com/Kong/kong/actions/runs/11774299425

Please sign in to comment.