Skip to content

Commit

Permalink
tests(rpc): rpc mock/hook
Browse files Browse the repository at this point in the history
  • Loading branch information
StarlightIbuki committed Dec 18, 2024
1 parent be7e356 commit c79e2be
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 0 deletions.
68 changes: 68 additions & 0 deletions spec/01-unit/19-hybrid/04-rpc_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- by importing helpers, we initialize the kong PDK module
local helpers = require "spec.helpers"
local server = require("spec.helpers.rpc_mock.server")
local client = require("spec.helpers.rpc_mock.client")

describe("rpc v2", function()
describe("full sync pagination", function()
describe("server side", function()
local server_mock
local port
lazy_setup(function()
server_mock = server.new()
assert(server_mock:start())
port = server_mock.listen

helpers.start_kong({
role = "data_plane",
cluster_cert = "spec/fixtures/kong_spec.crt",
cluster_cert_key = "spec/fixtures/kong_spec.key",
cluster_rpc = "on",
cluster_rpc_listen = "localhost:" .. port,
cluster_incremental_sync = "on",
})
end)
lazy_teardown(function()
server_mock:stop(true)

helpers.stop_kong(nil, true)
end)

it("works", function()
helpers.wait_until(function()
return server_mock.records and next(server_mock.records)
end,20)
end)
end)

describe("client side", function()
local client_mock
lazy_setup(function()
client_mock = assert(client.new())
helpers.start_kong({
role = "control_plane",
cluster_cert = "spec/fixtures/kong_spec.crt",
cluster_cert_key = "spec/fixtures/kong_spec.key",
cluster_rpc = "on",
cluster_incremental_sync = "on",
})
client_mock:start()
end)
lazy_teardown(function()
helpers.stop_kong(nil, true)
client_mock:stop()
end)

it("works", function()
client_mock:wait_until_connected()

local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },})
assert.is_string(err)
end)
end)
end)
end)
59 changes: 59 additions & 0 deletions spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/handler.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
local rpc_mgr = require("kong.clustering.rpc.manager")
local kong_meta = require("kong.meta")

local _M = {
PRIORITY = 1000,
VERSION = kong_meta.version,
}

local original_callbacks = {}

function _M.init_worker()
kong.rpc.callbacks:register("kong.rpc.proxy.register", function(node_id, register_payload)
local proxy_apis = register_payload.proxy_apis

for _, proxy_api in ipairs(proxy_apis) do
kong.log.info("Hook registering RPC proxy API: ", proxy_api)
local original = kong.rpc.callbacks[proxy_api]
if original and not original_callbacks[proxy_api] then
original_callbacks[proxy_api] = original
end
kong.rpc.callbacks[proxy_api] = nil
kong.rpc.callbacks:register(proxy_api, function(client_id, payload)
local res, err = kong.rpc:call(node_id, "kong.rpc.proxy", { method = proxy_api, node_id = client_id, payload = payload })
if not res then
return nil, "Failed to proxy(" .. node_id .. "): " .. err
end

if res.error then
return nil, res.error
end

if res.prehook or res.posthook then
if res.prehook then
payload = res.args
end

local origin_res, origin_err = original(client_id, payload)

if res.posthook then
res, err = kong.rpc:call(node_id, "kong.rpc.proxy.posthook", { method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} })
if not res then
return nil, "Failed to call post hook(" .. node_id .. "): " .. err
end

return res.result, res.error
end
elseif res.mock then
return res.result, res.error
end

return nil, "invalid response from proxy"
end)
end

return true
end)
end

return _M
12 changes: 12 additions & 0 deletions spec/fixtures/custom_plugins/kong/plugins/rpc-proxy/schema.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
return {
name = "rpc-proxy",
fields = {
{
config = {
type = "record",
fields = {
},
},
},
},
}
20 changes: 20 additions & 0 deletions spec/helpers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ local server = reload_module("spec.internal.server")
local client = reload_module("spec.internal.client")
local wait = reload_module("spec.internal.wait")

-- redo the patches to apply the kong global patches
local _timerng

_timerng = require("resty.timerng").new({
min_threads = 16,
max_threads = 32,
})

_timerng:start()

_G.timerng = _timerng

_G.ngx.timer.at = function (delay, callback, ...)
return _timerng:at(delay, callback, ...)
end

_G.ngx.timer.every = function (interval, callback, ...)
return _timerng:every(interval, callback, ...)
end


----------------
-- Variables/constants
Expand Down
58 changes: 58 additions & 0 deletions spec/helpers/rpc_mock/client.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
-- by importing helpers, we ensure the kong PDK module is initialized
local helpers = require "spec.helpers"
local rpc_mgr = require("kong.clustering.rpc.manager")
local default_cert = require("spec.helpers.rpc_mock.default").default_cert

local _M = {}
local _MT = { __index = _M, }

local default_dp_conf = {
role = "data_plane",
cluster_control_plane = "localhost:8005",
}

setmetatable(default_dp_conf, { __index = default_cert })
local default_meta = { __index = default_dp_conf, }

local function do_nothing() end

local function client_stop(rpc_mgr)
-- a hacky way to stop rpc_mgr from reconnecting
rpc_mgr.try_connect = do_nothing

-- this will stop all connections
for _, socket in pairs(rpc_mgr.clients) do
for conn in pairs(socket) do
conn:stop()
end
end
end

local function client_is_connected(rpc_mgr)
for _, socket in pairs(rpc_mgr.clients) do
for conn in pairs(socket) do
return true
end
end
return false
end

local function client_wait_until_connected(rpc_mgr, timeout)
return helpers.wait_until(function()
return rpc_mgr:is_connected()
end, timeout or 15)
end

-- TODO: let client not emits logs as it's expected to fail to connect for the first few seconds
function _M.new(opts)
opts = opts or {}
setmetatable(opts, default_meta)
local ret = rpc_mgr.new(default_dp_conf, opts.name or "dp")
ret.stop = client_stop
ret.is_connected = client_is_connected
ret.start = ret.try_connect
ret.wait_until_connected = client_wait_until_connected
return ret
end

return _M
8 changes: 8 additions & 0 deletions spec/helpers/rpc_mock/default.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
local default_cert = {
cluster_cert = "spec/fixtures/kong_spec.crt",
cluster_cert_key = "spec/fixtures/kong_spec.key",
}

return {
default_cert = default_cert,
}
Loading

0 comments on commit c79e2be

Please sign in to comment.