Skip to content

Commit

Permalink
tests(rpc): rpc mock/hook
Browse files Browse the repository at this point in the history
  • Loading branch information
StarlightIbuki committed Dec 23, 2024
1 parent 51cb5f1 commit 948243c
Show file tree
Hide file tree
Showing 9 changed files with 594 additions and 0 deletions.
3 changes: 3 additions & 0 deletions bin/busted
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ local cert_path do
busted_cert_content = busted_cert_content .. "\n" .. pl_file.read(system_cert_path)
end

local cluster_cert_content = assert(pl_file.read("spec/fixtures/kong_clustering.crt"))
busted_cert_content = busted_cert_content .. "\n" .. cluster_cert_content

pl_file.write(busted_cert_file, busted_cert_content)
cert_path = busted_cert_file
end
Expand Down
132 changes: 132 additions & 0 deletions spec/02-integration/01-helpers/05-rpc-mock_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
local helpers = require("spec.helpers")
local server = require("spec.helpers.rpc_mock.server")
local client = require("spec.helpers.rpc_mock.client")
local get_node_id = helpers.get_node_id

local function trigger_change()
-- the initial sync is flaky. let's trigger a sync by creating a service
local admin_client = helpers.admin_client()
assert.res_status(201, admin_client:send {
method = "POST",
path = "/services/",
body = {
url = "http://example.com",
},
headers = {
["Content-Type"] = "application/json",
},
})
end

describe("rpc mock/hook", function()
describe("server side", function()
local server_mock

lazy_setup(function()
helpers.get_db_utils()

server_mock = server.new()
assert(server_mock:start())

assert(helpers.start_kong({
database = "off",
role = "data_plane",
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = "on",
cluster_rpc_sync = "on",
log_level = "debug",
cluster_control_plane = "127.0.0.1:8005",
}))
end)

lazy_teardown(function()
server_mock:stop(true)
helpers.stop_kong(nil, true)
end)

it("recording", function()
trigger_change()

local record = server_mock:wait_for_call()
assert.is_table(record.response.result.default.deltas)
end)

it("mock", function()
local client_version
server_mock:mock("kong.sync.v2.get_delta", function(node_id, payload)
client_version = payload.default.version
return { default = { version = 100, deltas = {} } }
end)
server_mock:attach_debugger()

local dp_id = get_node_id("servroot")

server_mock:wait_for_node(dp_id)

assert(server_mock:call(dp_id, "kong.sync.v2.notify_new_version", { default = { new_version = 100, } }))

-- the mock should have been called
helpers.wait_until(function()
return client_version
end, 20)
end)
end)

describe("client side", function()
local client_mock
local called = false

lazy_setup(function()
helpers.get_db_utils()

client_mock = assert(client.new())
assert(helpers.start_kong({
role = "control_plane",
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = "on",
cluster_rpc_sync = "on",
}))

client_mock.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, payload)
called = true
end)

client_mock:start()
client_mock:wait_until_connected()
end)

lazy_teardown(function()
helpers.stop_kong(nil, true)
client_mock:stop()
end)

it("client->CP", function()
local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },})
assert.is_string(err)
assert.is_nil(res)
end)

it("CP->client", function()
-- this registers the data plane node
local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

trigger_change()

helpers.wait_until(function()
return called
end, 20)
end)
end)
end)
76 changes: 76 additions & 0 deletions spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
local kong_meta = require("kong.meta")

local _M = {
PRIORITY = 1000,
VERSION = kong_meta.version,
}

local original_callbacks = {}
local inc_id = 0

