From 9da0bf637e0f92c73dbcc4483f169aab31907520 Mon Sep 17 00:00:00 2001 From: Aapo Talvensaari Date: Fri, 15 Sep 2023 01:37:15 +0300 Subject: [PATCH] feat(clustering): entity streaming based cp-dp protocol Signed-off-by: Aapo Talvensaari --- kong/clustering/compat/init.lua | 33 ++----- kong/clustering/control_plane.lua | 155 ++++++++++++++++-------------- kong/clustering/data_plane.lua | 71 +++++++++----- kong/db/declarative/export.lua | 116 +++++++++++++++------- 4 files changed, 224 insertions(+), 151 deletions(-) diff --git a/kong/clustering/compat/init.lua b/kong/clustering/compat/init.lua index 9ae08eadc317..8624a349d83d 100644 --- a/kong/clustering/compat/init.lua +++ b/kong/clustering/compat/init.lua @@ -1,4 +1,3 @@ -local cjson = require("cjson.safe") local constants = require("kong.constants") local meta = require("kong.meta") local version = require("kong.clustering.compat.version") @@ -10,8 +9,6 @@ local table_insert = table.insert local table_sort = table.sort local gsub = string.gsub local split = utils.split -local deflate_gzip = utils.deflate_gzip -local cjson_encode = cjson.encode local ngx = ngx local ngx_log = ngx.log @@ -24,10 +21,13 @@ local extract_major_minor = version.extract_major_minor local _log_prefix = "[clustering] " + local REMOVED_FIELDS = require("kong.clustering.compat.removed_fields") local COMPATIBILITY_CHECKERS = require("kong.clustering.compat.checkers") local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local KONG_VERSION = meta.version +local KONG_VERSION_NUM = version_num(KONG_VERSION) + local EMPTY = {} @@ -360,45 +360,30 @@ end -- returns has_update, modified_deflated_payload, err function _M.update_compatible_payload(payload, dp_version, log_suffix) - local cp_version_num = version_num(KONG_VERSION) local dp_version_num = version_num(dp_version) -- if the CP and DP have the same version, avoid the payload -- copy and compatibility updates - if cp_version_num == dp_version_num then - return false + if 
KONG_VERSION_NUM == dp_version_num then + return payload end - local has_update payload = utils.cycle_aware_deep_copy(payload, true) - local config_table = payload["config_table"] for _, checker in ipairs(COMPATIBILITY_CHECKERS) do local ver = checker[1] local fn = checker[2] - if dp_version_num < ver and fn(config_table, dp_version, log_suffix) then - has_update = true + if dp_version_num < ver then + fn(payload.config, dp_version, log_suffix) end end local fields = get_removed_fields(dp_version_num) if fields then - if invalidate_keys_from_config(config_table["plugins"], fields, log_suffix, dp_version_num) then - has_update = true - end - end - - if has_update then - local deflated_payload, err = deflate_gzip(cjson_encode(payload)) - if deflated_payload then - return true, deflated_payload - - else - return true, nil, err - end + invalidate_keys_from_config(payload.config.plugins, fields, log_suffix, dp_version_num) end - return false, nil, nil + return payload end diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index 2add807e88cf..ec08b68bdfb3 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -11,13 +11,17 @@ local compat = require("kong.clustering.compat") local constants = require("kong.constants") local events = require("kong.clustering.events") local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash +local get_topologically_sorted_schema_names = require("kong.db.declarative.export").get_topologically_sorted_schema_names +local error = error +local assert = assert local string = string local setmetatable = setmetatable local type = type local pcall = pcall local pairs = pairs +local ipairs = ipairs local ngx = ngx local ngx_log = ngx.log local timer_at = ngx.timer.at @@ -41,7 +45,6 @@ local sleep = ngx.sleep local plugins_list_to_map = compat.plugins_list_to_map local update_compatible_payload = compat.update_compatible_payload local 
deflate_gzip = utils.deflate_gzip -local yield = utils.yield local connect_dp = clustering_utils.connect_dp @@ -64,10 +67,10 @@ local _log_prefix = "[clustering] " local no_connected_clients_logged -local function handle_export_deflated_reconfigure_payload(self) +local function handle_export_reconfigure_payload(self) ngx_log(ngx_DEBUG, _log_prefix, "exporting config") - local ok, p_err, err = pcall(self.export_deflated_reconfigure_payload, self) + local ok, p_err, err = pcall(self.export_reconfigure_payload, self) return ok, p_err or err end @@ -94,64 +97,35 @@ function _M.new(clustering) end -function _M:export_deflated_reconfigure_payload() - local config_table, err = declarative.export_config() - if not config_table then +function _M:export_reconfigure_payload() + local config, err, plugins_configured = declarative.export_config() + if not config then return nil, err end - -- update plugins map - self.plugins_configured = {} - if config_table.plugins then - for _, plugin in pairs(config_table.plugins) do - self.plugins_configured[plugin.name] = true - end - end - -- store serialized plugins map for troubleshooting purposes local shm_key_name = "clustering:cp_plugins_configured:worker_" .. worker_id() - kong_dict:set(shm_key_name, cjson_encode(self.plugins_configured)) + kong_dict:set(shm_key_name, cjson_encode(plugins_configured)) ngx_log(ngx_DEBUG, "plugin configuration map key: " .. shm_key_name .. 
" configuration: ", kong_dict:get(shm_key_name)) - local config_hash, hashes = calculate_config_hash(config_table) + local _, hashes = calculate_config_hash(config) - local payload = { - type = "reconfigure", - timestamp = ngx_now(), - config_table = config_table, - config_hash = config_hash, + self.plugins_configured = plugins_configured + self.reconfigure_payload = { + timestamp = ngx_now() * 1000, + config = config, hashes = hashes, } - self.reconfigure_payload = payload - - payload, err = cjson_encode(payload) - if not payload then - return nil, err - end - - yield() - - payload, err = deflate_gzip(payload) - if not payload then - return nil, err - end - - yield() - - self.current_hashes = hashes - self.current_config_hash = config_hash - self.deflated_reconfigure_payload = payload - - return payload, nil, config_hash + return true end function _M:push_config() local start = ngx_now() - local payload, err = self:export_deflated_reconfigure_payload() - if not payload then + local ok, err = self:export_reconfigure_payload() + if not ok then ngx_log(ngx_ERR, _log_prefix, "unable to export config from database: ", err) return end @@ -271,13 +245,13 @@ function _M:handle_cp_websocket() -- if clients table is empty, we might have skipped some config -- push event in `push_config_loop`, which means the cached config -- might be stale, so we always export the latest config again in this case - if isempty(self.clients) or not self.deflated_reconfigure_payload then - _, err = handle_export_deflated_reconfigure_payload(self) + if isempty(self.clients) or not self.reconfigure_payload then + _, err = handle_export_reconfigure_payload(self) end self.clients[wb] = queue - if self.deflated_reconfigure_payload then + if self.reconfigure_payload then -- initial configuration compatibility for sync status variable _, _, sync_status = self:check_configuration_compatibility({ dp_plugins_map = dp_plugins_map, @@ -357,6 +331,13 @@ function _M:handle_cp_websocket() end) local 
write_thread = ngx.thread.spawn(function() + local function send_binary(tbl) + local _, err = wb:send_binary(cjson_encode(tbl)) + if err then + return error("unable to send updated configuration to data plane: " .. err) + end + end + while not exiting() do local ok, err = queue.wait(5) @@ -413,30 +394,65 @@ function _M:handle_cp_websocket() goto continue end - local _, deflated_payload, err = update_compatible_payload(self.reconfigure_payload, dp_version, log_suffix) - - if not deflated_payload then -- no modification or err, use the cached payload - deflated_payload = self.deflated_reconfigure_payload - end - - if err then - ngx_log(ngx_WARN, "unable to update compatible payload: ", err, ", the unmodified config ", - "is returned", log_suffix) - end - - -- config update - local _, err = wb:send_binary(deflated_payload) - if err then - if not is_timeout(err) then - return nil, "unable to send updated configuration to data plane: " .. err + local reconfigure_payload = update_compatible_payload(self.reconfigure_payload, dp_version, log_suffix) + local timestamp = reconfigure_payload.timestamp + local config = reconfigure_payload.config + local hashes = reconfigure_payload.hashes + local hash = hashes.config + + send_binary({ + type = "reconfigure:start", + hash = hash, + meta = { + _transform = config._transform, + _format_version = config._format_version, + } + }) + + local schema_names = get_topologically_sorted_schema_names() + + local batch = kong.table.new(1000, 0) + for _, name in ipairs(schema_names) do + local entities = config[name] + if entities and not isempty(entities) then + local count = #entities + if count <= 1000 then + send_binary({ + type = "entities", + hash = hash, + name = name, + data = assert(deflate_gzip(assert(cjson_encode(entities)))), + }) + + else + + local i = 0 + for j, entity in ipairs(entities) do + i = i + 1 + batch[i] = entity + if i == 1000 or j == count then + send_binary({ + type = "entities", + hash = hash, + name = name, + data 
= assert(deflate_gzip(assert(cjson_encode(batch)))), + }) + + kong.table.clear(batch) + i = 0 + end + end + end end - - ngx_log(ngx_NOTICE, _log_prefix, "unable to send updated configuration to data plane: ", err, log_suffix) - - else - ngx_log(ngx_DEBUG, _log_prefix, "sent config update to data plane", log_suffix) end + send_binary({ + type = "reconfigure:end", + hash = hash, + hashes = hashes, + timestamp = timestamp, + }) + ::continue:: end end) @@ -473,7 +489,7 @@ local function push_config_loop(premature, self, push_config_semaphore, delay) return end - local ok, err = handle_export_deflated_reconfigure_payload(self) + local ok, err = handle_export_reconfigure_payload(self) if not ok then ngx_log(ngx_ERR, _log_prefix, "unable to export initial config from database: ", err) end @@ -543,7 +559,6 @@ function _M:init_worker(basic_info) self.plugins_list = plugins_list self.plugins_map = plugins_list_to_map(plugins_list) - self.deflated_reconfigure_payload = nil self.reconfigure_payload = nil self.plugins_configured = {} self.plugin_versions = {} diff --git a/kong/clustering/data_plane.lua b/kong/clustering/data_plane.lua index d0f0e1e020a9..8e33539935b5 100644 --- a/kong/clustering/data_plane.lua +++ b/kong/clustering/data_plane.lua @@ -14,6 +14,7 @@ local pl_stringx = require("pl.stringx") local assert = assert local setmetatable = setmetatable +local ipairs = ipairs local math = math local pcall = pcall local tostring = tostring @@ -26,7 +27,6 @@ local cjson_encode = cjson.encode local exiting = ngx.worker.exiting local ngx_time = ngx.time local inflate_gzip = utils.inflate_gzip -local yield = utils.yield local ngx_ERR = ngx.ERR @@ -194,29 +194,20 @@ function _M:communicate(premature) goto continue end - + local data = next_data if not data then goto continue end - - local msg = assert(inflate_gzip(data)) - yield() - msg = assert(cjson_decode(msg)) - yield() - - if msg.type ~= "reconfigure" then - goto continue - end - + ngx_log(ngx_DEBUG, _log_prefix, "received 
reconfigure frame from control plane", - msg.timestamp and " with timestamp: " .. msg.timestamp or "", + data.timestamp and " with timestamp: " .. data.timestamp or "", log_suffix) - local config_table = assert(msg.config_table) + local config = assert(data.config) local pok, res, err = pcall(config_helper.update, self.declarative_config, - config_table, msg.config_hash, msg.hashes) + config, data.hashes.config, data.hashes) if pok then ping_immediately = true end @@ -253,7 +244,8 @@ function _M:communicate(premature) local read_thread = ngx.thread.spawn(function() local last_seen = ngx_time() - + local config + local hash while not exiting() do local data, typ, err = c:recv_frame() if err then @@ -277,12 +269,47 @@ function _M:communicate(premature) last_seen = ngx_time() if typ == "binary" then - next_data = data - if config_semaphore:count() <= 0 then - -- the following line always executes immediately after the `if` check - -- because `:count` will never yield, end result is that the semaphore - -- count is guaranteed to not exceed 1 - config_semaphore:post() + local msg = assert(cjson_decode(data)) + + if msg.hash == declarative.get_current_hash() then + ngx_log(ngx_DEBUG, _log_prefix, "same config received from control plane, no need to reload", log_suffix) + goto continue + end + + if msg.type == "reconfigure:start" then + config = msg.meta + hash = msg.hash + + elseif msg.type == "entities" then + assert(hash == msg.hash, "reconfigure hash mismatch") + + local name = msg.name + local entities = assert(cjson_decode(inflate_gzip(msg.data))) + if config[name] then + local count = #config[name] + for i, entity in ipairs(entities) do + config[name][count + i] = entity + end + + else + config[name] = entities + end + + elseif msg.type == "reconfigure:end" then + assert(hash == msg.hash, "reconfigure hash mismatch") + + next_data = { + config = config, + hashes = msg.hashes, + timestamp = msg.timestamp + } + + if config_semaphore:count() <= 0 then + -- the 
following line always executes immediately after the `if` check + -- because `:count` will never yield, end result is that the semaphore + -- count is guaranteed to not exceed 1 + config_semaphore:post() + end end goto continue diff --git a/kong/db/declarative/export.lua b/kong/db/declarative/export.lua index c3b6b8c1366b..81c799e1059b 100644 --- a/kong/db/declarative/export.lua +++ b/kong/db/declarative/export.lua @@ -8,6 +8,7 @@ local assert = assert local type = type local pcall = pcall local pairs = pairs +local ipairs = ipairs local insert = table.insert local io_open = io.open local null = ngx.null @@ -15,6 +16,8 @@ local null = ngx.null local REMOVE_FIRST_LINE_PATTERN = "^[^\n]+\n(.+)$" local GLOBAL_QUERY_OPTS = { nulls = true, workspace = null } +local TOPOLOGICALLY_SORTED_SCHEMA_NAMES +local FOREIGN_KEY_NAMES = {} local function convert_nulls(tbl, from, to) @@ -68,6 +71,70 @@ local function to_yaml_file(entities, filename) end +local function get_foreign_key_names(name) + if FOREIGN_KEY_NAMES[name] then + return FOREIGN_KEY_NAMES[name] + end + + local dao = kong.db.daos[name] + if not dao then + kong.log.err("unable to find dao with name '", name, "'") + return {} + end + + local schema = dao.schema + + local fks = {} + for field_name, field in schema:each_field() do + if field.type == "foreign" then + insert(fks, field_name) + end + end + + FOREIGN_KEY_NAMES[name] = fks + + return fks +end + + +local function get_topologically_sorted_schema_names(skip_ws) + if skip_ws == nil then + skip_ws = true + end + + if TOPOLOGICALLY_SORTED_SCHEMA_NAMES then + -- TODO: what about skip_ws? 
+ return TOPOLOGICALLY_SORTED_SCHEMA_NAMES + end + + local schemas = {} + for _, dao in pairs(kong.db.daos) do + local schema = dao.schema + if schema.db_export == false or (skip_ws and schema.name == "workspaces") then + goto continue + end + + insert(schemas, schema) + + ::continue:: + end + + local schemas, err = schema_topological_sort(schemas) + if not schemas then + kong.log.err("topological sort of schemas failed (", err, ")") + return {} + end + + for i, schema in ipairs(schemas) do + schemas[i] = schema.name + end + + TOPOLOGICALLY_SORTED_SCHEMA_NAMES = schemas + + return schemas +end + + local function begin_transaction(db) if db.strategy == "postgres" then local ok, err = db.connector:connect("read") @@ -96,23 +163,8 @@ end local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, expand_foreigns) - local schemas = {} - local db = kong.db - - for _, dao in pairs(db.daos) do - if not (skip_ws and dao.schema.name == "workspaces") then - insert(schemas, dao.schema) - end - end - - local sorted_schemas, err = schema_topological_sort(schemas) - if not sorted_schemas then - return nil, err - end - - local ok - ok, err = begin_transaction(db) + local ok, err = begin_transaction(db) if not ok then return nil, err end @@ -121,23 +173,13 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp _format_version = "3.0", _transform = false, }) - + + local schema_names = get_topologically_sorted_schema_names(skip_ws) + local plugins_configured = {} local disabled_services = {} local disabled_routes = {} - for i = 1, #sorted_schemas do - local schema = sorted_schemas[i] - if schema.db_export == false then - goto continue - end - - local name = schema.name - local fks = {} - for field_name, field in schema:each_field() do - if field.type == "foreign" then - insert(fks, field_name) - end - end - + for _, name in ipairs(schema_names) do + local fks = get_foreign_key_names(name) local page_size if db[name].pagination then 
page_size = db[name].pagination.max_page_size @@ -150,7 +192,7 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp end -- do not export disabled services and disabled plugins when skip_disabled_entities - -- as well do not export plugins and routes of dsiabled services + -- as well do not export plugins and routes of disabled services if skip_disabled_entities and name == "services" and not row.enabled then disabled_services[row.id] = true @@ -176,18 +218,20 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp end end end + + if name == "plugins" then + plugins_configured[row.name] = true + end emitter:emit_entity(name, row) end ::skip_emit:: end - - ::continue:: end end_transaction(db) - return emitter:done() + return (emitter:done()), nil, plugins_configured end @@ -349,4 +393,6 @@ return { export_config_proto = export_config_proto, sanitize_output = sanitize_output, + + get_topologically_sorted_schema_names = get_topologically_sorted_schema_names, }