From c98f385b0d0ed734dcd527f326e260ecd67f61a8 Mon Sep 17 00:00:00 2001 From: Chrono Date: Wed, 27 Nov 2024 17:30:00 +0800 Subject: [PATCH] fix(clustering): use RPC handshake reported node info in `clustering_data_planes` (#13844) KAG-5772 --- kong/clustering/rpc/manager.lua | 49 +++++++++++-------- kong/clustering/services/sync/rpc.lua | 21 ++++---- .../18-hybrid_rpc/01-rpc_spec.lua | 11 ++++- 3 files changed, 50 insertions(+), 31 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index f761f47d869c..2d2d22901adf 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -57,7 +57,7 @@ function _M.new(conf, node_id) if conf.role == "control_plane" then self.concentrator = require("kong.clustering.rpc.concentrator").new(self, kong.db) - self.client_ips = {} -- store DP node's ip addr + self.client_info = {} -- store DP node's ip addr and version end return setmetatable(self, _MT) @@ -96,7 +96,7 @@ function _M:_remove_socket(socket) self.client_capabilities[node_id] = nil if self.concentrator then - self.client_ips[node_id] = nil + self.client_info[node_id] = nil assert(self.concentrator:_enqueue_unsubscribe(node_id)) end end @@ -180,14 +180,6 @@ function _M:_handle_meta_call(c) assert(type(info.kong_hostname) == "string") assert(type(info.kong_conf) == "table") - local capabilities_list = info.rpc_capabilities - local node_id = info.kong_node_id - - self.client_capabilities[node_id] = { - set = pl_tablex_makeset(capabilities_list), - list = capabilities_list, - } - local payload = { jsonrpc = "2.0", result = { @@ -203,6 +195,24 @@ function _M:_handle_meta_call(c) return nil, err end + local capabilities_list = info.rpc_capabilities + local node_id = info.kong_node_id + + self.client_capabilities[node_id] = { + set = pl_tablex_makeset(capabilities_list), + list = capabilities_list, + } + + -- we are on cp side + assert(self.concentrator) + assert(self.client_info) + + -- store DP's ip addr + self.client_info[node_id] = { + ip = ngx_var.remote_addr, + version = info.kong_version, + } + return node_id end @@ -239,12 +249,17 @@ function _M:_meta_call(c, meta_cap, node_id) end if typ ~= "binary" then - return nil, "wrong frame type: " .. type + return nil, "wrong frame type: " .. typ end local payload = cjson_decode(data) assert(payload.jsonrpc == "2.0") + -- now we only support snappy + if payload.result.rpc_frame_encoding ~= RPC_SNAPPY_FRAMED then + return nil, "unknown encoding: " .. payload.result.rpc_frame_encoding + end + local capabilities_list = payload.result.rpc_capabilities self.client_capabilities[node_id] = { @@ -252,11 +267,6 @@ function _M:_meta_call(c, meta_cap, node_id) list = capabilities_list, } - -- now we only support snappy - if payload.result.rpc_frame_encoding ~= RPC_SNAPPY_FRAMED then - return nil, "unknown encoding: " .. payload.result.rpc_frame_encoding - end - return true end @@ -398,9 +408,6 @@ function _M:handle_websocket() local s = socket.new(self, wb, node_id) self:_add_socket(s) - -- store DP's ip addr - self.client_ips[node_id] = ngx_var.remote_addr - s:start() local res, err = s:join() self:_remove_socket(s) @@ -533,8 +540,8 @@ function _M:get_peers() end -function _M:get_peer_ip(node_id) - return self.client_ips[node_id] +function _M:get_peer_info(node_id) + return self.client_info[node_id] end diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 5783107f0caf..0b6970e4c2f7 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -18,6 +18,7 @@ local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } +local assert = assert local ipairs = ipairs local fmt = string.format local ngx_null = ngx.null @@ -79,13 +80,14 @@ function _M:init_cp(manager) -- { default = { version = 1000, }, } local default_namespace_version = default_namespace.version + local node_info = assert(kong.rpc:get_peer_info(node_id)) - -- XXX TODO: follow update_sync_status() in control_plane.lua + -- follow update_sync_status() in control_plane.lua local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, { last_seen = ngx.time(), hostname = node_id, - ip = kong.rpc:get_peer_ip(node_id), -- try to get the correct ip - version = "3.8.0.0", -- XXX TODO: get from rpc call + ip = node_info.ip, -- get the correct ip + version = node_info.version, -- get from rpc call sync_status = CLUSTERING_SYNC_STATUS.NORMAL, config_hash = fmt("%032d", default_namespace_version), rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, @@ -202,12 +204,13 @@ local function do_sync() return nil, "rpc is not ready" end - local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", - { default = - { version = - tonumber(declarative.get_current_hash()) or 0, - }, - }) + local msg = { default = + { version = + tonumber(declarative.get_current_hash()) or 0, + }, + } + + local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg) if not ns_deltas then ngx_log(ngx_ERR, "sync get_delta error: ", err) return true diff --git a/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua index 118222e0ae36..6b717e293cbf 100644 --- a/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua @@ -1,5 +1,6 @@ local helpers = require "spec.helpers" local cjson = require("cjson.safe") +local CLUSTERING_SYNC_STATUS = require("kong.constants").CLUSTERING_SYNC_STATUS -- we need incremental sync to verify rpc for _, inc_sync in ipairs { "on" } do @@ -58,9 +59,17 @@ for _, strategy in helpers.each_strategy() do -- TODO: perhaps need a new test method for _, v in pairs(json.data) do if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then - table.sort(v.rpc_capabilities) assert.near(14 * 86400, v.ttl, 3) + assert.matches("^(%d+%.%d+)%.%d+", v.version) + assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status) + + local reg = [[^(\d+)\.(\d+)]] + local m = assert(ngx.re.match(v.version, reg)) + assert(tonumber(m[1]) >= 3) + assert(tonumber(m[2]) >= 9) + -- check the available rpc service + table.sort(v.rpc_capabilities) assert.same("kong.sync.v2", v.rpc_capabilities[1]) return true end