Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering): entity streaming based cp-dp protocol #11630

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kong-3.5.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ build = {

["kong.clustering"] = "kong/clustering/init.lua",
["kong.clustering.data_plane"] = "kong/clustering/data_plane.lua",
["kong.clustering.protocol"] = "kong/clustering/protocol.lua",
["kong.clustering.control_plane"] = "kong/clustering/control_plane.lua",
["kong.clustering.utils"] = "kong/clustering/utils.lua",
["kong.clustering.events"] = "kong/clustering/events.lua",
Expand Down Expand Up @@ -101,6 +102,8 @@ build = {
["kong.resty.mlcache"] = "kong/resty/mlcache/init.lua",
["kong.resty.mlcache.ipc"] = "kong/resty/mlcache/ipc.lua",

["kong.resty.lmdb.reset-transaction"] = "kong/resty/lmdb/reset-transaction.lua",

["kong.cmd"] = "kong/cmd/init.lua",
["kong.cmd.roar"] = "kong/cmd/roar.lua",
["kong.cmd.stop"] = "kong/cmd/stop.lua",
Expand Down
127 changes: 61 additions & 66 deletions kong/clustering/compat/checkers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ local ipairs = ipairs
local type = type


local ngx_log = ngx.log
local ngx_WARN = ngx.WARN


local LOG_PREFIX = "[clustering] "


local log_warn_message
do
local ngx_log = ngx.log
local ngx_WARN = ngx.WARN
local fmt = string.format

local KONG_VERSION = require("kong.meta").version

local _log_prefix = "[clustering] "

log_warn_message = function(hint, action, dp_version, log_suffix)
local msg = fmt("Kong Gateway v%s %s " ..
"which is incompatible with dataplane version %s " ..
"and will %s.",
KONG_VERSION, hint, dp_version, action)
ngx_log(ngx_WARN, _log_prefix, msg, log_suffix)
ngx_log(ngx_WARN, LOG_PREFIX, msg, log_suffix)
end
end

Expand Down Expand Up @@ -92,64 +95,62 @@ local compatible_checkers = {
},

{ 3003000000, --[[ 3.3.0.0 ]]
function(config_table, dp_version, log_suffix)
function(config)
local has_update

-- Support legacy queueing parameters for plugins that used queues prior to 3.3. `retry_count` has been
-- completely removed, so we always supply the default of 10 as that provides the same behavior as with a
-- pre 3.3 CP. The other queueing related legacy parameters can be determined from the new queue
-- configuration table.
for _, plugin in ipairs(config_table.plugins or {}) do
local config = plugin.config

if plugin.name == 'statsd' or plugin.name == 'datadog' then
if type(config.retry_count) ~= "number" then
config.retry_count = 10
for _, plugin in ipairs(config.plugins or {}) do
local plugin_config = plugin.config
if plugin.name == "statsd" or plugin.name == "datadog" then
if type(plugin_config.retry_count) ~= "number" then
plugin_config.retry_count = 10
has_update = true
end

if type(config.queue_size) ~= "number" then
if config.queue and type(config.queue.max_batch_size) == "number" then
config.queue_size = config.queue.max_batch_size
if type(plugin_config.queue_size) ~= "number" then
if plugin_config.queue and type(plugin_config.queue.max_batch_size) == "number" then
plugin_config.queue_size = plugin_config.queue.max_batch_size
has_update = true

else
config.queue_size = 1
plugin_config.queue_size = 1
has_update = true
end
end

if type(config.flush_timeout) ~= "number" then
if config.queue and type(config.queue.max_coalescing_delay) == "number" then
config.flush_timeout = config.queue.max_coalescing_delay
if type(plugin_config.flush_timeout) ~= "number" then
if plugin_config.queue and type(plugin_config.queue.max_coalescing_delay) == "number" then
plugin_config.flush_timeout = plugin_config.queue.max_coalescing_delay
has_update = true

else
config.flush_timeout = 2
plugin_config.flush_timeout = 2
has_update = true
end
end

elseif plugin.name == 'opentelemetry' then

if type(config.batch_span_count) ~= "number" then
if config.queue and type(config.queue.max_batch_size) == "number" then
config.batch_span_count = config.queue.max_batch_size
elseif plugin.name == "opentelemetry" then
if type(plugin_config.batch_span_count) ~= "number" then
if plugin_config.queue and type(plugin_config.queue.max_batch_size) == "number" then
plugin_config.batch_span_count = plugin_config.queue.max_batch_size
has_update = true

else
config.batch_span_count = 200
plugin_config.batch_span_count = 200
has_update = true
end
end

if type(config.batch_flush_delay) ~= "number" then
if config.queue and type(config.queue.max_coalescing_delay) == "number" then
config.batch_flush_delay = config.queue.max_coalescing_delay
if type(plugin_config.batch_flush_delay) ~= "number" then
if plugin_config.queue and type(plugin_config.queue.max_coalescing_delay) == "number" then
plugin_config.batch_flush_delay = plugin_config.queue.max_coalescing_delay
has_update = true

else
config.batch_flush_delay = 3
plugin_config.batch_flush_delay = 3
has_update = true
end
end
Expand All @@ -161,18 +162,16 @@ local compatible_checkers = {
},

{ 3003000000, --[[ 3.3.0.0 ]]
function(config_table, dp_version, log_suffix)
function(config)
local has_update

for _, config_entity in ipairs(config_table.vaults or {}) do
for _, config_entity in ipairs(config.vaults or {}) do
if config_entity.name == "env" and type(config_entity.config) == "table" then
local config = config_entity.config
local prefix = config.prefix

local entity_config = config_entity.config
local prefix = entity_config.prefix
if type(prefix) == "string" then
local new_prefix = prefix:gsub("-", "_")
if new_prefix ~= prefix then
config.prefix = new_prefix
entity_config.prefix = new_prefix
has_update = true
end
end
Expand All @@ -184,7 +183,7 @@ local compatible_checkers = {
},

{ 3003000000, --[[ 3.3.0.0 ]]
function(config_table, dp_version, log_suffix)
function(config, dp_version, log_suffix)
-- remove updated_at field for core entities ca_certificates, certificates, consumers,
-- targets, upstreams, plugins, workspaces, clustering_data_planes and snis
local entity_names = {
Expand All @@ -196,19 +195,15 @@ local compatible_checkers = {
local updated_entities = {}

for _, name in ipairs(entity_names) do
for _, config_entity in ipairs(config_table[name] or {}) do
if config_entity["updated_at"] then

config_entity["updated_at"] = nil

for _, config_entity in ipairs(config[name] or {}) do
if config_entity.updated_at then
config_entity.updated_at = nil
has_update = true

if not updated_entities[name] then
log_warn_message("contains configuration '" .. name .. ".updated_at'",
"be removed",
dp_version,
log_suffix)

updated_entities[name] = true
end
end
Expand All @@ -220,23 +215,23 @@ local compatible_checkers = {
},

{ 3002000000, --[[ 3.2.0.0 ]]
function(config_table, dp_version, log_suffix)
local config_services = config_table["services"]
function(config, dp_version, log_suffix)
local config_services = config.services
if not config_services then
return nil
end

local has_update
for _, t in ipairs(config_services) do
if t["protocol"] == "tls" then
if t["client_certificate"] or t["tls_verify"] or
t["tls_verify_depth"] or t["ca_certificates"]
if t.protocol == "tls" then
if t.client_certificate or t.tls_verify or
t.tls_verify_depth or t.ca_certificates
then

t["client_certificate"] = nil
t["tls_verify"] = nil
t["tls_verify_depth"] = nil
t["ca_certificates"] = nil
t.client_certificate = nil
t.tls_verify = nil
t.tls_verify_depth = nil
t.ca_certificates = nil

has_update = true
end
Expand All @@ -256,16 +251,16 @@ local compatible_checkers = {
},

{ 3002000000, --[[ 3.2.0.0 ]]
function(config_table, dp_version, log_suffix)
local config_upstreams = config_table["upstreams"]
function(config, dp_version, log_suffix)
local config_upstreams = config.upstreams
if not config_upstreams then
return nil
end

local has_update
for _, t in ipairs(config_upstreams) do
if t["algorithm"] == "latency" then
t["algorithm"] = "round-robin"
if t.algorithm == "latency" then
t.algorithm = "round-robin"
has_update = true
end
end
Expand All @@ -282,16 +277,16 @@ local compatible_checkers = {
},

{ 3002000000, --[[ 3.2.0.0 ]]
function(config_table, dp_version, log_suffix)
local config_plugins = config_table["plugins"]
function(config, dp_version, log_suffix)
local config_plugins = config.plugins
if not config_plugins then
return nil
end

local has_update
for _, plugin in ipairs(config_plugins) do
if plugin["instance_name"] ~= nil then
plugin["instance_name"] = nil
if plugin.instance_name ~= nil then
plugin.instance_name = nil
has_update = true
end
end
Expand All @@ -308,16 +303,16 @@ local compatible_checkers = {
},

{ 3001000000, --[[ 3.1.0.0 ]]
function(config_table, dp_version, log_suffix)
local config_upstreams = config_table["upstreams"]
function(config, dp_version, log_suffix)
local config_upstreams = config.upstreams
if not config_upstreams then
return nil
end

local has_update
for _, t in ipairs(config_upstreams) do
if t["use_srv_name"] ~= nil then
t["use_srv_name"] = nil
if t.use_srv_name ~= nil then
t.use_srv_name = nil
has_update = true
end
end
Expand Down
Loading