Skip to content

Commit

Permalink
feat(clustering): entity streaming based cp-dp protocol
Browse files Browse the repository at this point in the history
Signed-off-by: Aapo Talvensaari <[email protected]>
  • Loading branch information
bungle committed Sep 21, 2023
1 parent 02cd600 commit 9da0bf6
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 151 deletions.
33 changes: 9 additions & 24 deletions kong/clustering/compat/init.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
local cjson = require("cjson.safe")
local constants = require("kong.constants")
local meta = require("kong.meta")
local version = require("kong.clustering.compat.version")
Expand All @@ -10,8 +9,6 @@ local table_insert = table.insert
local table_sort = table.sort
local gsub = string.gsub
local split = utils.split
local deflate_gzip = utils.deflate_gzip
local cjson_encode = cjson.encode

local ngx = ngx
local ngx_log = ngx.log
Expand All @@ -24,10 +21,13 @@ local extract_major_minor = version.extract_major_minor

local _log_prefix = "[clustering] "


local REMOVED_FIELDS = require("kong.clustering.compat.removed_fields")
local COMPATIBILITY_CHECKERS = require("kong.clustering.compat.checkers")
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local KONG_VERSION = meta.version
local KONG_VERSION_NUM = version_num(KONG_VERSION)


local EMPTY = {}

Expand Down Expand Up @@ -360,45 +360,30 @@ end

-- returns has_update, modified_deflated_payload, err
function _M.update_compatible_payload(payload, dp_version, log_suffix)
local cp_version_num = version_num(KONG_VERSION)
local dp_version_num = version_num(dp_version)

-- if the CP and DP have the same version, avoid the payload
-- copy and compatibility updates
if cp_version_num == dp_version_num then
return false
if KONG_VERSION_NUM == dp_version_num then
return payload
end

local has_update
payload = utils.cycle_aware_deep_copy(payload, true)
local config_table = payload["config_table"]

for _, checker in ipairs(COMPATIBILITY_CHECKERS) do
local ver = checker[1]
local fn = checker[2]
if dp_version_num < ver and fn(config_table, dp_version, log_suffix) then
has_update = true
if dp_version_num < ver then
fn(payload.config_table, dp_version, log_suffix)
end
end

local fields = get_removed_fields(dp_version_num)
if fields then
if invalidate_keys_from_config(config_table["plugins"], fields, log_suffix, dp_version_num) then
has_update = true
end
end

if has_update then
local deflated_payload, err = deflate_gzip(cjson_encode(payload))
if deflated_payload then
return true, deflated_payload

else
return true, nil, err
end
invalidate_keys_from_config(payload.config_table.plugins, fields, log_suffix, dp_version_num)
end

return false, nil, nil
return payload
end


Expand Down
155 changes: 85 additions & 70 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ local compat = require("kong.clustering.compat")
local constants = require("kong.constants")
local events = require("kong.clustering.events")
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local get_topologically_sorted_schema_names = require("kong.db.declarative.export").get_topologically_sorted_schema_names


local error = error
local assert = assert
local string = string
local setmetatable = setmetatable
local type = type
local pcall = pcall
local pairs = pairs
local ipairs = ipairs
local ngx = ngx
local ngx_log = ngx.log
local timer_at = ngx.timer.at
Expand All @@ -41,7 +45,6 @@ local sleep = ngx.sleep
local plugins_list_to_map = compat.plugins_list_to_map
local update_compatible_payload = compat.update_compatible_payload
local deflate_gzip = utils.deflate_gzip
local yield = utils.yield
local connect_dp = clustering_utils.connect_dp


Expand All @@ -64,10 +67,10 @@ local _log_prefix = "[clustering] "
local no_connected_clients_logged


local function handle_export_deflated_reconfigure_payload(self)
local function handle_export_reconfigure_payload(self)
ngx_log(ngx_DEBUG, _log_prefix, "exporting config")

local ok, p_err, err = pcall(self.export_deflated_reconfigure_payload, self)
local ok, p_err, err = pcall(self.export_reconfigure_payload, self)
return ok, p_err or err
end

Expand All @@ -94,64 +97,35 @@ function _M.new(clustering)
end


function _M:export_deflated_reconfigure_payload()
local config_table, err = declarative.export_config()
if not config_table then
function _M:export_reconfigure_payload()
local config, err, plugins_configured = declarative.export_config()
if not config then
return nil, err
end

-- update plugins map
self.plugins_configured = {}
if config_table.plugins then
for _, plugin in pairs(config_table.plugins) do
self.plugins_configured[plugin.name] = true
end
end

-- store serialized plugins map for troubleshooting purposes
local shm_key_name = "clustering:cp_plugins_configured:worker_" .. worker_id()
kong_dict:set(shm_key_name, cjson_encode(self.plugins_configured))
kong_dict:set(shm_key_name, cjson_encode(plugins_configured))
ngx_log(ngx_DEBUG, "plugin configuration map key: " .. shm_key_name .. " configuration: ", kong_dict:get(shm_key_name))

