From fe3f8b2988d61fb24a5b50c342eea7b4bdad5900 Mon Sep 17 00:00:00 2001
From: Datong Sun
Date: Fri, 26 Apr 2024 01:00:58 -0700
Subject: [PATCH] fix(clustering/rpc): fix incorrect `is_timeout()` call
 introduced while addressing review feedback and add tests for the
 concentrator

This fixes errors like:

```
concentrator.lua:237: [rpc] concentrator event loop error: wait_for_notification error: receive_message: failed to get type: timeout, reconnecting in 8 seconds, context: ngx.timer
```
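The change inverts a single condition in `_M:_event_loop()`. To make the
intent explicit, below is a minimal sketch of the corrected control flow;
the `poll_once` name is hypothetical, `lconn` and `is_timeout` are
stand-ins for the real objects, and the actual loop body does more than
this:

```lua
-- one iteration of the notification poll, as fixed by this patch
-- (illustrative sketch only; `poll_once` does not exist in the codebase)
local function poll_once(lconn, is_timeout)
  local res, err = lconn:wait_for_notification()
  if not res then
    -- before this patch the guard read `if is_timeout(err) then`, so
    -- every routine poll timeout aborted the event loop and forced the
    -- reconnect/backoff path shown in the log above
    if not is_timeout(err) then
      return nil, "wait_for_notification error: " .. err
    end

    -- a timeout only means no notification arrived during this poll
    -- interval: fall through so the loop polls again
    return true
  end

  -- got a notification; the real loop dispatches it from here
  return res
end
```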
---
 kong/clustering/rpc/concentrator.lua          |   2 +-
 .../18-hybrid_rpc/04-concentrator_spec.lua    | 106 ++++++++++++++++++
 .../nginx_kong_test_custom_inject_http.lua    |   4 +-
 3 files changed, 109 insertions(+), 3 deletions(-)
 create mode 100644 spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua

diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua
index a7815d7a6c19..c1370a58e198 100644
--- a/kong/clustering/rpc/concentrator.lua
+++ b/kong/clustering/rpc/concentrator.lua
@@ -186,7 +186,7 @@ function _M:_event_loop(lconn)
 
     local res, err = lconn:wait_for_notification()
     if not res then
-      if is_timeout(err) then
+      if not is_timeout(err) then
         return nil, "wait_for_notification error: " .. err
       end
 
diff --git a/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua b/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua
new file mode 100644
index 000000000000..d818d87b0a1b
--- /dev/null
+++ b/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua
@@ -0,0 +1,106 @@
+local helpers = require "spec.helpers"
+local cjson = require("cjson.safe")
+
+
+local function obtain_dp_node_id()
+  local dp_node_id
+
+  helpers.wait_until(function()
+    local admin_client = helpers.admin_client()
+    finally(function()
+      admin_client:close()
+    end)
+
+    local res = assert(admin_client:get("/clustering/data-planes"))
+    local body = assert.res_status(200, res)
+    local json = cjson.decode(body)
+
+    for _, v in pairs(json.data) do
+      if v.ip == "127.0.0.1" and ngx.time() - v.last_seen < 3 then
+        dp_node_id = v.id
+        return true
+      end
+    end
+  end, 10)
+
+  return dp_node_id
+end
+
+
+for _, strategy in helpers.each_strategy() do
+  describe("Hybrid Mode RPC over DB concentrator #" .. strategy, function()
+
+    lazy_setup(function()
+      helpers.get_db_utils(strategy, {
+        "clustering_data_planes",
+      }) -- runs migrations
+
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        database = strategy,
+        cluster_listen = "127.0.0.1:9005",
+        admin_listen = "127.0.0.1:" .. helpers.get_available_port(),
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        prefix = "servroot3",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        database = strategy,
+        cluster_listen = "127.0.0.1:" .. helpers.get_available_port(),
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+
+      assert(helpers.start_kong({
+        role = "data_plane",
+        database = "off",
+        prefix = "servroot2",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        cluster_control_plane = "127.0.0.1:9005",
+        proxy_listen = "0.0.0.0:9002",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+    end)
+
+    lazy_teardown(function()
+      helpers.stop_kong("servroot2")
+      helpers.stop_kong("servroot3")
+      helpers.stop_kong()
+    end)
+
+    describe("Dynamic log level over RPC", function()
+      it("can get the current log level", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        -- this sleep is *not* needed for the wait_until below to succeed,
+        -- but it makes the wait_until succeed sooner: the extra time
+        -- gives the concentrator enough time to report that the node is
+        -- online in the DB. Without it, the first call to "/log-level"
+        -- will always time out after 5 seconds
+        ngx.sleep(1)
+
+        helpers.wait_until(function()
+          local admin_client = helpers.admin_client()
+          finally(function()
+            admin_client:close()
+          end)
+
+          local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level"))
+          if res.status == 200 then
+            local body = assert.res_status(200, res)
+            local json = cjson.decode(body)
+            assert.equal(0, json.timeout)
+            assert.equal("debug", json.current_level)
+            assert.equal("debug", json.original_level)
+            return true
+          end
+        end, 10)
+      end)
+    end)
+  end)
+end
diff --git a/spec/fixtures/template_inject/nginx_kong_test_custom_inject_http.lua b/spec/fixtures/template_inject/nginx_kong_test_custom_inject_http.lua
index 46439562963a..2c8dcb95b890 100644
--- a/spec/fixtures/template_inject/nginx_kong_test_custom_inject_http.lua
+++ b/spec/fixtures/template_inject/nginx_kong_test_custom_inject_http.lua
@@ -5,8 +5,8 @@ lua_shared_dict kong_mock_upstream_loggers 10m;
 server {
     server_name mock_upstream;
 
-    listen 15555;
-    listen 15556 ssl;
+    listen 15555 reuseport;
+    listen 15556 ssl reuseport;
 
 > for i = 1, #ssl_cert do
     ssl_certificate $(ssl_cert[i]);
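Reviewer note: the fix relies on the usual OpenResty convention for
reporting timeouts. Below is a sketch of the kind of `is_timeout()`
helper involved; the actual helper in Kong's clustering code may differ
in detail, so treat this as illustrative rather than authoritative.

```lua
-- timeouts from cosocket-based drivers such as pgmoon arrive as plain
-- error strings ending in "timeout" (for example
-- "receive_message: failed to get type: timeout" from the log above),
-- so the conventional check is a simple suffix match.
-- assumed shape only; not copied from the Kong source tree
local sub = string.sub

local function is_timeout(err)
  return err and sub(err, -7) == "timeout"
end
```

With the corrected guard, such timeouts fall through and `_event_loop()`
simply calls `wait_for_notification()` again, instead of tearing down the
connection and logging the "reconnecting in 8 seconds" message.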