diff --git a/kong-3.5.0-0.rockspec b/kong-3.5.0-0.rockspec index 2b4ae9d3c033..9677d002f174 100644 --- a/kong-3.5.0-0.rockspec +++ b/kong-3.5.0-0.rockspec @@ -69,7 +69,7 @@ build = { ["kong.clustering"] = "kong/clustering/init.lua", ["kong.clustering.data_plane"] = "kong/clustering/data_plane.lua", - ["kong.clustering.data_plane_stream_protocol"] = "kong/clustering/protocol.lua", + ["kong.clustering.protocol"] = "kong/clustering/protocol.lua", ["kong.clustering.control_plane"] = "kong/clustering/control_plane.lua", ["kong.clustering.utils"] = "kong/clustering/utils.lua", ["kong.clustering.events"] = "kong/clustering/events.lua", diff --git a/kong/clustering/compat/checkers.lua b/kong/clustering/compat/checkers.lua index 7fbfca3cec22..ea11b1598823 100644 --- a/kong/clustering/compat/checkers.lua +++ b/kong/clustering/compat/checkers.lua @@ -24,14 +24,14 @@ end local compatible_checkers = { { 3003000000, --[[ 3.3.0.0 ]] - function(config_table, dp_version, log_suffix) + function(config, dp_version, log_suffix) local has_update -- Support legacy queueing parameters for plugins that used queues prior to 3.3. `retry_count` has been -- completely removed, so we always supply the default of 10 as that provides the same behavior as with a -- pre 3.3 CP. The other queueing related legacy parameters can be determined from the new queue -- configuration table. - for _, plugin in ipairs(config_table.plugins or {}) do + for _, plugin in ipairs(config.plugins or {}) do local config = plugin.config if plugin.name == 'statsd' or plugin.name == 'datadog' then @@ -93,10 +93,10 @@ local compatible_checkers = { }, { 3003000000, --[[ 3.3.0.0 ]] - function(config_table, dp_version, log_suffix) + function(config, dp_version, log_suffix) local has_update - for _, config_entity in ipairs(config_table.vaults or {}) do + for _, config_entity in ipairs(config.vaults or {}) do if config_entity.name == "env" and type(config_entity.config) == "table" then local config = config_entity.config local prefix = config.prefix @@ -116,7 +116,7 @@ local compatible_checkers = { }, { 3003000000, --[[ 3.3.0.0 ]] - function(config_table, dp_version, log_suffix) + function(config, dp_version, log_suffix) -- remove updated_at field for core entities ca_certificates, certificates, consumers, -- targets, upstreams, plugins, workspaces, clustering_data_planes and snis local entity_names = { @@ -128,7 +128,7 @@ local compatible_checkers = { local updated_entities = {} for _, name in ipairs(entity_names) do - for _, config_entity in ipairs(config_table[name] or {}) do + for _, config_entity in ipairs(config[name] or {}) do if config_entity["updated_at"] then config_entity["updated_at"] = nil @@ -152,8 +152,8 @@ local compatible_checkers = { }, { 3002000000, --[[ 3.2.0.0 ]] - function(config_table, dp_version, log_suffix) - local config_services = config_table["services"] + function(config, dp_version, log_suffix) + local config_services = config["services"] if not config_services then return nil end @@ -188,8 +188,8 @@ local compatible_checkers = { }, { 3002000000, --[[ 3.2.0.0 ]] - function(config_table, dp_version, log_suffix) - local config_upstreams = config_table["upstreams"] + function(config, dp_version, log_suffix) + local config_upstreams = config["upstreams"] if not config_upstreams then return nil end @@ -214,8 +214,8 @@ local compatible_checkers = { }, { 3002000000, --[[ 3.2.0.0 ]] - function(config_table, dp_version, log_suffix) - local config_plugins = config_table["plugins"] + function(config, dp_version, log_suffix) + local config_plugins = config["plugins"] if not config_plugins then return nil end @@ -240,8 +240,8 @@ local compatible_checkers = { }, { 3001000000, --[[ 3.1.0.0 ]] - function(config_table, dp_version, log_suffix) - local config_upstreams = config_table["upstreams"] + function(config, dp_version, log_suffix) + local config_upstreams = config["upstreams"] if not config_upstreams then return nil end diff --git a/kong/clustering/compat/init.lua b/kong/clustering/compat/init.lua index 8624a349d83d..337e725cc2ea 100644 --- a/kong/clustering/compat/init.lua +++ b/kong/clustering/compat/init.lua @@ -374,13 +374,13 @@ function _M.update_compatible_payload(payload, dp_version, log_suffix) local ver = checker[1] local fn = checker[2] if dp_version_num < ver then - fn(payload.config_table, dp_version, log_suffix) + fn(payload.config, dp_version, log_suffix) end end local fields = get_removed_fields(dp_version_num) if fields then - invalidate_keys_from_config(payload.config_table.plugins, fields, log_suffix, dp_version_num) + invalidate_keys_from_config(payload.config.plugins, fields, log_suffix, dp_version_num) end return payload diff --git a/kong/clustering/config_helper.lua b/kong/clustering/config_helper.lua index 790f3e72c15d..7748a131d195 100644 --- a/kong/clustering/config_helper.lua +++ b/kong/clustering/config_helper.lua @@ -138,18 +138,18 @@ local function calculate_hash(input, o) end -local function calculate_config_hash(config_table) +local function calculate_config_hash(config) local o = buffer.new() - if type(config_table) ~= "table" then - local config_hash = calculate_hash(config_table, o) + if type(config) ~= "table" then + local config_hash = calculate_hash(config, o) return config_hash, { config = config_hash, } end - local routes = config_table.routes - local services = config_table.services - local plugins = config_table.plugins - local upstreams = config_table.upstreams - local targets = config_table.targets + local routes = config.routes + local services = config.services + local plugins = config.plugins + local upstreams = config.upstreams + local targets = config.targets local routes_hash = calculate_hash(routes, o) local services_hash = calculate_hash(services, o) @@ -157,13 +157,13 @@ local function calculate_config_hash(config_table) local upstreams_hash = calculate_hash(upstreams, o) local targets_hash = calculate_hash(targets, o) - config_table.routes = nil - config_table.services = nil - config_table.plugins = nil - config_table.upstreams = nil - config_table.targets = nil + config.routes = nil + config.services = nil + config.plugins = nil + config.upstreams = nil + config.targets = nil - local rest_hash = calculate_hash(config_table, o) + local rest_hash = calculate_hash(config, o) local config_hash = ngx_md5(routes_hash .. services_hash .. plugins_hash .. @@ -171,11 +171,11 @@ local function calculate_config_hash(config_table) targets_hash .. rest_hash) - config_table.routes = routes - config_table.services = services - config_table.plugins = plugins - config_table.upstreams = upstreams - config_table.targets = targets + config.routes = routes + config.services = services + config.plugins = plugins + config.upstreams = upstreams + config.targets = targets return config_hash, { config = config_hash, @@ -202,11 +202,11 @@ local function fill_empty_hashes(hashes) end end -function _M.update(declarative_config, config_table, config_hash, hashes) - assert(type(config_table) == "table") +function _M.update(declarative_config, config, config_hash, hashes) + assert(type(config) == "table") if not config_hash then - config_hash, hashes = calculate_config_hash(config_table) + config_hash, hashes = calculate_config_hash(config) end if hashes then @@ -221,7 +221,7 @@ function _M.update(declarative_config, config_table, config_hash, hashes) end local entities, err, _, meta, new_hash = - declarative_config:parse_table(config_table, config_hash) + declarative_config:parse_table(config, config_hash) if not entities then return nil, "bad config received from control plane " .. err end diff --git a/spec/01-unit/19-hybrid/03-compat_spec.lua b/spec/01-unit/19-hybrid/03-compat_spec.lua index 11cc6e672783..b46ce2ab7aa2 100644 --- a/spec/01-unit/19-hybrid/03-compat_spec.lua +++ b/spec/01-unit/19-hybrid/03-compat_spec.lua @@ -1,8 +1,6 @@ local compat = require("kong.clustering.compat") local helpers = require ("spec.helpers") local declarative = require("kong.db.declarative") -local inflate_gzip = require("kong.tools.utils").inflate_gzip -local cjson_decode = require("cjson.safe").decode local ssl_fixtures = require ("spec.fixtures.ssl") local function reset_fields() @@ -131,16 +129,11 @@ describe("kong.clustering.compat", function() lazy_setup(function() test_with = function(plugins, dp_version) - local has_update, new_conf = compat.update_compatible_payload( - { config_table = { plugins = plugins } }, dp_version, "" + local new_conf = compat.update_compatible_payload( + { config = { plugins = plugins } }, dp_version, "" ) - if has_update then - new_conf = cjson_decode(inflate_gzip(new_conf)) - return new_conf.config_table.plugins - end - - return plugins + return new_conf.config.plugins end compat._set_removed_fields({ @@ -391,9 +384,9 @@ describe("kong.clustering.compat", function() end) it(function() - local config = { config_table = declarative.export_config() } - local has_update = compat.update_compatible_payload(config, "3.0.0", "test_") - assert.truthy(has_update) + local data = { config = declarative.export_config() } + local result = compat.update_compatible_payload(data, "3.0.0", "test_") + assert.truthy(result) end) end) end @@ -559,33 +552,26 @@ describe("kong.clustering.compat", function() }, }, { _transform = true })) - config = { config_table = declarative.export_config() } + config = { config = declarative.export_config() } end) it(function() - local has_update, result = compat.update_compatible_payload(config, "3.0.0", "test_") - assert.truthy(has_update) - result = cjson_decode(inflate_gzip(result)).config_table - - local upstreams = assert(assert(assert(result).upstreams)) + local result = compat.update_compatible_payload(config, "3.0.0", "test_") + local upstreams = assert(assert(assert(assert(result).config).upstreams)) assert.is_nil(assert(upstreams[1]).use_srv_name) assert.is_nil(assert(upstreams[2]).use_srv_name) assert.is_nil(assert(upstreams[3]).use_srv_name) end) it("plugin.instance_name", function() - local has_update, result = compat.update_compatible_payload(config, "3.1.0", "test_") - assert.truthy(has_update) - result = cjson_decode(inflate_gzip(result)).config_table - local plugins = assert(assert(assert(result).plugins)) + local result = compat.update_compatible_payload(config, "3.1.0", "test_") + local plugins = assert(assert(assert(assert(result).config).plugins)) assert.is_nil(assert(plugins[1]).instance_name) assert.is_nil(assert(plugins[2]).instance_name) end) it("plugin.queue_parameters", function() - local has_update, result = compat.update_compatible_payload(config, "3.2.0", "test_") - assert.truthy(has_update) - result = cjson_decode(inflate_gzip(result)).config_table - local plugins = assert(assert(assert(result).plugins)) + local result = compat.update_compatible_payload(config, "3.2.0", "test_") + local plugins = assert(assert(assert(assert(result).config).plugins)) for _, plugin in ipairs(plugins) do if plugin.name == "statsd" then assert.equals(10, plugin.config.retry_count) @@ -603,19 +589,15 @@ describe("kong.clustering.compat", function() end) it("upstream.algorithm", function() - local has_update, result = compat.update_compatible_payload(config, "3.1.0", "test_") - assert.truthy(has_update) - result = cjson_decode(inflate_gzip(result)).config_table - local upstreams = assert(assert(assert(result).upstreams)) + local result = compat.update_compatible_payload(config, "3.1.0", "test_") + local upstreams = assert(assert(assert(assert(result).config).upstreams)) assert.equals(assert(upstreams[4]).algorithm, "round-robin") assert.equals(assert(upstreams[5]).algorithm, "round-robin") end) it("service.protocol", function() - local has_update, result = compat.update_compatible_payload(config, "3.1.0", "test_") - assert.truthy(has_update) - result = cjson_decode(inflate_gzip(result)).config_table - local services = assert(assert(assert(result).services)) + local result = compat.update_compatible_payload(config, "3.1.0", "test_") + local services = assert(assert(assert(assert(result).config).services)) assert.is_nil(assert(services[1]).client_certificate) assert.is_nil(assert(services[1]).tls_verify) assert.is_nil(assert(services[1]).tls_verify_depth) diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index 95ff59a75287..80481fe688ab 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -494,8 +494,7 @@ describe("CP/DP #version check #" .. strategy, function() node_process_conf = harness.process_conf, })) - assert.equals("reconfigure", res.type) - assert.is_table(res.config_table) + assert.is_table(res.config) -- needs wait_until for C* convergence helpers.wait_until(function() diff --git a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua index 3e4470c05f7d..ca25bbb707bb 100644 --- a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua @@ -25,21 +25,17 @@ local function cluster_client(opts) }) assert.is_nil(err) - if res and res.config_table then - res.config = res.config_table - end - return res end local function get_plugin(node_id, node_version, name) local res, err = cluster_client({ id = node_id, version = node_version }) assert.is_nil(err) - assert.is_table(res and res.config_table and res.config_table.plugins, + assert.is_table(res and res.config and res.config.plugins, "invalid response from clustering client") local plugin - for _, p in ipairs(res.config_table.plugins or {}) do + for _, p in ipairs(res.config.plugins or {}) do if p.name == name then plugin = p break diff --git a/spec/helpers.lua b/spec/helpers.lua index 6ba834fbf471..b892cfd5a01e 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -3800,23 +3800,53 @@ local function clustering_client(opts) process_conf = opts.node_process_conf, })) assert(c:send_binary(payload)) - assert(c:send_ping(string.rep("0", 32))) - local data, typ, err - data, typ, err = c:recv_frame() - c:close() + local config + local hash - if typ == "binary" then - local odata = assert(utils.inflate_gzip(data)) - local msg = assert(cjson.decode(odata)) - return msg + while true do + local data, typ, err = c:recv_frame() + if typ == "binary" then + local msg = assert(cjson.decode(data)) + if msg.type == "reconfigure:start" then + config = msg.meta + hash = msg.hash + + elseif msg.type == "entities" then + assert(hash == msg.hash, "reconfigure hash mismatch") + + local name = msg.name + local entities = assert(cjson.decode(assert(utils.inflate_gzip(msg.data)))) + if config[name] then + local count = #config[name] + for i, entity in ipairs(entities) do + config[name][count + i] = entity + end - elseif typ == "pong" then - return "PONG" - end + else + config[name] = entities + end - return nil, "unknown frame from CP: " .. (typ or err) + elseif msg.type == "reconfigure:end" then + c:close() + assert(hash == msg.hash, "reconfigure hash mismatch") + return { + config = config, + hashes = msg.hashes, + timestamp = msg.timestamp, + } + end + + elseif typ == "pong" then + c:close() + return "PONG" + + else + c:close() + return nil, "unknown frame from CP: " .. (typ or err) + end + end end