Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests(clustering/rpc): rpc mock/hook #14030

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bin/busted
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ local cert_path do
busted_cert_content = busted_cert_content .. "\n" .. pl_file.read(system_cert_path)
end

local cluster_cert_content = assert(pl_file.read("spec/fixtures/kong_clustering.crt"))
busted_cert_content = busted_cert_content .. "\n" .. cluster_cert_content

pl_file.write(busted_cert_file, busted_cert_content)
cert_path = busted_cert_file
end
Expand Down
132 changes: 132 additions & 0 deletions spec/02-integration/01-helpers/05-rpc-mock_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
local helpers = require("spec.helpers")
local server = require("spec.helpers.rpc_mock.server")
local client = require("spec.helpers.rpc_mock.client")
local get_node_id = helpers.get_node_id

local function trigger_change()
-- the initial sync is flaky. let's trigger a sync by creating a service
local admin_client = helpers.admin_client()
assert.res_status(201, admin_client:send {
method = "POST",
path = "/services/",
body = {
url = "http://example.com",
},
headers = {
["Content-Type"] = "application/json",
},
})
end

describe("rpc mock/hook", function()
describe("server side", function()
local server_mock

lazy_setup(function()
helpers.get_db_utils()

server_mock = server.new()
assert(server_mock:start())

assert(helpers.start_kong({
database = "off",
role = "data_plane",
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = "on",
cluster_rpc_sync = "on",
log_level = "debug",
cluster_control_plane = "127.0.0.1:8005",
}))
end)

lazy_teardown(function()
server_mock:stop(true)
helpers.stop_kong(nil, true)
end)

it("recording", function()
trigger_change()

local record = server_mock:wait_for_call()
assert.is_table(record.response.result.default.deltas)
end)

it("mock", function()
local client_version
server_mock:mock("kong.sync.v2.get_delta", function(node_id, payload)
client_version = payload.default.version
return { default = { version = 100, deltas = {} } }
end)
server_mock:attach_debugger()

local dp_id = get_node_id("servroot")

server_mock:wait_for_node(dp_id)

assert(server_mock:call(dp_id, "kong.sync.v2.notify_new_version", { default = { new_version = 100, } }))

-- the mock should have been called
helpers.wait_until(function()
return client_version
end, 20)
end)
end)

describe("client side", function()
local client_mock
local called = false

lazy_setup(function()
helpers.get_db_utils()

client_mock = assert(client.new())
assert(helpers.start_kong({
role = "control_plane",
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = "on",
cluster_rpc_sync = "on",
}))

client_mock.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, payload)
called = true
end)

client_mock:start()
client_mock:wait_until_connected()
end)

lazy_teardown(function()
helpers.stop_kong(nil, true)
client_mock:stop()
end)

it("client->CP", function()
local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },})
assert.is_string(err)
assert.is_nil(res)
end)

it("CP->client", function()
-- this registers the data plane node
local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

trigger_change()

helpers.wait_until(function()
return called
end, 20)
end)
end)
end)
76 changes: 76 additions & 0 deletions spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
local kong_meta = require("kong.meta")

local _M = {
PRIORITY = 1000,
VERSION = kong_meta.version,
}

local original_callbacks = {}
local inc_id = 0

function _M.init_worker()
kong.rpc.callbacks:register("kong.rpc.debug.register", function(node_id, register_payload)
local proxy_apis = register_payload.proxy_apis

for _, proxy_api in ipairs(proxy_apis) do
-- unregister and save the original callback
local original_cb
if not original_callbacks[proxy_api] then
original_callbacks[proxy_api] = kong.rpc.callbacks.callbacks[proxy_api]
end
original_cb = original_callbacks[proxy_api]
kong.rpc.callbacks.callbacks[proxy_api] = nil

kong.log.info("hooking registering RPC proxy API: ", proxy_api)
kong.rpc.callbacks:register(proxy_api, function(client_id, payload)
local id = inc_id
inc_id = inc_id + 1
kong.log.info("hooked proxy API ", proxy_api, " called by node: ", client_id)
kong.log.info("forwarding to node: ", node_id)
local res, err = kong.rpc:call(node_id, "kong.rpc.debug.mock", { call_id = id, method = proxy_api, node_id = client_id, payload = payload })
if not res then
return nil, "Failed to proxy(" .. node_id .. "): " .. err
end

if res.error then
return nil, res.error
end

if res.prehook or res.posthook then
if res.prehook then
payload = res.args
end

local origin_res, origin_err = original_cb(client_id, payload)

if res.posthook then
res, err = kong.rpc:call(node_id, "kong.rpc.debug.posthook", { call_id = id, method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} })
if not res then
return nil, "Failed to call post hook(" .. node_id .. "): " .. err
end

return res.result, res.error
end
elseif res.mock then
return res.result, res.error
end

return nil, "invalid response from proxy"
end)
end

