Skip to content

Commit

Permalink
fix(clustering/rpc): support cluster_use_proxy option for clusterin…
Browse files Browse the repository at this point in the history
…g rpc protocol (#13971)

The original hybrid mode connections like full sync (sync v1) support forward proxy via the option `cluster_use_proxy`. While clustering RPC protocol does not support this, this commit introduces this feature to RPC protocol.

https://konghq.atlassian.net/browse/KAG-5555
  • Loading branch information
chobits authored Dec 17, 2024
1 parent eeded78 commit fad17ba
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 5 deletions.
12 changes: 12 additions & 0 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ local cjson_encode = cjson.encode
local cjson_decode = cjson.decode
local validate_client_cert = clustering_tls.validate_client_cert
local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL
local parse_proxy_url = require("kong.clustering.utils").parse_proxy_url


local RPC_MATA_V1 = "kong.meta.v1"
Expand Down Expand Up @@ -474,6 +475,17 @@ function _M:connect(premature, node_id, host, path, cert, key)

local c = assert(client:new(WS_OPTS))

if self.conf.cluster_use_proxy then
local proxy_opts = parse_proxy_url(self.conf.proxy_server)
opts.proxy_opts = {
wss_proxy = proxy_opts.proxy_url,
wss_proxy_authorization = proxy_opts.proxy_authorization,
}

ngx_log(ngx_DEBUG, "[rpc] using proxy ", proxy_opts.proxy_url,
" to connect control plane")
end

local ok, err = c:connect(uri, opts)
if not ok then
ngx_log(ngx_ERR, "[rpc] unable to connect to peer: ", err)
Expand Down
4 changes: 2 additions & 2 deletions kong/clustering/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ local CLUSTER_PROXY_SSL_TERMINATOR_SOCK = fmt("unix:%s/%s",
local _M = {}


local function parse_proxy_url(proxy_server)
function _M.parse_proxy_url(proxy_server)
local ret = {}

if proxy_server then
Expand Down Expand Up @@ -84,7 +84,7 @@ function _M.connect_cp(dp, endpoint, protocols)
}

if conf.cluster_use_proxy then
local proxy_opts = parse_proxy_url(conf.proxy_server)
local proxy_opts = _M.parse_proxy_url(conf.proxy_server)
opts.proxy_opts = {
wss_proxy = proxy_opts.proxy_url,
wss_proxy_authorization = proxy_opts.proxy_authorization,
Expand Down
15 changes: 12 additions & 3 deletions spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ local proxy_configs = {
-- if existing lmdb data is set, the service/route exists and
-- test run too fast before the proxy connection is established

-- XXX FIXME: enable inc_sync = on
for _, inc_sync in ipairs { "off" } do
for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do
local rpc, inc_sync = v[1], v[2]
for _, strategy in helpers.each_strategy() do
for proxy_desc, proxy_opts in pairs(proxy_configs) do
describe("CP/DP sync through proxy (" .. proxy_desc .. ") works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function()
describe("CP/DP sync through proxy (" .. proxy_desc .. ") works with #"
.. strategy .. " rpc=" .. rpc .. " inc_sync=" .. inc_sync
.. " backend", function()
lazy_setup(function()
helpers.get_db_utils(strategy) -- runs migrations

Expand All @@ -87,6 +89,7 @@ for _, strategy in helpers.each_strategy() do
db_update_frequency = 0.1,
cluster_listen = "127.0.0.1:9005",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
}))

Expand All @@ -108,6 +111,7 @@ for _, strategy in helpers.each_strategy() do
proxy_server_ssl_verify = proxy_opts.proxy_server_ssl_verify,
lua_ssl_trusted_certificate = proxy_opts.lua_ssl_trusted_certificate,

cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,

-- this is unused, but required for the template to include a stream {} block
Expand Down Expand Up @@ -166,6 +170,11 @@ for _, strategy in helpers.each_strategy() do
if auth_on then
assert.matches("accepted basic proxy%-authorization", contents)
end

-- check the debug log of the `cluster_use_proxy` option
local line = inc_sync == "on" and "[rpc] using proxy" or
"[clustering] using proxy"
assert.logfile("servroot2/logs/error.log").has.line(line, true)
end)
end)
end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ describe("DP diabled Incremental Sync RPC #" .. strategy, function()
cluster_listen = "127.0.0.1:9005",
nginx_conf = "spec/fixtures/custom_nginx.template",

cluster_rpc = "on",
cluster_incremental_sync = "on", -- ENABLE incremental sync
}))

Expand Down

1 comment on commit fad17ba

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:fad17ba3c6d4701a78e784e3cd45a846fddb9f99
Artifacts available https://github.com/Kong/kong/actions/runs/12367408586

Please sign in to comment.