From fad17ba3c6d4701a78e784e3cd45a846fddb9f99 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Tue, 17 Dec 2024 14:10:23 +0800 Subject: [PATCH] fix(clustering/rpc): support `cluster_use_proxy` option for clustering rpc protocol (#13971) The original hybrid mode connections like full sync (sync v1) support forward proxy via the option `cluster_use_proxy`. While clustering RPC protocol does not support this, this commit introduces this feature to RPC protocol. https://konghq.atlassian.net/browse/KAG-5555 --- kong/clustering/rpc/manager.lua | 12 ++++++++++++ kong/clustering/utils.lua | 4 ++-- .../09-hybrid_mode/10-forward-proxy_spec.lua | 15 ++++++++++++--- .../14-dp_privileged_agent_spec.lua | 1 + 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index c3925c5073cb..3d08963b4687 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -30,6 +30,7 @@ local cjson_encode = cjson.encode local cjson_decode = cjson.decode local validate_client_cert = clustering_tls.validate_client_cert local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL +local parse_proxy_url = require("kong.clustering.utils").parse_proxy_url local RPC_MATA_V1 = "kong.meta.v1" @@ -474,6 +475,17 @@ function _M:connect(premature, node_id, host, path, cert, key) local c = assert(client:new(WS_OPTS)) + if self.conf.cluster_use_proxy then + local proxy_opts = parse_proxy_url(self.conf.proxy_server) + opts.proxy_opts = { + wss_proxy = proxy_opts.proxy_url, + wss_proxy_authorization = proxy_opts.proxy_authorization, + } + + ngx_log(ngx_DEBUG, "[rpc] using proxy ", proxy_opts.proxy_url, + " to connect control plane") + end + local ok, err = c:connect(uri, opts) if not ok then ngx_log(ngx_ERR, "[rpc] unable to connect to peer: ", err) diff --git a/kong/clustering/utils.lua b/kong/clustering/utils.lua index 5ee56d30bafc..ee34e7dce2e4 100644 --- a/kong/clustering/utils.lua +++ b/kong/clustering/utils.lua @@ -33,7 +33,7 @@ local CLUSTER_PROXY_SSL_TERMINATOR_SOCK = fmt("unix:%s/%s", local _M = {} -local function parse_proxy_url(proxy_server) +function _M.parse_proxy_url(proxy_server) local ret = {} if proxy_server then @@ -84,7 +84,7 @@ function _M.connect_cp(dp, endpoint, protocols) } if conf.cluster_use_proxy then - local proxy_opts = parse_proxy_url(conf.proxy_server) + local proxy_opts = _M.parse_proxy_url(conf.proxy_server) opts.proxy_opts = { wss_proxy = proxy_opts.proxy_url, wss_proxy_authorization = proxy_opts.proxy_authorization, diff --git a/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua b/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua index a7f11e41059e..27856b4554ee 100644 --- a/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua +++ b/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua @@ -71,11 +71,13 @@ local proxy_configs = { -- if existing lmdb data is set, the service/route exists and -- test run too fast before the proxy connection is established --- XXX FIXME: enable inc_sync = on -for _, inc_sync in ipairs { "off" } do +for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do + local rpc, inc_sync = v[1], v[2] for _, strategy in helpers.each_strategy() do for proxy_desc, proxy_opts in pairs(proxy_configs) do - describe("CP/DP sync through proxy (" .. proxy_desc .. ") works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() + describe("CP/DP sync through proxy (" .. proxy_desc .. ") works with #" + .. strategy .. " rpc=" .. rpc .. " inc_sync=" .. inc_sync + .. " backend", function() lazy_setup(function() helpers.get_db_utils(strategy) -- runs migrations @@ -87,6 +89,7 @@ for _, strategy in helpers.each_strategy() do db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = rpc, cluster_incremental_sync = inc_sync, })) @@ -108,6 +111,7 @@ for _, strategy in helpers.each_strategy() do proxy_server_ssl_verify = proxy_opts.proxy_server_ssl_verify, lua_ssl_trusted_certificate = proxy_opts.lua_ssl_trusted_certificate, + cluster_rpc = rpc, cluster_incremental_sync = inc_sync, -- this is unused, but required for the template to include a stream {} block @@ -166,6 +170,11 @@ for _, strategy in helpers.each_strategy() do if auth_on then assert.matches("accepted basic proxy%-authorization", contents) end + + -- check the debug log of the `cluster_use_proxy` option + local line = inc_sync == "on" and "[rpc] using proxy" or + "[clustering] using proxy" + assert.logfile("servroot2/logs/error.log").has.line(line, true) end) end) end) diff --git a/spec/02-integration/09-hybrid_mode/14-dp_privileged_agent_spec.lua b/spec/02-integration/09-hybrid_mode/14-dp_privileged_agent_spec.lua index b0743edb7467..1c5e351bf874 100644 --- a/spec/02-integration/09-hybrid_mode/14-dp_privileged_agent_spec.lua +++ b/spec/02-integration/09-hybrid_mode/14-dp_privileged_agent_spec.lua @@ -20,6 +20,7 @@ describe("DP diabled Incremental Sync RPC #" .. strategy, function() cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", cluster_incremental_sync = "on", -- ENABLE incremental sync }))