Skip to content

Commit

Permalink
tests(rpc): rpc mock/hook
Browse files Browse the repository at this point in the history
  • Loading branch information
StarlightIbuki committed Dec 19, 2024
1 parent f7f7a23 commit 5b3a82a
Show file tree
Hide file tree
Showing 9 changed files with 555 additions and 0 deletions.
3 changes: 3 additions & 0 deletions bin/busted
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ local cert_path do
busted_cert_content = busted_cert_content .. "\n" .. pl_file.read(system_cert_path)
end

local cluster_cert_content = assert(pl_file.read("spec/fixtures/kong_clustering.crt"))
busted_cert_content = busted_cert_content .. "\n" .. cluster_cert_content

pl_file.write(busted_cert_file, busted_cert_content)
cert_path = busted_cert_file
end
Expand Down
90 changes: 90 additions & 0 deletions spec/01-unit/19-hybrid/04-rpc_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
-- by importing helpers, we initialize the kong PDK module
local helpers = require "spec.helpers"
local server = require("spec.helpers.rpc_mock.server")
local client = require("spec.helpers.rpc_mock.client")

describe("rpc v2", function()
describe("full sync pagination", function()
describe("server side", function()
local server_mock
lazy_setup(function()
helpers.get_db_utils()

server_mock = server.new()
assert(server_mock:start())

assert(helpers.start_kong({
database = "off",
role = "data_plane",
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = "on",
cluster_rpc_sync = "on",
log_level = "debug",
cluster_control_plane = "127.0.0.1:8005",
}))
end)
lazy_teardown(function()
server_mock:stop(true)
helpers.stop_kong(nil, true)
end)

it("works", function()
-- the initial sync is flaky. let's trigger a sync by creating a service
local admin_client = helpers.admin_client()
assert.res_status(201, admin_client:send {
method = "POST",
path = "/services/",
body = {
name = "mockbin",
url = "http://mockbin.org",
},
headers = {
["Content-Type"] = "application/json",
},
})

helpers.wait_until(function()
return server_mock.records and next(server_mock.records)
end,20)
end)
end)

describe("client side", function()
local client_mock
lazy_setup(function()
helpers.get_db_utils()

client_mock = assert(client.new())
assert(helpers.start_kong({
role = "control_plane",
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = "on",
cluster_rpc_sync = "on",
}))
client_mock:start()
end)
lazy_teardown(function()
helpers.stop_kong(nil, true)
client_mock:stop()
end)

it("works", function()
client_mock:wait_until_connected()

local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },})
assert.is_string(err)
assert.is_nil(res)
end)
end)
end)
end)
94 changes: 94 additions & 0 deletions spec/02-integration/01-helpers/05-rpc-mock_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
local helpers = require "spec.helpers"
local server = require("spec.helpers.rpc_mock.server")
local client = require("spec.helpers.rpc_mock.client")

local function trigger_change()
-- the initial sync is flaky. let's trigger a sync by creating a service
local admin_client = helpers.admin_client()
assert.res_status(201, admin_client:send {
method = "POST",
path = "/services/",
body = {
name = "mockbin",
url = "http://mockbin.org",
},
headers = {
["Content-Type"] = "application/json",
},
})
end

describe("rpc mock/hook", function()
describe("server side", function()
local server_mock
lazy_setup(function()
helpers.get_db_utils()

server_mock = server.new()
assert(server_mock:start())

assert(helpers.start_kong({
database = "off",
role = "data_plane",
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = "on",
cluster_rpc_sync = "on",
log_level = "debug",
cluster_control_plane = "127.0.0.1:8005",
}))
end)
lazy_teardown(function()
server_mock:stop(true)
helpers.stop_kong(nil, true)
end)

it("recording", function()
trigger_change()
helpers.wait_until(function()
for _, record in pairs(server_mock.records or {}) do
if record.response then
return true
end
end
end,20)
end)
end)

describe("client side", function()
local client_mock
lazy_setup(function()
helpers.get_db_utils()

client_mock = assert(client.new())
assert(helpers.start_kong({
role = "control_plane",
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = "on",
cluster_rpc_sync = "on",
}))
client_mock:start()
end)
lazy_teardown(function()
helpers.stop_kong(nil, true)
client_mock:stop()
end)

it("works", function()
local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },})
assert.is_string(err)
assert.is_nil(res)
end)

pending("callbacks")
end)
end)
66 changes: 66 additions & 0 deletions spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
local kong_meta = require("kong.meta")

local _M = {
PRIORITY = 1000,
VERSION = kong_meta.version,
}