function _M.init_worker()
kong.rpc.callbacks:register("kong.rpc.debug.register", function(node_id, register_payload)
local proxy_apis = register_payload.proxy_apis

for _, proxy_api in ipairs(proxy_apis) do
-- unregister and save the original callback
local original_cb
if not original_callbacks[proxy_api] then
original_callbacks[proxy_api] = kong.rpc.callbacks.callbacks[proxy_api]
end
original_cb = original_callbacks[proxy_api]
kong.rpc.callbacks.callbacks[proxy_api] = nil

kong.log.info("hooking registering RPC proxy API: ", proxy_api)
kong.rpc.callbacks:register(proxy_api, function(client_id, payload)
local id = inc_id
inc_id = inc_id + 1
kong.log.info("hooked proxy API ", proxy_api, " called by node: ", client_id)
kong.log.info("forwarding to node: ", node_id)
local res, err = kong.rpc:call(node_id, "kong.rpc.debug.mock", { call_id = id, method = proxy_api, node_id = client_id, payload = payload })
if not res then
return nil, "Failed to proxy(" .. node_id .. "): " .. err
end

if res.error then
return nil, res.error
end

if res.prehook or res.posthook then
if res.prehook then
payload = res.args
end

local origin_res, origin_err = original_cb(client_id, payload)

if res.posthook then
res, err = kong.rpc:call(node_id, "kong.rpc.debug.posthook", { call_id = id, method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} })
if not res then
return nil, "Failed to call post hook(" .. node_id .. "): " .. err
end

return res.result, res.error
end
elseif res.mock then
return res.result, res.error
end

return nil, "invalid response from proxy"
end)
end

return true
end)

kong.rpc.callbacks:register("kong.rpc.debug.call", function(node_id, payload)
local res, err = kong.rpc:call(payload.node_id, payload.method, payload.args)
return res, err
end)

kong.rpc.callbacks:register("kong.rpc.debug.lua_code", function(node_id, payload)
local code = assert(loadstring(payload))
return code()
end)
end

return _M
12 changes: 12 additions & 0 deletions spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
return {
name = "rpc-debug",
fields = {
{
config = {
type = "record",
fields = {
},
},
},
},
}
26 changes: 26 additions & 0 deletions spec/helpers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ local server = reload_module("spec.internal.server")
local client = reload_module("spec.internal.client")
local wait = reload_module("spec.internal.wait")

-- redo the patches to apply the kong global patches
local _timerng

_timerng = require("resty.timerng").new({
min_threads = 16,
max_threads = 32,
})

_timerng:start()

_G.timerng = _timerng

_G.ngx.timer.at = function (delay, callback, ...)
return _timerng:at(delay, callback, ...)
end

_G.ngx.timer.every = function (interval, callback, ...)
return _timerng:every(interval, callback, ...)
end

if not kong.timer then
kong.timer = _timerng
end


----------------
-- Variables/constants
Expand Down Expand Up @@ -226,4 +250,6 @@ local wait = reload_module("spec.internal.wait")
make_temp_dir = misc.make_temp_dir,

build_go_plugins = cmd.build_go_plugins,

get_node_id = misc.get_node_id,
}
68 changes: 68 additions & 0 deletions spec/helpers/rpc_mock/client.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- by importing helpers, we ensure the kong PDK module is initialized
local helpers = require "spec.helpers"
local rpc_mgr = require("kong.clustering.rpc.manager")
local default_cert = require("spec.helpers.rpc_mock.default").default_cert
local uuid = require "kong.tools.uuid"


local _M = {}


local default_dp_conf = {
role = "data_plane",
cluster_control_plane = "localhost:8005",
}

setmetatable(default_dp_conf, { __index = default_cert })
local default_meta = { __index = default_dp_conf, }


local function do_nothing() end


local function client_stop(rpc_mgr)
-- a hacky way to stop rpc_mgr from reconnecting
rpc_mgr.try_connect = do_nothing

-- this will stop all connections
for _, socket in pairs(rpc_mgr.clients) do
for conn in pairs(socket) do
pcall(conn.stop, conn)
end
end
end


local function client_is_connected(rpc_mgr)
for _, socket in pairs(rpc_mgr.clients) do
if next(socket) then
return true
end
end
return false
end


local function client_wait_until_connected(rpc_mgr, timeout)
return helpers.wait_until(function()
return rpc_mgr:is_connected()
end, timeout or 15)
end


-- TODO: let client not emits logs as it's expected to fail to connect for the first few seconds
function _M.new(opts)
opts = opts or {}
setmetatable(opts, default_meta)
local ret = rpc_mgr.new(default_dp_conf, opts.name or uuid.uuid())

ret.stop = client_stop
ret.is_connected = client_is_connected
ret.start = ret.try_connect
ret.wait_until_connected = client_wait_until_connected

return ret
end


return _M
10 changes: 10 additions & 0 deletions spec/helpers/rpc_mock/default.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local default_cert = {
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
}

return {
default_cert = default_cert,
}
Loading

0 comments on commit 948243c

Please sign in to comment.