return true
end)

kong.rpc.callbacks:register("kong.rpc.debug.call", function(node_id, payload)
local res, err = kong.rpc:call(payload.node_id, payload.method, payload.args)
return res, err
end)

kong.rpc.callbacks:register("kong.rpc.debug.lua_code", function(node_id, payload)
local code = assert(loadstring(payload))
return code()
end)
end

return _M
12 changes: 12 additions & 0 deletions spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
return {
name = "rpc-debug",
fields = {
{
config = {
type = "record",
fields = {
},
},
},
},
}
26 changes: 26 additions & 0 deletions spec/helpers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ local server = reload_module("spec.internal.server")
local client = reload_module("spec.internal.client")
local wait = reload_module("spec.internal.wait")

-- redo the patches to apply the kong global patches
local _timerng

_timerng = require("resty.timerng").new({
min_threads = 16,
max_threads = 32,
})

_timerng:start()

_G.timerng = _timerng

_G.ngx.timer.at = function (delay, callback, ...)
return _timerng:at(delay, callback, ...)
end

_G.ngx.timer.every = function (interval, callback, ...)
return _timerng:every(interval, callback, ...)
end

if not kong.timer then
kong.timer = _timerng
end


----------------
-- Variables/constants
Expand Down Expand Up @@ -226,4 +250,6 @@ local wait = reload_module("spec.internal.wait")
make_temp_dir = misc.make_temp_dir,

build_go_plugins = cmd.build_go_plugins,

get_node_id = misc.get_node_id,
}
68 changes: 68 additions & 0 deletions spec/helpers/rpc_mock/client.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- by importing helpers, we ensure the kong PDK module is initialized
local helpers = require "spec.helpers"
local rpc_mgr = require("kong.clustering.rpc.manager")
local default_cert = require("spec.helpers.rpc_mock.default").default_cert
local uuid = require "kong.tools.uuid"

StarlightIbuki marked this conversation as resolved.
Show resolved Hide resolved

local _M = {}


local default_dp_conf = {
role = "data_plane",
cluster_control_plane = "localhost:8005",
}

setmetatable(default_dp_conf, { __index = default_cert })
local default_meta = { __index = default_dp_conf, }


local function do_nothing() end


local function client_stop(rpc_mgr)
-- a hacky way to stop rpc_mgr from reconnecting
rpc_mgr.try_connect = do_nothing

-- this will stop all connections
for _, socket in pairs(rpc_mgr.clients) do
for conn in pairs(socket) do
pcall(conn.stop, conn)
end
end
end


local function client_is_connected(rpc_mgr)
for _, socket in pairs(rpc_mgr.clients) do
if next(socket) then
return true
end
end
return false
end


local function client_wait_until_connected(rpc_mgr, timeout)
return helpers.wait_until(function()
return rpc_mgr:is_connected()
end, timeout or 15)
end


-- TODO: let client not emits logs as it's expected to fail to connect for the first few seconds
function _M.new(opts)
opts = opts or {}
setmetatable(opts, default_meta)
local ret = rpc_mgr.new(default_dp_conf, opts.name or uuid.uuid())

ret.stop = client_stop
ret.is_connected = client_is_connected
ret.start = ret.try_connect
ret.wait_until_connected = client_wait_until_connected

return ret
end


return _M
10 changes: 10 additions & 0 deletions spec/helpers/rpc_mock/default.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local default_cert = {
cluster_mtls = "shared",
cluster_cert = "spec/fixtures/kong_clustering.crt",
cluster_cert_key = "spec/fixtures/kong_clustering.key",
nginx_conf = "spec/fixtures/custom_nginx.template",
}

return {
default_cert = default_cert,
}
Loading
Loading