local original_callbacks = {}
local inc_id = 0

function _M.init_worker()
kong.rpc.callbacks:register("kong.rpc.debug.register", function(node_id, register_payload)
local proxy_apis = register_payload.proxy_apis

for _, proxy_api in ipairs(proxy_apis) do
-- unregister and save the original callback
local original_cb
if not original_callbacks[proxy_api] then
original_callbacks[proxy_api] = kong.rpc.callbacks.callbacks[proxy_api]
end
original_cb = original_callbacks[proxy_api]
kong.rpc.callbacks.callbacks[proxy_api] = nil

kong.log.info("hooking registering RPC proxy API: ", proxy_api)
kong.rpc.callbacks:register(proxy_api, function(client_id, payload)
local id = inc_id
inc_id = inc_id + 1
kong.log.info("hooked proxy API ", proxy_api, " called by node: ", client_id)
kong.log.info("forwarding to node: ", node_id)
local res, err = kong.rpc:call(node_id, "kong.rpc.debug.mock", { call_id = id, method = proxy_api, node_id = client_id, payload = payload })
if not res then
return nil, "Failed to proxy(" .. node_id .. "): " .. err
end

if res.error then
return nil, res.error
end

if res.prehook or res.posthook then
if res.prehook then
payload = res.args
end

local origin_res, origin_err = original_cb(client_id, payload)

if res.posthook then
res, err = kong.rpc:call(node_id, "kong.rpc.debug.posthook", { call_id = id, method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} })
if not res then
return nil, "Failed to call post hook(" .. node_id .. "): " .. err
end

return res.result, res.error
end
elseif res.mock then
return res.result, res.error
end

return nil, "invalid response from proxy"
end)
end

return true
end)
end

return _M
12 changes: 12 additions & 0 deletions spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
return {
name = "rpc-debug",
fields = {
{
config = {
type = "record",
fields = {
},
},
},
},
}
24 changes: 24 additions & 0 deletions spec/helpers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ local server = reload_module("spec.internal.server")
local client = reload_module("spec.internal.client")
local wait = reload_module("spec.internal.wait")

-- redo the patches to apply the kong global patches
local _timerng

_timerng = require("resty.timerng").new({
min_threads = 16,
max_threads = 32,
})

_timerng:start()

_G.timerng = _timerng

_G.ngx.timer.at = function (delay, callback, ...)
return _timerng:at(delay, callback, ...)
end

_G.ngx.timer.every = function (interval, callback, ...)
return _timerng:every(interval, callback, ...)
end

if not kong.timer then
kong.timer = _timerng
end


----------------
-- Variables/constants
Expand Down
57 changes: 57 additions & 0 deletions spec/helpers/rpc_mock/client.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
-- by importing helpers, we ensure the kong PDK module is initialized
local helpers = require "spec.helpers"
local rpc_mgr = require("kong.clustering.rpc.manager")
local default_cert = require("spec.helpers.rpc_mock.default").default_cert

local _M = {}

local default_dp_conf = {
role = "data_plane",
cluster_control_plane = "localhost:8005",
}

setmetatable(default_dp_conf, { __index = default_cert })
local default_meta = { __index = default_dp_conf, }

local function do_nothing() end

local function client_stop(rpc_mgr)
-- a hacky way to stop rpc_mgr from reconnecting
rpc_mgr.try_connect = do_nothing

-- this will stop all connections
for _, socket in pairs(rpc_mgr.clients) do
for conn in pairs(socket) do
pcall(conn.stop, conn)
end
end
end

local function client_is_connected(rpc_mgr)
for _, socket in pairs(rpc_mgr.clients) do
if next(socket) then
return true
end
end
return false
end

local function client_wait_until_connected(rpc_mgr, timeout)
return helpers.wait_until(function()
return rpc_mgr:is_connected()
end, timeout or 15)
end

-- TODO: let client not emits logs as it's expected to fail to connect for the first few seconds
function _M.new(opts)
opts = opts or {}
setmetatable(opts, default_meta)
local ret = rpc_mgr.new(default_dp_conf, opts.name or "dp")
ret.stop = client_stop
ret.is_connected = client_is_connected
ret.start = ret.try_connect
ret.wait_until_connected = client_wait_until_connected
return ret
end

return _M
10 changes: 10 additions & 0 deletions spec/helpers/rpc_mock/default.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local default_cert = {
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
}

return {
default_cert = default_cert,
}
Loading

0 comments on commit 5b3a82a

Please sign in to comment.