diff --git a/bin/busted b/bin/busted
index e0635aabd6f7..e59760322faf 100755
--- a/bin/busted
+++ b/bin/busted
@@ -19,6 +19,9 @@ local cert_path do
     busted_cert_content = busted_cert_content .. "\n" .. pl_file.read(system_cert_path)
   end
 
+  local cluster_cert_content = assert(pl_file.read("spec/fixtures/kong_clustering.crt"))
+  busted_cert_content = busted_cert_content .. "\n" .. cluster_cert_content
+
   pl_file.write(busted_cert_file, busted_cert_content)
   cert_path = busted_cert_file
 end
diff --git a/spec/02-integration/01-helpers/05-rpc-mock_spec.lua b/spec/02-integration/01-helpers/05-rpc-mock_spec.lua
new file mode 100644
index 000000000000..0776b9db108b
--- /dev/null
+++ b/spec/02-integration/01-helpers/05-rpc-mock_spec.lua
@@ -0,0 +1,240 @@
+local helpers = require("spec.helpers")
+local misc = require("spec.internal.misc")
+local cp = require("spec.helpers.rpc_mock.cp")
+local dp = require("spec.helpers.rpc_mock.dp")
+local setup = require("spec.helpers.rpc_mock.setup")
+local get_node_id = misc.get_node_id
+local DP_PREFIX = "servroot_dp"
+
+describe("rpc mock/hook", function()
+  lazy_setup(setup.setup)
+  lazy_teardown(setup.teardown)
+
+  describe("CP side mock", function()
+    local mocked_cp, node_id
+
+    lazy_setup(function()
+      local _, db = helpers.get_db_utils(nil, nil, { "rpc-hello-test" })
+
+      mocked_cp = cp.new({
+        plugins = "bundled,rpc-hello-test",
+      })
+
+      local service = assert(db.services:insert({
+        host = helpers.mock_upstream_host,
+      }))
+
+      assert(db.routes:insert({
+        service = service,
+        paths = { "/" },
+      }))
+
+      assert(db.plugins:insert({
+        service = service,
+        name = "rpc-hello-test",
+        config = {},
+      }))
+
+      assert(mocked_cp:start())
+
+      assert(helpers.start_kong({
+        prefix = DP_PREFIX,
+        database = "off",
+        role = "data_plane",
+        cluster_mtls = "shared",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+        plugins = "bundled,rpc-hello-test",
+        cluster_rpc = "on",
+        cluster_rpc_sync = "on",
+        log_level = "debug",
+        cluster_control_plane = "127.0.0.1:8005",
+      }))
+
+      node_id = get_node_id(DP_PREFIX)
+      mocked_cp:wait_for_node(node_id)
+    end)
+
+    lazy_teardown(function()
+      mocked_cp:stop(true)
+      helpers.stop_kong(DP_PREFIX, true)
+    end)
+
+    it("interception", function()
+      local body
+      helpers.pwait_until(function()
+        local proxy_client = assert(helpers.proxy_client())
+
+        body = assert.res_status(200, proxy_client:send {
+          method = "GET",
+          path = "/",
+          headers = {
+            ["x-greeting"] = "world",
+          }
+        })
+      end, 10)
+
+      assert.equal("hello world", body)
+
+      -- wait for the "kong.test.hello" call and get the record
+      local record = mocked_cp:wait_for_a_call(function(call)
+        return call.method == "kong.test.hello"
+      end)
+
+      -- ensure the content of the call is correct
+      assert.same({
+        method = 'kong.test.hello',
+        node_id = node_id,
+        proxy_id = 'control_plane',
+        request = 'world',
+        response = {
+          result = 'hello world',
+        },
+      }, record)
+    end)
+
+    it("mock", function()
+      finally(function()
+        mocked_cp:unmock("kong.test.hello")
+      end)
+
+      local called = false
+      mocked_cp:mock("kong.test.hello", function(node_id, payload)
+        called = true
+        return "goodbye " .. payload
+      end)
+
+      local body
+      helpers.pwait_until(function()
+        local proxy_client = assert(helpers.proxy_client())
+
+        body = assert.res_status(200, proxy_client:send {
+          method = "GET",
+          path = "/",
+          headers = {
+            ["x-greeting"] = "world",
+          }
+        })
+      end, 10)
+
+      assert.equal("goodbye world", body)
+      assert.truthy(called)
+    end)
+
+    it("call", function()
+      local res, err = mocked_cp:call(node_id, "kong.test.hello", "world")
+      assert.is_nil(err)
+      assert.equal("hello world", res)
+
+      local res, err = mocked_cp:call(node_id, "kong.test.unknown", "world")
+      assert.is_string(err)
+      assert.is_nil(res)
+    end)
+
+    it("prehook/posthook", function()
+      local prehook_called = false
+      mocked_cp:prehook("kong.test.hello", function(node_id, payload)
+        prehook_called = true
+        return node_id .. "'s " .. payload
+      end)
+
+      local body
+      helpers.pwait_until(function()
+        local proxy_client = assert(helpers.proxy_client())
+
+        body = assert.res_status(200, proxy_client:send {
+          method = "GET",
+          path = "/",
+          headers = {
+            ["x-greeting"] = "world",
+          }
+        })
+      end, 10)
+
+      assert.equal("hello " .. node_id .. "'s world", body)
+      assert.truthy(prehook_called)
+
+      prehook_called = false
+      local posthook_called = false
+      mocked_cp:posthook("kong.test.hello", function(node_id, payload)
+        posthook_called = true
+        return "Server: " .. payload.result
+      end)
+
+      local proxy_client = assert(helpers.proxy_client())
+
+      body = assert.res_status(200, proxy_client:send {
+        method = "GET",
+        path = "/",
+        headers = {
+          ["x-greeting"] = "world",
+        }
+      })
+
+      assert.equal("Server: hello " .. node_id .. "'s world", body)
+      assert.truthy(prehook_called)
+      assert.truthy(posthook_called)
+    end)
+  end)
+
+  describe("DP side", function()
+    local mocked_dp
+    local called = false
+
+    lazy_setup(function()
+      helpers.get_db_utils()
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_mtls = "shared",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        plugins = "bundled,rpc-hello-test",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+        cluster_rpc = "on",
+        cluster_rpc_sync = "on",
+      }))
+
+      mocked_dp = assert(dp.new())
+
+      mocked_dp.callbacks:register("kong.test.hello", function(node_id, payload)
+        called = true
+        return "goodbye " .. payload
+      end)
+
+      mocked_dp:start()
+      mocked_dp:wait_until_connected()
+    end)
+
+    lazy_teardown(function()
+      helpers.stop_kong()
+      mocked_dp:stop()
+    end)
+
+    it("rpc call", function()
+      local res, err = mocked_dp:call("control_plane", "kong.test.hello", "world")
+      assert.is_nil(err)
+      assert.equal("hello world", res)
+
+      local res, err = mocked_dp:call("control_plane", "kong.sync.v2.unknown", { default = {},})
+      assert.is_string(err)
+      assert.is_nil(res)
+    end)
+
+    it("get called", function()
+      local admin_client = helpers.admin_client()
+      local node_id = mocked_dp.node_id
+
+      local res = assert.res_status(200, admin_client:send {
+        method = "GET",
+        path = "/rpc-hello-test",
+        headers = {
+          ["x-greeting"] = "world",
+          ["x-node-id"] = node_id,
+        },
+      })
+
+      assert.equal("goodbye world", res)
+      assert.truthy(called)
+    end)
+  end)
+end)
diff --git a/spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua
new file mode 100644
index 000000000000..26aa9b4b5b10
--- /dev/null
+++ b/spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua
@@ -0,0 +1,156 @@
+local helpers = require("spec.helpers")
+local misc = require("spec.internal.misc")
+local cp = require("spec.helpers.rpc_mock.cp")
+local dp = require("spec.helpers.rpc_mock.dp")
+local setup = require("spec.helpers.rpc_mock.setup")
+local get_node_id = misc.get_node_id
+local DP_PREFIX = "servroot_dp"
+
+local function change_config()
+  -- the initial sync is flaky. let's trigger a sync by creating a service
+  local admin_client = helpers.admin_client()
+  assert.res_status(201, admin_client:send {
+    method = "POST",
+    path = "/services/",
+    body = {
+      url = "http://example.com",
+    },
+    headers = {
+      ["Content-Type"] = "application/json",
+    },
+  })
+end
+
+describe("kong.sync.v2", function()
+  lazy_setup(setup.setup)
+  lazy_teardown(setup.teardown)
+
+  describe("CP side", function()
+    local mocked_cp, node_id
+
+    lazy_setup(function()
+      helpers.get_db_utils()
+
+      mocked_cp = cp.new()
+      assert(mocked_cp:start())
+
+      assert(helpers.start_kong({
+        prefix = DP_PREFIX,
+        database = "off",
+        role = "data_plane",
+        cluster_mtls = "shared",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+        cluster_rpc = "on",
+        cluster_rpc_sync = "on",
+        log_level = "debug",
+        cluster_control_plane = "127.0.0.1:8005",
+      }))
+
+      node_id = get_node_id(DP_PREFIX)
+
+      mocked_cp:wait_for_node(node_id)
+    end)
+
+    lazy_teardown(function()
+      mocked_cp:stop()
+      helpers.stop_kong(DP_PREFIX)
+    end)
+
+    it("config change", function()
+      -- this gets the DP to make a "kong.sync.v2.get_delta" call to the CP
+      -- CP->DP: notify_new_version
+      -- DP->CP: get_delta
+      change_config()
+
+      -- wait for the "kong.sync.v2.get_delta" call and get the record
+      local record = mocked_cp:wait_for_a_call()
+      -- ensure the content of the call is correct
+      assert.is_table(record.response.result.default.deltas)
+    end)
+
+    it("notify_new_version triggers get_delta", function()
+      local called = false
+      mocked_cp:mock("kong.sync.v2.get_delta", function(node_id, payload)
+        called = true
+        return { default = { version = 100, deltas = {} } }
+      end)
+
+      -- make a call from the mocked cp
+      -- CP->DP: notify_new_version
+      assert(mocked_cp:call(node_id, "kong.sync.v2.notify_new_version", { default = { new_version = 100, } }))
+
+      -- DP->CP: get_delta
+      -- after receiving the notification, the DP will make a call to the CP,
+      -- which is mocked
+      -- so the mocking handler is called
+      helpers.wait_until(function()
+        return called
+      end, 20)
+    end)
+  end)
+
+  describe("DP side", function()
+    local mocked_dp, register_dp
+    local called = false
+
+    lazy_setup(function()
+      helpers.get_db_utils()
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_mtls = "shared",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+        cluster_rpc = "on",
+        cluster_rpc_sync = "on",
+      }))
+
+      mocked_dp = assert(dp.new())
+
+      mocked_dp.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, payload)
+        called = true
+      end)
+
+      mocked_dp:start()
+      mocked_dp:wait_until_connected()
+
+      -- this is a workaround to register the data plane node:
+      -- the CP does not register the DP node until it receives a call from the DP
+      function register_dp()
+        local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
+        assert.is_nil(err)
+        assert.is_table(res and res.default and res.default.deltas)
+      end
+    end)
+
+    lazy_teardown(function()
+      helpers.stop_kong()
+      mocked_dp:stop()
+    end)
+
+    it("rpc call", function()
+      local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
+      assert.is_nil(err)
+      assert.is_table(res and res.default and res.default.deltas)
+
+      local res, err = mocked_dp:call("control_plane", "kong.sync.v2.unknown", { default = {},})
+      assert.is_string(err)
+      assert.is_nil(res)
+    end)
+
+    it("config change triggers notify_new_version", function()
+      register_dp()
+
+      -- this makes the CP initiate a "kong.sync.v2.notify_new_version" call to the DP
+      change_config()
+
+      -- the mocking handler is called
+      helpers.wait_until(function()
+        return called
+      end, 20)
+    end)
+  end)
+end)
diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua
new file mode 100644
index 000000000000..0e857960e4b2
--- /dev/null
+++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua
@@ -0,0 +1,126 @@
+--- This plugin serves as a bridge for debugging RPC calls, allowing the calls to be intercepted and manipulated.
+-- Debugging is driven by a set of RPC calls.
+-- CP side:
+-- 1. kong.rpc.debug.register: registers the caller as the debugger of the CP
+--    params: nil
+--    returns: true
+-- 2. kong.rpc.debug.call: lets the CP call a method on a node
+--    params: { node_id = "node_id", method = "method", args = { ... } }
+--    returns: { result = "result", error = "error" }
+-- 3. kong.rpc.debug.lua_code: lets the CP execute Lua code on a node
+--    params: "lua code"
+--    returns: the return value of the lua code
+--
+--- debugger side:
+-- 1. kong.rpc.debug.call_handler: the debugger receives a call from the CP when a hooked API is called
+--    params: { call_seq = "call_seq", method = "method", node_id = "node_id", payload = { ... } }
+--    the debugger can return 2 types of responses:
+--    a. { mock = true, result = "result", error = "error" }, if the API is mocked
+--    b. { prehook = true/false, posthook = true/false, args = { ... }/nil }, where the booleans tell whether a prehook/posthook is present, and args holds the manipulated args
+-- 2. kong.rpc.debug.call_handler_post: the debugger receives a call from the CP when a posthook is called
+--    params: { call_seq = "call_seq", method = "method", node_id = "node_id", payload = { result = "result", error = "error" } }
+--    returns: { result = "result", error = "error" }
+
+local kong_meta = require("kong.meta")
+local shallow_copy = require("kong.tools.table").shallow_copy
+local debugger_prefix = "kong.rpc.debug."
+
+local _M = {
+  PRIORITY = 1000,
+  VERSION = kong_meta.version,
+}
+
+
+local function hook(debugger_node_id)
+  local original_callbacks = shallow_copy(kong.rpc.callbacks.callbacks)
+  local next_call_seq = 0
+  for api, cb in pairs(original_callbacks) do
+    if api:sub(1, #debugger_prefix) == debugger_prefix then
+      goto skip
+    end
+
+    kong.log.info("hooking RPC proxy API: ", api)
+    -- re-register
+    kong.rpc.callbacks.callbacks[api] = nil
+    kong.rpc.callbacks:register(api, function(node_id, payload)
+      local call_seq = next_call_seq
+      next_call_seq = next_call_seq + 1
+      kong.log.info("hooked proxy API ", api, " called by node: ", node_id)
+      kong.log.info("forwarding to debugger node: ", debugger_node_id)
+      local res, err = kong.rpc:call(debugger_node_id, "kong.rpc.debug.call_handler", { call_seq = call_seq, method = api, node_id = node_id, payload = payload })
+      if not res then
+        return nil, "Failed to call debugger(" .. debugger_node_id .. "): " .. err
+      end
+
+      if res.error then
+        return nil, res.error
+      end
+
+      -- mocked: no prehook/posthook, directly return the mock result
+      if res.mock then
+        return res.result, res.error
+      end
+
+      if res.prehook then
+        payload = res.args
+      end
+
+      local call_res, call_err = cb(node_id, payload)
+
+      if res.posthook then
+        res, err = kong.rpc:call(debugger_node_id, "kong.rpc.debug.call_handler_post",
+          { call_seq = call_seq, method = api, node_id = node_id, payload = { result = call_res, error = call_err } })
+        if not res then
+          return nil, "Failed to call debugger post hook(" .. debugger_node_id .. "): " .. err
+        end
+
+        call_res, call_err = res.result, res.error
+      end
+
+      return call_res, call_err
+    end)
+
+    ::skip::
+  end
+end
+
+
+function _M.init_worker()
+  local registered
+  kong.rpc.callbacks:register("kong.rpc.debug.register", function(node_id, register_payload)
+    if registered then
+      return nil, "already registered: " .. registered
+
+    else
+      registered = node_id
+    end
+
+    hook(node_id)
+
+    return true
+  end)
+
+  kong.rpc.callbacks:register("kong.rpc.debug.call", function(node_id, payload)
+    if node_id ~= registered then
+      return nil, "not authorized"
+    end
+
+    local res, err = kong.rpc:call(payload.node_id, payload.method, payload.args)
+    return {
+      result = res,
+      error = err,
+    }
+  end)
+
+  kong.rpc.callbacks:register("kong.rpc.debug.lua_code", function(node_id, payload)
+    if node_id ~= registered then
+      return nil, "not authorized"
+    end
+
+    local code = assert(loadstring(payload))
+    return code()
+  end)
+end
+
+return _M
diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua
new file mode 100644
index 000000000000..dc57731018bf
--- /dev/null
+++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua
@@ -0,0 +1,12 @@
+return {
+  name = "rpc-debug",
+  fields = {
+    {
+      config = {
+        type = "record",
+        fields = {
+        },
+      },
+    },
+  },
+}
diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-hello-test/api.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-hello-test/api.lua
new file mode 100644
index 000000000000..124d2e767cc5
--- /dev/null
+++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-hello-test/api.lua
@@ -0,0 +1,21 @@
+return {
+  ["/rpc-hello-test"] = {
+    resource = "rpc-hello-test",
+
+    GET = function()
+      local headers = kong.request.get_headers()
+      local greeting = headers["x-greeting"]
+      local node_id = headers["x-node-id"]
+      if not greeting or not node_id then
+        kong.response.exit(400, "x-greeting and x-node-id headers are required")
+      end
+
+      local res, err = kong.rpc:call(node_id, "kong.test.hello", greeting)
+      if not res then
+        return kong.response.exit(500, err)
+      end
+
+      return kong.response.exit(200, res)
+    end
+  },
+}
\ No newline at end of file
diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-hello-test/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-hello-test/handler.lua
index 7ef7af7a4da4..07d8895aa297 100644
--- a/spec/fixtures/custom_plugins/kong/plugins/rpc-hello-test/handler.lua
+++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-hello-test/handler.lua
@@ -11,4 +11,19 @@ function RpcHelloTestHandler:init_worker()
 end
 
 
+function RpcHelloTestHandler:access()
+  local greeting = kong.request.get_headers()["x-greeting"]
+  if not greeting then
+    kong.response.exit(400, "Greeting header is required")
+  end
+
+  local res, err = kong.rpc:call("control_plane", "kong.test.hello", greeting)
+  if not res then
+    return kong.response.exit(500, err)
+  end
+
+  return kong.response.exit(200, res)
+end
+
+
 return RpcHelloTestHandler
diff --git a/spec/helpers/rpc_mock/cp.lua b/spec/helpers/rpc_mock/cp.lua
new file mode 100644
index 000000000000..e63d0c9d8a15
--- /dev/null
+++ b/spec/helpers/rpc_mock/cp.lua
@@ -0,0 +1,301 @@
+--- This module provides a mocked Kong control plane for RPC testing.
+-- @module spec.helpers.rpc_mock.cp
+
+local helpers = require("spec.helpers")
+local dp_mock = require("spec.helpers.rpc_mock.dp")
+local default_cert = require("spec.helpers.rpc_mock.default").default_cert
+
+
+local _M = {}
+local _MT = { __index = _M, }
+
+
+--- creates (but does not start) a mocked Kong CP with the given configuration
+-- @tparam[opt={}] table opts the configuration options. Fields not mentioned here will be used as Kong configuration, and by default
+-- the control plane will use the default_cert
+-- @tparam[opt=false] boolean opts.attaching set to true to attach to an existing control plane (instead of starting one)
+-- @tparam[opt=true] boolean opts.interception whether to enable the default interception handlers
+-- @tparam[opt={}] table opts.mocks handlers for mocked RPCs
+-- @tparam[opt={}] table opts.prehooks handlers for prehooks
+-- @tparam[opt={}] table opts.posthooks handlers for posthooks
+-- @usage local cp = cp_mock.new()
+function _M.new(opts)
+  opts = opts or {}
+  opts.prefix = opts.prefix or "servroot_rpc_tap"
+  opts.role = "control_plane"
+  opts.plugins = opts.plugins or "bundled"
+  opts.plugins = opts.plugins .. ",rpc-debug"
+  opts.cluster_listen = opts.cluster_listen or "127.0.0.1:8005"
+  opts.mocks = opts.mocks or {}
+  opts.prehooks = opts.prehooks or {}
+  opts.posthooks = opts.posthooks or {}
+  opts.cluster_rpc = "on"
+  opts.cluster_rpc_sync = opts.cluster_rpc_sync or "on"
+  if opts.interception == nil then
+    opts.interception = true
+  end
+
+  for k, v in pairs(default_cert) do
+    if opts[k] == nil then
+      opts[k] = v
+    end
+  end
+
+  return setmetatable(opts, _MT)
+end
+
+
+--- start the mocked control plane
+-- throws an error if it fails to start
+function _M.start(self)
+  if not self.attaching then
+    assert(helpers.start_kong(self))
+  end
+
+  self.debugger_dp = dp_mock.new({
+    cluster_control_plane = self.cluster_listen,
+  })
+
+  -- install default interception handlers
+  if self.interception then
+    self:enable_inception()
+  end
+
+  -- the attached control plane calls this method when a hooked/mocked RPC is called.
+  -- this RPC handles both prehooks and mocks, and responds to the control plane:
+  -- 1. if the RPC is mocked, return the mock result;
+  -- 2. if the RPC has a prehook, manipulate the args and return them, and tell whether a posthook is present and pending
+  self.debugger_dp.callbacks:register("kong.rpc.debug.call_handler", function(proxy_id, proxy_payload)
+    local method, node_id, payload, call_seq =
+      proxy_payload.method, proxy_payload.node_id, proxy_payload.payload, proxy_payload.call_seq
+    local mock = self.mocks[method]
+    if mock then
+      local res, err = mock(node_id, payload, proxy_id, self)
+      return {
+        mock = true,
+        result = res,
+        error = err,
+      }
+    end
+
+    local prehook = self.prehooks[method] or self.prehooks["*"]
+    local posthook = self.posthooks[method] or self.posthooks["*"]
+    local result = {
+      prehook = prehook and true,
+      posthook = posthook and true,
+    }
+
+    if prehook then
+      local res, err = prehook(node_id, payload, proxy_id, self, method, call_seq)
+      if not res then
+        return nil, err
+      end
+
+      result.args = res
+    end
+
+    return result
+  end)
+
+  self.debugger_dp.callbacks:register("kong.rpc.debug.call_handler_post", function(proxy_id, proxy_payload)
+    local method, node_id, payload, call_seq =
+      proxy_payload.method, proxy_payload.node_id, proxy_payload.payload, proxy_payload.call_seq
+    local cb = self.posthooks[method] or self.posthooks["*"]
+    if not cb then
+      return nil, "no callback registered for method: " .. method
+    end
+
+    local res, err = cb(node_id, payload, proxy_id, self, method, call_seq)
+    return {
+      result = res,
+      error = err,
+    }
+  end)
+
+  self.debugger_dp:start()
+  self.debugger_dp:wait_until_connected()
+
+  return self:attach_debugger()
+end
+
+
+--- register mocked/hooked RPCs with the control plane
+function _M:attach_debugger()
+  return self.debugger_dp:call("control_plane", "kong.rpc.debug.register")
+end
+
+
+--- have the CP make a call to a node
+-- @tparam string node_id the node ID to call
+-- @tparam string method the RPC method to call
+-- @tparam any payload the payload to send
+function _M:call(node_id, method, payload)
+  local res, err = self.debugger_dp:call("control_plane", "kong.rpc.debug.call", {
+    method = method,
+    args = payload,
+    node_id = node_id,
+  })
+
+  if err then
+    return nil, "debugger error: " .. err
+  end
+
+  return res.result, res.error
+end
+
+
+--- get the node IDs connected to the control plane
+-- @treturn table a table of node IDs
+function _M:get_node_ids()
+  return self.debugger_dp:call("control_plane", "kong.rpc.debug.lua_code", [[
+    local node_ids = {}
+    for node_id, _ in pairs(kong.rpc.clients) do
+      if type(node_id) == "string" then
+        node_ids[node_id] = true
+      end
+    end
+    return node_ids
+  ]])
+end
+
+
+--- wait until the given node is connected to the control plane
+-- throws on timeout
+-- @tparam string node_id the node ID to wait for
+-- @tparam[opt=15] number timeout the timeout in seconds
+function _M:wait_for_node(node_id, timeout)
+  return helpers.wait_until(function()
+    local list, err = self:get_node_ids()
+    if not list then
+      return nil, err
+    end
+
+    return list[node_id]
+  end, timeout)
+end
+
+
+--- register a mock for an RPC
+-- @param api_name the RPC name
+-- @param cb the callback to be called when the RPC is called
+-- the callback should return the result and error
+function _M:mock(api_name, cb)
+  self.mocks[api_name] = cb
+end
+
+
+--- unregister a mock for an RPC
+-- @param api_name the RPC name
+function _M:unmock(api_name)
+  self.mocks[api_name] = nil
+end
+
+
+--- register a prehook for an RPC
+-- @tparam string api_name the RPC name
+-- @tparam function cb the callback to be called before the RPC is called
+-- the callback should return the manipulated payload,
+-- which is then passed to the original handler
+function _M:prehook(api_name, cb)
+  self.prehooks[api_name] = cb
+end
+
+
+--- register a posthook for an RPC
+-- @tparam string api_name the RPC name
+-- @tparam function cb the callback to be called after the RPC is called
+-- the callback should return the manipulated payload
+-- in form of result, error (multiple return values)
+function _M:posthook(api_name, cb)
+  self.posthooks[api_name] = cb
+end
+
+
+local function get_records(server)
+  local records = server.records
+  if not records then
+    records = {}
+    server.records = records
+  end
+  return records
+end
+
+
+local function record_has_response(record)
+  return record.response and true
+end
+
+
+--- wait until a call is made to the control plane. Only available if the control plane is started with interception enabled
+-- @tparam[opt] function cond optional condition the call must satisfy. Default is to wait until the call has a response
+-- the record is in the form of { request = payload, response = payload, node_id = node_id, proxy_id = proxy_id, method = method }
+-- and the call history can be accessed via the `records` field of the object
+-- @tparam[opt=15] number timeout the timeout in seconds
+function _M:wait_for_a_call(cond, timeout)
+  cond = cond or record_has_response
+
+  local result
+
+  helpers.wait_until(function()
+    local records = get_records(self)
+    for _, record in pairs(records) do
+      if cond(record) then
+        result = record
+        return record
+      end
+    end
+  end, timeout)
+
+  return result
+end
+
+
+local function default_inception_prehook(node_id, payload, proxy_id, server, method, call_seq)
+  local records = get_records(server)
+  records[call_seq] = {
+    request = payload,
+    node_id = node_id,
+    proxy_id = proxy_id,
+    method = method,
+  }
+  return payload
+end
+
+
+local function default_inception_posthook(node_id, payload, proxy_id, server, method, call_seq)
+  local records = get_records(server)
+  local record = records[call_seq]
+  if not record then
+    print("no record found for call_seq: ", call_seq)
+    record = {
+      node_id = node_id,
+      proxy_id = proxy_id,
+      method = method,
+    }
+    records[call_seq] = record
+  end
+
+  record.response = payload
+  return payload.result, payload.error
+end
+
+
+--- enable the default interception handlers
+function _M:enable_inception()
+  self.prehooks["*"] = default_inception_prehook
+  self.posthooks["*"] = default_inception_posthook
+end
+
+
+--- stop the mocked control plane
+-- parameters are passed to `helpers.stop_kong`
+function _M:stop(...)
+  if not self.attaching then
+    helpers.stop_kong(self.prefix, ...)
+  end
+
+  self.debugger_dp:stop()
+end
+
+
+return _M
diff --git a/spec/helpers/rpc_mock/default.lua b/spec/helpers/rpc_mock/default.lua
new file mode 100644
index 000000000000..caadca8d8a6e
--- /dev/null
+++ b/spec/helpers/rpc_mock/default.lua
@@ -0,0 +1,10 @@
+local default_cert = {
+  cluster_mtls = "shared",
+  cluster_cert = "spec/fixtures/kong_clustering.crt",
+  cluster_cert_key = "spec/fixtures/kong_clustering.key",
+  nginx_conf = "spec/fixtures/custom_nginx.template",
+}
+
+return {
+  default_cert = default_cert,
+}
diff --git a/spec/helpers/rpc_mock/dp.lua b/spec/helpers/rpc_mock/dp.lua
new file mode 100644
index 000000000000..24d9f8ba31cc
--- /dev/null
+++ b/spec/helpers/rpc_mock/dp.lua
@@ -0,0 +1,86 @@
+--- Mocked data plane for testing the control plane.
+-- @module spec.helpers.rpc_mock.dp
+
+local helpers = require "spec.helpers"
+local rpc_mgr = require("kong.clustering.rpc.manager")
+local default_cert = require("spec.helpers.rpc_mock.default").default_cert
+local uuid = require("kong.tools.uuid")
+local isempty = require("table.isempty")
+
+
+local _M = {}
+
+
+local default_dp_conf = {
+  role = "data_plane",
+  cluster_control_plane = "localhost:8005",
+}
+
+setmetatable(default_dp_conf, { __index = default_cert })
+local default_meta = { __index = default_dp_conf, }
+
+
+local function do_nothing() end
+
+
+--- Stop the mocked data plane.
+-- @function dp:stop
+-- @treturn nil
+local function dp_stop(rpc_mgr)
+  -- a hacky way to stop rpc_mgr from reconnecting
+  rpc_mgr.try_connect = do_nothing
+
+  -- this will stop all connections
+  for _, socket in pairs(rpc_mgr.clients) do
+    for conn in pairs(socket) do
+      pcall(conn.stop, conn)
+    end
+  end
+end
+
+
+--- Check if the mocked data plane is connected to the control plane.
+-- @function dp:is_connected
+-- @treturn boolean whether the mocked data plane is connected to the control plane.
+local function dp_is_connected(rpc_mgr)
+  for _, socket in pairs(rpc_mgr.clients) do
+    if not isempty(socket) then
+      return true
+    end
+  end
+  return false
+end
+
+
+--- Wait until the mocked data plane is connected to the control plane.
+-- @function dp:wait_until_connected
+-- @tparam number timeout The timeout in seconds. Throws if the timeout is reached.
+local function dp_wait_until_connected(rpc_mgr, timeout)
+  return helpers.wait_until(function()
+    return rpc_mgr:is_connected()
+  end, timeout or 15)
+end
+
+
+--- Start connecting the mocked data plane to the control plane.
+-- @function dp:start
+-- @treturn boolean whether the mocked data plane is connected to the control plane.
+
+
+-- TODO: keep the client from emitting logs for the expected failures when it
+-- first connects to the CP and when the CP disconnects
+function _M.new(opts)
+  opts = opts or {}
+  setmetatable(opts, default_meta)
+  local ret = rpc_mgr.new(default_dp_conf, opts.name or uuid.uuid())
+
+  ret.stop = dp_stop
+  ret.is_connected = dp_is_connected
+  ret.start = ret.try_connect
+  ret.wait_until_connected = dp_wait_until_connected
+
+  return ret
+end
+
+
+return _M
diff --git a/spec/helpers/rpc_mock/readme.md b/spec/helpers/rpc_mock/readme.md
new file mode 100644
index 000000000000..c2232b6ca6c3
--- /dev/null
+++ b/spec/helpers/rpc_mock/readme.md
@@ -0,0 +1,48 @@
+# RPC Mock
+
+This is a module for intercepting and manipulating Kong's RPC calls between the CP and DP.
+
+## CP
+
+Usually we get a mocked CP by calling:
+
+```lua
+local mocked_cp = require("spec.helpers.rpc_mock.cp").new()
+mocked_cp:start()
+```
+
+This starts a working Kong CP with all of its usual functionality, acting like a normal CP, except that it can be manipulated through the `mocked_cp` object.
+
+Arguments can be used to alter the control plane's options (to attach to an existing CP, to disable interception by default, etc.).
+
+Then we can have the CP make a call to a specific connected node:
+
+```lua
+local res, err = mocked_cp:call(node1_uuid, "kong.sync.v2.notify_new_version", payload)
+```
+
+And we can inspect the calls intercepted by the mocked CP:
+
+```lua
+for _, record in pairs(mocked_cp.records) do
+  print("DP ", record.node_id, " made a call ", record.method)
+  print(plprint(record.request))
+  print(plprint(record.response))
+end
+```
+
+We can also mock an API. (Note that the original handler will be overridden.)
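+
+For example (an illustrative sketch — the `kong.sync.v2.get_delta` handler and the
+table it returns simply mirror the stub used in `05-sync-rpc_spec.lua`):
+
+```lua
+mocked_cp:mock("kong.sync.v2.get_delta", function(node_id, payload)
+  -- pretend the DP is already up to date
+  return { default = { version = 100, deltas = {} } }
+end)
+
+-- restore the original handler when the test is done
+mocked_cp:unmock("kong.sync.v2.get_delta")
+```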
+
+## DP
+
+This is basically an extended version of `kong.clustering.rpc.manager`.
+
+Added API:
+
+```lua
+local mocked_dp = require("spec.helpers.rpc_mock.dp").new()
+mocked_dp:try_connect()
+mocked_dp:wait_until_connected()
+assert(mocked_dp:is_connected())
+mocked_dp:stop()
+```
diff --git a/spec/helpers/rpc_mock/setup.lua b/spec/helpers/rpc_mock/setup.lua
new file mode 100644
index 000000000000..b7ce90270dcc
--- /dev/null
+++ b/spec/helpers/rpc_mock/setup.lua
@@ -0,0 +1,28 @@
+local misc = require("spec.internal.misc")
+
+
+local recover
+
+
+--- The DP mock requires worker_events and the timer to run, and they need to be patched to work properly.
+local function setup()
+  misc.repatch_timer()
+  if not kong.worker_events then
+    misc.patch_worker_events()
+    recover = true
+  end
+end
+
+
+local function teardown()
+  misc.unrepatch_timer()
+  if recover then
+    kong.worker_events = nil
+  end
+end
+
+
+return {
+  setup = setup,
+  teardown = teardown,
+}
\ No newline at end of file
diff --git a/spec/internal/misc.lua b/spec/internal/misc.lua
index f1339e0882c3..c02d3683c2c3 100644
--- a/spec/internal/misc.lua
+++ b/spec/internal/misc.lua
@@ -19,6 +19,8 @@ local shell = require("spec.internal.shell")
 local CONSTANTS = require("spec.internal.constants")
 local sys = require("spec.internal.sys")
 
+local private_node = require "kong.pdk.private.node"
+
 
 local pack = function(...) return { n = select("#", ...), ... } end
 local unpack = function(t) return unpack(t, 1, t.n) end
@@ -287,6 +289,78 @@ local function use_old_plugin(name)
 end
 
 
+-- Timer repatching
+-- this makes `kong` introduced by `spec.internal.db` visible to the timer;
+-- however, it breaks some other tests, so you need to undo it after the test
+local repatch_timer, unrepatch_timer do
+  local original_at = ngx.timer.at
+  local original_every = ngx.timer.every
+  local original_timer = kong and kong.timer
+  local repatched = false
+
+  function repatch_timer()
+    local _timerng
+
+    _timerng = require("resty.timerng").new({
+      min_threads = 16,
+      max_threads = 32,
+    })
+
+    _timerng:start()
+
+    _G.timerng = _timerng
+
+    _G.ngx.timer.at = function (delay, callback, ...)
+      return _timerng:at(delay, callback, ...)
+    end
+
+    _G.ngx.timer.every = function (interval, callback, ...)
+      return _timerng:every(interval, callback, ...)
+    end
+
+    if kong then
+      kong.timer = _timerng
+    end
+
+    repatched = true
+  end
+
+  function unrepatch_timer()
+    if not repatched then
+      return
+    end
+
+    _G.ngx.timer.at = original_at
+    _G.ngx.timer.every = original_every
+
+    if kong then
+      kong.timer = original_timer
+    end
+
+    _G.timerng:destroy()
+    _G.timerng = nil
+
+    repatched = false
+  end
+end
+
+local function patch_worker_events()
+  if not kong then
+    return
+  end
+
+  if kong.worker_events then
+    return
+  end
+
+  kong.worker_events = require("resty.events.compat")
+  kong.worker_events.configure({
+    listening = "unix:",
+    testing = true,
+  })
+end
+
+
 return {
   pack = pack,
   unpack = unpack,
@@ -306,4 +380,10 @@ return {
   with_current_ws = with_current_ws,
   make_temp_dir = make_temp_dir,
   use_old_plugin = use_old_plugin,
+
+  get_node_id = private_node.load_node_id,
+
+  repatch_timer = repatch_timer,
+  unrepatch_timer = unrepatch_timer,
+  patch_worker_events = patch_worker_events,
 }