From c79e2bef7dea5289111a961dcd0185e0ae5dd499 Mon Sep 17 00:00:00 2001 From: xumin Date: Tue, 17 Dec 2024 17:18:30 +0800 Subject: [PATCH] tests(rpc): rpc mock/hook --- spec/01-unit/19-hybrid/04-rpc_spec.lua | 68 +++++++ .../kong/plugins/rpc-proxy/handler.lua | 59 ++++++ .../kong/plugins/rpc-proxy/schema.lua | 12 ++ spec/helpers.lua | 20 ++ spec/helpers/rpc_mock/client.lua | 58 ++++++ spec/helpers/rpc_mock/default.lua | 8 + spec/helpers/rpc_mock/server.lua | 187 ++++++++++++++++++ 7 files changed, 412 insertions(+) create mode 100644 spec/01-unit/19-hybrid/04-rpc_spec.lua create mode 100644 spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/handler.lua create mode 100644 spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/schema.lua create mode 100644 spec/helpers/rpc_mock/client.lua create mode 100644 spec/helpers/rpc_mock/default.lua create mode 100644 spec/helpers/rpc_mock/server.lua diff --git a/spec/01-unit/19-hybrid/04-rpc_spec.lua b/spec/01-unit/19-hybrid/04-rpc_spec.lua new file mode 100644 index 000000000000..938f881f07f2 --- /dev/null +++ b/spec/01-unit/19-hybrid/04-rpc_spec.lua @@ -0,0 +1,68 @@ +-- by importing helpers, we initialize the kong PDK module +local helpers = require "spec.helpers" +local server = require("spec.helpers.rpc_mock.server") +local client = require("spec.helpers.rpc_mock.client") + +describe("rpc v2", function() + describe("full sync pagination", function() + describe("server side", function() + local server_mock + local port + lazy_setup(function() + server_mock = server.new() + assert(server_mock:start()) + port = server_mock.listen + + helpers.start_kong({ + role = "data_plane", + cluster_cert = "spec/fixtures/kong_spec.crt", + cluster_cert_key = "spec/fixtures/kong_spec.key", + cluster_rpc = "on", + cluster_rpc_listen = "localhost:" .. port, + cluster_incremental_sync = "on", + }) + end) + lazy_teardown(function() + server_mock:stop(true) + + helpers.stop_kong(nil, true) + end) + + it("works", function() + helpers.wait_until(function() + return server_mock.records and next(server_mock.records) + end,20) + end) + end) + + describe("client side", function() + local client_mock + lazy_setup(function() + client_mock = assert(client.new()) + helpers.start_kong({ + role = "control_plane", + cluster_cert = "spec/fixtures/kong_spec.crt", + cluster_cert_key = "spec/fixtures/kong_spec.key", + cluster_rpc = "on", + cluster_incremental_sync = "on", + }) + client_mock:start() + end) + lazy_teardown(function() + helpers.stop_kong(nil, true) + client_mock:stop() + end) + + it("works", function() + client_mock:wait_until_connected() + + local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) + assert.is_nil(err) + assert.is_table(res and res.default and res.default.deltas) + + local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },}) + assert.is_string(err) + end) + end) + end) +end) diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/handler.lua new file mode 100644 index 000000000000..6d893cbd4213 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/handler.lua @@ -0,0 +1,59 @@ +local rpc_mgr = require("kong.clustering.rpc.manager") +local kong_meta = require("kong.meta") + +local _M = { + PRIORITY = 1000, + VERSION = kong_meta.version, +} + +local original_callbacks = {} + +function _M.init_worker() + kong.rpc.callbacks:register("kong.rpc.proxy.register", function(node_id, register_payload) + local proxy_apis = register_payload.proxy_apis + + for _, proxy_api in ipairs(proxy_apis) do + kong.log.info("Hook registering RPC proxy API: ", proxy_api) + local original = kong.rpc.callbacks[proxy_api] + if original and not original_callbacks[proxy_api] then + original_callbacks[proxy_api] = original + end + kong.rpc.callbacks[proxy_api] = nil + kong.rpc.callbacks:register(proxy_api, function(client_id, payload) + local res, err = kong.rpc:call(node_id, "kong.rpc.proxy", { method = proxy_api, node_id = client_id, payload = payload }) + if not res then + return nil, "Failed to proxy(" .. node_id .. "): " .. err + end + + if res.error then + return nil, res.error + end + + if res.prehook or res.posthook then + if res.prehook then + payload = res.args + end + + local origin_res, origin_err = original(client_id, payload) + + if res.posthook then + res, err = kong.rpc:call(node_id, "kong.rpc.proxy.posthook", { method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} }) + if not res then + return nil, "Failed to call post hook(" .. node_id .. "): " .. err + end + + return res.result, res.error + end + elseif res.mock then + return res.result, res.error + end + + return nil, "invalid response from proxy" + end) + end + + return true + end) +end + +return _M diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/schema.lua new file mode 100644 index 000000000000..41eb85a06209 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/schema.lua @@ -0,0 +1,12 @@ +return { + name = "rpc-proxy", + fields = { + { + config = { + type = "record", + fields = { + }, + }, + }, + }, +} diff --git a/spec/helpers.lua b/spec/helpers.lua index 22b67c4434d3..c41854e8bf26 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -28,6 +28,26 @@ local server = reload_module("spec.internal.server") local client = reload_module("spec.internal.client") local wait = reload_module("spec.internal.wait") +-- redo the patches to apply the kong global patches +local _timerng + +_timerng = require("resty.timerng").new({ + min_threads = 16, + max_threads = 32, +}) + +_timerng:start() + +_G.timerng = _timerng + +_G.ngx.timer.at = function (delay, callback, ...) + return _timerng:at(delay, callback, ...) +end + +_G.ngx.timer.every = function (interval, callback, ...) + return _timerng:every(interval, callback, ...) +end + ---------------- -- Variables/constants diff --git a/spec/helpers/rpc_mock/client.lua b/spec/helpers/rpc_mock/client.lua new file mode 100644 index 000000000000..669058861760 --- /dev/null +++ b/spec/helpers/rpc_mock/client.lua @@ -0,0 +1,58 @@ +-- by importing helpers, we ensure the kong PDK module is initialized +local helpers = require "spec.helpers" +local rpc_mgr = require("kong.clustering.rpc.manager") +local default_cert = require("spec.helpers.rpc_mock.default").default_cert + +local _M = {} +local _MT = { __index = _M, } + +local default_dp_conf = { + role = "data_plane", + cluster_control_plane = "localhost:8005", +} + +setmetatable(default_dp_conf, { __index = default_cert }) +local default_meta = { __index = default_dp_conf, } + +local function do_nothing() end + +local function client_stop(rpc_mgr) + -- a hacky way to stop rpc_mgr from reconnecting + rpc_mgr.try_connect = do_nothing + + -- this will stop all connections + for _, socket in pairs(rpc_mgr.clients) do + for conn in pairs(socket) do + conn:stop() + end + end +end + +local function client_is_connected(rpc_mgr) + for _, socket in pairs(rpc_mgr.clients) do + for conn in pairs(socket) do + return true + end + end + return false +end + +local function client_wait_until_connected(rpc_mgr, timeout) + return helpers.wait_until(function() + return rpc_mgr:is_connected() + end, timeout or 15) +end + +-- TODO: let client not emits logs as it's expected to fail to connect for the first few seconds +function _M.new(opts) + opts = opts or {} + setmetatable(opts, default_meta) + local ret = rpc_mgr.new(default_dp_conf, opts.name or "dp") + ret.stop = client_stop + ret.is_connected = client_is_connected + ret.start = ret.try_connect + ret.wait_until_connected = client_wait_until_connected + return ret +end + +return _M diff --git a/spec/helpers/rpc_mock/default.lua b/spec/helpers/rpc_mock/default.lua new file mode 100644 index 000000000000..8890e0d77bc8 --- /dev/null +++ b/spec/helpers/rpc_mock/default.lua @@ -0,0 +1,8 @@ +local default_cert = { + cluster_cert = "spec/fixtures/kong_spec.crt", + cluster_cert_key = "spec/fixtures/kong_spec.key", +} + +return { + default_cert = default_cert, +} diff --git a/spec/helpers/rpc_mock/server.lua b/spec/helpers/rpc_mock/server.lua new file mode 100644 index 000000000000..a613a58f0e62 --- /dev/null +++ b/spec/helpers/rpc_mock/server.lua @@ -0,0 +1,187 @@ +local helpers = require("spec.helpers") +local client = require("spec.helpers.rpc_mock.client") +local default_cert = require("spec.helpers.rpc_mock.default").default_cert + +local _M = {} +local _MT = { __index = _M, } + +function _M.new(opts) + opts = opts or {} + opts.prefix = opts.prefix or "servroot_rpc_tap" + opts.role = "control_plane" + opts.plugins = "bundled,rpc-proxy" + opts.listen = opts.listen or 8005 + opts.cluster_listen = opts.cluster_listen or ("0.0.0.0:" .. opts.listen) + opts.mocks = opts.mocks or {} + opts.prehooks = opts.prehooks or {} + opts.posthooks = opts.posthooks or {} + opts.cluster_rpc = "on" + opts.cluster_incremental_sync = opts.cluster_incremental_sync or "on" + if opts.interception == nil then + opts.interception = true + end + + for k, v in pairs(default_cert) do + if opts[k] == nil then + opts[k] = v + end + end + + return setmetatable(opts, _MT) +end + +function _M.start(self) + local bp, db = helpers.get_db_utils(strategy, nil, { "rpc-proxy" }) + + local plugin = db.plugins:insert({ + name = "rpc-proxy", + config = {}, + }) + + assert(helpers.start_kong(self)) + + self.proxy_client = client.new({ + cluster_control_plane = self.cluster_listen, + }) + + if self.interception then + self:enable_inception() + end + + self.proxy_client.callbacks:register("kong.rpc.proxy.mock", function(proxy_id, proxy_payload) + local method, node_id, payload = proxy_payload.method, proxy_payload.node_id, proxy_payload.payload + local cb = self.mocks[method] + if cb then + local res, err = cb(node_id, payload, proxy_id, self) + return { + mock = true, + result = res, + error = err, + } + end + + local prehook = self.prehooks[method] or self.prehooks["*"] + local posthook = self.posthooks[method] or self.posthooks["*"] + local result = { + prehook = prehook and true, + posthook = posthook and true, + } + + if prehook then + local res, err = prehook(node_id, payload, proxy_id, self, method) + if not res then + return nil, err + end + + result.args = res + end + + return result + end) + + self.proxy_client.callbacks:register("kong.rpc.proxy.posthook", function(proxy_id, proxy_payload) + local method, node_id, payload = proxy_payload.method, proxy_payload.node_id, proxy_payload.payload + local cb = self.posthooks[method] or self.posthooks["*"] + if not cb then + return nil, "no callback registered for method: " .. method + end + + return cb(node_id, payload, proxy_id, self, method) + end) + + self = setmetatable(self, _MT) + + self.proxy_client:start() + self.proxy_client:wait_until_connected() + + if next(self.mocks) or next(self.prehooks) or next(self.posthooks) then + return self:register_proxy() + end + + return true +end + +function _M:register_proxy() + local hooked = {} + for api_name, cb in pairs(self.mocks) do + hooked[api_name] = true + end + for api_name, cb in pairs(self.prehooks) do + hooked[api_name] = true + end + for api_name, cb in pairs(self.posthooks) do + hooked[api_name] = true + end + local hooked_list + + if hooked["*"] then + hooked_list = self.proxy_client.client_capabilities.control_plane.list + else + hooked_list = {} + for api_name in pairs(hooked) do + hooked_list[#hooked_list + 1] = api_name + end + end + + return self.proxy_client:call("control_plane", "kong.rpc.proxy.register", { + proxy_apis = hooked_list, + }) +end + +function _M:mock_api(api_name, cb) + self.mocks[api_name] = cb +end + +function _M:prehook_api(api_name, cb) + self.prehooks[api_name] = cb +end + +function _M:posthook_api(api_name, cb) + self.posthooks[api_name] = cb +end + +local function get_records(server) + local records = server.records + if not records then + records = {} + server.records = records + end + return records +end + +-- TODO: add req ID for correlation +local function default_inception_prehook(node_id, payload, proxy_id, server, method) + local records = get_records(server) + records[#records + 1] = { + request = true, + node_id = node_id, + payload = payload, + proxy_id = proxy_id, + method = method, + } + return payload +end + +local function default_inception_posthook(node_id, payload, proxy_id, server, method) + local records = get_records(server) + records[#records + 1] = { + response = true, + node_id = node_id, + payload = payload, + proxy_id = proxy_id, + method = method, + } + return payload +end + +function _M:enable_inception() + self.prehooks["*"] = default_inception_prehook + self.posthooks["*"] = default_inception_posthook +end + +function _M:stop(...) + helpers.stop_kong(self.prefix, ...) + self.proxy_client:stop() +end + +return _M