Skip to content

Commit

Permalink
fix(clustering): use RPC handshake reported node info in `clustering_…
Browse files Browse the repository at this point in the history
…data_planes` (#13844)

KAG-5772
  • Loading branch information
chronolaw authored Nov 27, 2024
1 parent dee55bd commit c98f385
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 31 deletions.
49 changes: 28 additions & 21 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function _M.new(conf, node_id)

if conf.role == "control_plane" then
self.concentrator = require("kong.clustering.rpc.concentrator").new(self, kong.db)
self.client_ips = {} -- store DP node's ip addr
self.client_info = {} -- store DP node's ip addr and version
end

return setmetatable(self, _MT)
Expand Down Expand Up @@ -96,7 +96,7 @@ function _M:_remove_socket(socket)
self.client_capabilities[node_id] = nil

if self.concentrator then
self.client_ips[node_id] = nil
self.client_info[node_id] = nil
assert(self.concentrator:_enqueue_unsubscribe(node_id))
end
end
Expand Down Expand Up @@ -180,14 +180,6 @@ function _M:_handle_meta_call(c)
assert(type(info.kong_hostname) == "string")
assert(type(info.kong_conf) == "table")

local capabilities_list = info.rpc_capabilities
local node_id = info.kong_node_id

self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}

local payload = {
jsonrpc = "2.0",
result = {
Expand All @@ -203,6 +195,24 @@ function _M:_handle_meta_call(c)
return nil, err
end

local capabilities_list = info.rpc_capabilities
local node_id = info.kong_node_id

self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}

-- we are on cp side
assert(self.concentrator)
assert(self.client_info)

-- store DP's ip addr
self.client_info[node_id] = {
ip = ngx_var.remote_addr,
version = info.kong_version,
}

return node_id
end

Expand Down Expand Up @@ -239,24 +249,24 @@ function _M:_meta_call(c, meta_cap, node_id)
end

if typ ~= "binary" then
return nil, "wrong frame type: " .. type
return nil, "wrong frame type: " .. typ
end

local payload = cjson_decode(data)
assert(payload.jsonrpc == "2.0")

-- now we only support snappy
if payload.result.rpc_frame_encoding ~= RPC_SNAPPY_FRAMED then
return nil, "unknown encoding: " .. payload.result.rpc_frame_encoding
end

local capabilities_list = payload.result.rpc_capabilities

self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}

-- now we only support snappy
if payload.result.rpc_frame_encoding ~= RPC_SNAPPY_FRAMED then
return nil, "unknown encoding: " .. payload.result.rpc_frame_encoding
end

return true
end

Expand Down Expand Up @@ -398,9 +408,6 @@ function _M:handle_websocket()
local s = socket.new(self, wb, node_id)
self:_add_socket(s)

-- store DP's ip addr
self.client_ips[node_id] = ngx_var.remote_addr

s:start()
local res, err = s:join()
self:_remove_socket(s)
Expand Down Expand Up @@ -533,8 +540,8 @@ function _M:get_peers()
end


function _M:get_peer_ip(node_id)
return self.client_ips[node_id]
function _M:get_peer_info(node_id)
return self.client_info[node_id]
end


Expand Down
21 changes: 12 additions & 9 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }


local assert = assert
local ipairs = ipairs
local fmt = string.format
local ngx_null = ngx.null
Expand Down Expand Up @@ -79,13 +80,14 @@ function _M:init_cp(manager)

-- { default = { version = 1000, }, }
local default_namespace_version = default_namespace.version
local node_info = assert(kong.rpc:get_peer_info(node_id))

-- XXX TODO: follow update_sync_status() in control_plane.lua
-- follow update_sync_status() in control_plane.lua
local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, {
last_seen = ngx.time(),
hostname = node_id,
ip = kong.rpc:get_peer_ip(node_id), -- try to get the correct ip
version = "3.8.0.0", -- XXX TODO: get from rpc call
ip = node_info.ip, -- get the correct ip
version = node_info.version, -- get from rpc call
sync_status = CLUSTERING_SYNC_STATUS.NORMAL,
config_hash = fmt("%032d", default_namespace_version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
Expand Down Expand Up @@ -202,12 +204,13 @@ local function do_sync()
return nil, "rpc is not ready"
end

local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
local msg = { default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
}

local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg)
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
Expand Down
11 changes: 10 additions & 1 deletion spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local helpers = require "spec.helpers"
local cjson = require("cjson.safe")
local CLUSTERING_SYNC_STATUS = require("kong.constants").CLUSTERING_SYNC_STATUS

-- we need incremental sync to verify rpc
for _, inc_sync in ipairs { "on" } do
Expand Down Expand Up @@ -58,9 +59,17 @@ for _, strategy in helpers.each_strategy() do
-- TODO: perhaps need a new test method
for _, v in pairs(json.data) do
if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then
table.sort(v.rpc_capabilities)
assert.near(14 * 86400, v.ttl, 3)
assert.matches("^(%d+%.%d+)%.%d+", v.version)
assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status)

local reg = [[^(\d+)\.(\d+)]]
local m = assert(ngx.re.match(v.version, reg))
assert(tonumber(m[1]) >= 3)
assert(tonumber(m[2]) >= 9)

-- check the available rpc service
table.sort(v.rpc_capabilities)
assert.same("kong.sync.v2", v.rpc_capabilities[1])
return true
end
Expand Down

1 comment on commit c98f385

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:c98f385b0d0ed734dcd527f326e260ecd67f61a8
Artifacts available https://github.com/Kong/kong/actions/runs/12047420498

Please sign in to comment.