local config_hash, hashes = calculate_config_hash(config_table)
local _, hashes = calculate_config_hash(config)

local payload = {
type = "reconfigure",
timestamp = ngx_now(),
config_table = config_table,
config_hash = config_hash,
self.plugins_configured = plugins_configured
self.reconfigure_payload = {
timestamp = ngx_now() * 1000,
config = config,
hashes = hashes,
}

self.reconfigure_payload = payload

payload, err = cjson_encode(payload)
if not payload then
return nil, err
end

yield()

payload, err = deflate_gzip(payload)
if not payload then
return nil, err
end

yield()

self.current_hashes = hashes
self.current_config_hash = config_hash
self.deflated_reconfigure_payload = payload

return payload, nil, config_hash
return true
end


function _M:push_config()
local start = ngx_now()

local payload, err = self:export_deflated_reconfigure_payload()
if not payload then
local ok, err = self:export_reconfigure_payload()
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to export config from database: ", err)
return
end
Expand Down Expand Up @@ -271,13 +245,13 @@ function _M:handle_cp_websocket()
-- if clients table is empty, we might have skipped some config
-- push event in `push_config_loop`, which means the cached config
-- might be stale, so we always export the latest config again in this case
if isempty(self.clients) or not self.deflated_reconfigure_payload then
_, err = handle_export_deflated_reconfigure_payload(self)
if isempty(self.clients) or not self.reconfigure_payload then
_, err = handle_export_reconfigure_payload(self)
end

self.clients[wb] = queue

if self.deflated_reconfigure_payload then
if self.reconfigure_payload then
-- initial configuration compatibility for sync status variable
_, _, sync_status = self:check_configuration_compatibility({
dp_plugins_map = dp_plugins_map,
Expand Down Expand Up @@ -357,6 +331,13 @@ function _M:handle_cp_websocket()
end)

local write_thread = ngx.thread.spawn(function()
local function send_binary(tbl)
local _, err = wb:send_binary(cjson_encode(tbl))
if err then
return error("unable to send updated configuration to data plane: " .. err)
end
end

while not exiting() do
local ok, err = queue.wait(5)

Expand Down Expand Up @@ -413,30 +394,65 @@ function _M:handle_cp_websocket()
goto continue
end

local _, deflated_payload, err = update_compatible_payload(self.reconfigure_payload, dp_version, log_suffix)

if not deflated_payload then -- no modification or err, use the cached payload
deflated_payload = self.deflated_reconfigure_payload
end

if err then
ngx_log(ngx_WARN, "unable to update compatible payload: ", err, ", the unmodified config ",
"is returned", log_suffix)
end

-- config update
local _, err = wb:send_binary(deflated_payload)
if err then
if not is_timeout(err) then
return nil, "unable to send updated configuration to data plane: " .. err
local reconfigure_payload = update_compatible_payload(self.reconfigure_payload, dp_version, log_suffix)
local timetamp = reconfigure_payload.timestamp
local config = reconfigure_payload.config
local hashes = reconfigure_payload.hashes
local hash = hashes.config

send_binary({
type = "reconfigure:start",
hash = hash,
meta = {
_transform = config._transform,
_format_version = config._format_version,
}
})

local schema_names = get_topologically_sorted_schema_names()

local batch = kong.table.new(0, 1000)
for _, name in ipairs(schema_names) do
local entities = config[name]
if entities and not isempty(entities) then
local count = #entities
if count <= 1000 then
send_binary({
type = "entities",
hash = hash,
name = name,
data = assert(deflate_gzip(assert(cjson_encode(entities)))),
})

else

local i = 0
for j, entity in ipairs(entities) do
i = i + 1
batch[i] = entity
if i == 1000 or j == count then
send_binary({
type = "entities",
hash = hash,
name = name,
data = assert(deflate_gzip(assert(cjson_encode(batch)))),
})

kong.table.clear(batch)
i = 0
end
end
end
end

ngx_log(ngx_NOTICE, _log_prefix, "unable to send updated configuration to data plane: ", err, log_suffix)

else
ngx_log(ngx_DEBUG, _log_prefix, "sent config update to data plane", log_suffix)
end

send_binary({
type = "reconfigure:end",
hash = hash,
hashes = hashes,
timestamp = timetamp,
})

::continue::
end
end)
Expand Down Expand Up @@ -473,7 +489,7 @@ local function push_config_loop(premature, self, push_config_semaphore, delay)
return
end

local ok, err = handle_export_deflated_reconfigure_payload(self)
local ok, err = handle_export_reconfigure_payload(self)
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to export initial config from database: ", err)
end
Expand Down Expand Up @@ -543,7 +559,6 @@ function _M:init_worker(basic_info)
self.plugins_list = plugins_list
self.plugins_map = plugins_list_to_map(plugins_list)

self.deflated_reconfigure_payload = nil
self.reconfigure_payload = nil
self.plugins_configured = {}
self.plugin_versions = {}
Expand Down
Loading

0 comments on commit 9da0bf6

Please sign in to comment.