From 398e180145c8b5123a8cd884328e318d8fe79914 Mon Sep 17 00:00:00 2001 From: Datong Sun Date: Wed, 16 Oct 2024 22:15:08 +0800 Subject: [PATCH] feat(clustering): introduce incremental sync for clustering (#13157) * Revert "fix(rpc): disable cluster_rpc for 3.7" This reverts commit ddda6a1f2abbd1c8030a4325a807a17755a8bd19. This commit introduces a freshly redesigned DB-less mode that is more efficient when storing data, more efficient when accessing data and incrementally update able. This commit also introduces the "incremental sync" RPC server/client. It introduces the `kong.sync.v2` RPC capability that is used by CP to notify new version updates to DP and for DP to pull config delta. DP applies these config delta into the LMDB incrementally and transactionally which avoids the expensive config flip and cache wipe majority of the time. This commit also modifies the DAO so that for the CP, config diff logs are generated transactionally whenever entity modification occurs. Finally, this commit modifies the `off` strategy so that it works with the redesigned DB-less mode and storage format. Incremental sync is not yet enabled by default, it can be enabled by setting `cluster_incremental_sync = on` via `kong.conf`. KAG-4865 KAG-2986 KAG-2987 KAG-3502 KAG-3258 KAG-5283 --------- Co-authored-by: Chrono Co-authored-by: Xiaochen Wang --- changelog/unreleased/kong/cp-dp-rpc.yml | 3 + .../unreleased/kong/dynamic-log-level-rpc.yml | 6 + kong-3.9.0-0.rockspec | 9 +- kong/clustering/rpc/manager.lua | 19 +- kong/clustering/services/sync/hooks.lua | 179 +++++++ kong/clustering/services/sync/init.lua | 63 +++ kong/clustering/services/sync/rpc.lua | 354 ++++++++++++++ .../services/sync/strategies/postgres.lua | 130 ++++++ kong/conf_loader/constants.lua | 1 + kong/conf_loader/init.lua | 7 +- kong/constants.lua | 3 + kong/db/dao/init.lua | 6 + kong/db/dao/workspaces.lua | 23 + kong/db/declarative/export.lua | 40 ++ kong/db/declarative/import.lua | 441 ++++++++---------- kong/db/declarative/init.lua | 7 + kong/db/migrations/core/024_370_to_380.lua | 22 + kong/db/migrations/core/init.lua | 1 + kong/db/schema/others/declarative_config.lua | 36 +- kong/db/strategies/connector.lua | 4 +- kong/db/strategies/off/init.lua | 299 +++++------- kong/db/strategies/off/tags.lua | 11 - kong/init.lua | 27 +- kong/pdk/vault.lua | 14 +- kong/runloop/events.lua | 6 +- kong/runloop/handler.lua | 100 ++-- kong/templates/kong_defaults.lua | 3 +- spec/01-unit/01-db/04-dao_spec.lua | 2 +- spec/01-unit/01-db/10-declarative_spec.lua | 9 +- .../01-db/11-declarative_lmdb_spec.lua | 24 +- spec/01-unit/04-prefix_handler_spec.lua | 17 +- .../04-admin_api/15-off_spec.lua | 32 +- .../02-integration/07-sdk/03-cluster_spec.lua | 8 +- .../09-hybrid_mode/01-sync_spec.lua | 26 +- .../09-hybrid_mode/03-pki_spec.lua | 15 +- .../04-cp_cluster_sync_spec.lua | 9 +- .../09-hybrid_mode/05-ocsp_spec.lua | 22 +- .../09-hybrid_mode/08-lazy_export_spec.lua | 35 +- .../09-hybrid_mode/09-config-compat_spec.lua | 4 + .../09-node-id-persistence_spec.lua | 9 +- .../09-hybrid_mode/10-forward-proxy_spec.lua | 10 +- .../09-hybrid_mode/11-status_spec.lua | 12 +- .../09-hybrid_mode/12-errors_spec.lua | 8 +- .../09-hybrid_mode/13-deprecations_spec.lua | 8 +- .../18-hybrid_rpc/01-rpc_spec.lua | 40 +- .../18-hybrid_rpc/02-log-level_spec.lua | 36 +- .../18-hybrid_rpc/03-inert_spec.lua | 19 +- .../18-hybrid_rpc/04-concentrator_spec.lua | 11 +- .../20-wasm/06-clustering_spec.lua | 9 +- .../20-wasm/10-wasmtime_spec.lua | 8 + .../09-key-auth/04-hybrid_mode_spec.lua | 10 +- 
.../11-correlation-id/02-schema_spec.lua | 10 +- .../migrations/core/024_370_to_380_spec.lua | 17 + spec/internal/db.lua | 8 +- 54 files changed, 1604 insertions(+), 628 deletions(-) create mode 100644 changelog/unreleased/kong/cp-dp-rpc.yml create mode 100644 changelog/unreleased/kong/dynamic-log-level-rpc.yml create mode 100644 kong/clustering/services/sync/hooks.lua create mode 100644 kong/clustering/services/sync/init.lua create mode 100644 kong/clustering/services/sync/rpc.lua create mode 100644 kong/clustering/services/sync/strategies/postgres.lua create mode 100644 kong/db/migrations/core/024_370_to_380.lua delete mode 100644 kong/db/strategies/off/tags.lua create mode 100644 spec/05-migration/db/migrations/core/024_370_to_380_spec.lua diff --git a/changelog/unreleased/kong/cp-dp-rpc.yml b/changelog/unreleased/kong/cp-dp-rpc.yml new file mode 100644 index 000000000000..cb8efa9d1bcf --- /dev/null +++ b/changelog/unreleased/kong/cp-dp-rpc.yml @@ -0,0 +1,3 @@ +message: "Added a remote procedure call (RPC) framework for Hybrid mode deployments." +type: feature +scope: Clustering diff --git a/changelog/unreleased/kong/dynamic-log-level-rpc.yml b/changelog/unreleased/kong/dynamic-log-level-rpc.yml new file mode 100644 index 000000000000..69096eb0afe1 --- /dev/null +++ b/changelog/unreleased/kong/dynamic-log-level-rpc.yml @@ -0,0 +1,6 @@ +message: | + Dynamic log level over Hybrid mode RPC which allows setting DP log level + to a different level for specified duration before reverting back + to the `kong.conf` configured value. +type: feature +scope: Clustering diff --git a/kong-3.9.0-0.rockspec b/kong-3.9.0-0.rockspec index c0cc4c02d11e..b9c9d121764b 100644 --- a/kong-3.9.0-0.rockspec +++ b/kong-3.9.0-0.rockspec @@ -88,7 +88,6 @@ build = { ["kong.clustering.compat.checkers"] = "kong/clustering/compat/checkers.lua", ["kong.clustering.config_helper"] = "kong/clustering/config_helper.lua", ["kong.clustering.tls"] = "kong/clustering/tls.lua", - ["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua", ["kong.clustering.rpc.callbacks"] = "kong/clustering/rpc/callbacks.lua", ["kong.clustering.rpc.future"] = "kong/clustering/rpc/future.lua", @@ -99,6 +98,12 @@ build = { ["kong.clustering.rpc.utils"] = "kong/clustering/rpc/utils.lua", ["kong.clustering.rpc.concentrator"] = "kong/clustering/rpc/concentrator.lua", + ["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua", + ["kong.clustering.services.sync"] = "kong/clustering/services/sync/init.lua", + ["kong.clustering.services.sync.rpc"] = "kong/clustering/services/sync/rpc.lua", + ["kong.clustering.services.sync.hooks"] = "kong/clustering/services/sync/hooks.lua", + ["kong.clustering.services.sync.strategies.postgres"] = "kong/clustering/services/sync/strategies/postgres.lua", + ["kong.cluster_events"] = "kong/cluster_events/init.lua", ["kong.cluster_events.strategies.postgres"] = "kong/cluster_events/strategies/postgres.lua", ["kong.cluster_events.strategies.off"] = "kong/cluster_events/strategies/off.lua", @@ -289,7 +294,6 @@ build = { ["kong.db.strategies.postgres.plugins"] = "kong/db/strategies/postgres/plugins.lua", ["kong.db.strategies.off"] = "kong/db/strategies/off/init.lua", ["kong.db.strategies.off.connector"] = "kong/db/strategies/off/connector.lua", - ["kong.db.strategies.off.tags"] = "kong/db/strategies/off/tags.lua", ["kong.db.migrations.state"] = "kong/db/migrations/state.lua", ["kong.db.migrations.subsystems"] = "kong/db/migrations/subsystems.lua", @@ -316,6 +320,7 @@ build = { 
["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua", ["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua", ["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua", + ["kong.db.migrations.core.024_370_to_380"] = "kong/db/migrations/core/024_370_to_380.lua", ["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua", ["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua", ["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua", diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 5104fdab7235..7881b1661ffe 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -42,6 +42,7 @@ function _M.new(conf, node_id) -- clients[node_id]: { socket1 => true, socket2 => true, ... } clients = {}, client_capabilities = {}, + client_ips = {}, -- store DP node's ip addr node_id = node_id, conf = conf, cluster_cert = assert(clustering_tls.get_cluster_cert(conf)), @@ -75,16 +76,18 @@ end function _M:_remove_socket(socket) - local sockets = assert(self.clients[socket.node_id]) + local node_id = socket.node_id + local sockets = assert(self.clients[node_id]) assert(sockets[socket]) sockets[socket] = nil if table_isempty(sockets) then - self.clients[socket.node_id] = nil - self.client_capabilities[socket.node_id] = nil - assert(self.concentrator:_enqueue_unsubscribe(socket.node_id)) + self.clients[node_id] = nil + self.client_ips[node_id] = nil + self.client_capabilities[node_id] = nil + assert(self.concentrator:_enqueue_unsubscribe(node_id)) end end @@ -255,6 +258,9 @@ function _M:handle_websocket() local s = socket.new(self, wb, node_id) self:_add_socket(s, rpc_capabilities) + -- store DP's ip addr + self.client_ips[node_id] = ngx_var.remote_addr + s:start() local res, err = s:join() self:_remove_socket(s) @@ -362,4 +368,9 @@ function _M:get_peers() end +function _M:get_peer_ip(node_id) + return self.client_ips[node_id] +end + + return _M diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua new file mode 100644 index 000000000000..7a3a1402558d --- /dev/null +++ b/kong/clustering/services/sync/hooks.lua @@ -0,0 +1,179 @@ +local _M = {} +local _MT = { __index = _M, } + + +local hooks = require("kong.hooks") +local EMPTY = require("kong.tools.table").EMPTY + + +local ipairs = ipairs +local ngx_null = ngx.null +local ngx_log = ngx.log +local ngx_ERR = ngx.ERR +local ngx_DEBUG = ngx.DEBUG + + +local DEFAULT_PAGE_SIZE = 512 + + +function _M.new(strategy) + local self = { + strategy = strategy, + } + + return setmetatable(self, _MT) +end + + +local function get_all_nodes_with_sync_cap() + local res, err = kong.db.clustering_data_planes:page(DEFAULT_PAGE_SIZE) + if err then + return nil, "unable to query DB " .. 
err + end + + if not res then + return EMPTY + end + + local ret = {} + local ret_n = 0 + + for _, row in ipairs(res) do + for _, c in ipairs(row.rpc_capabilities) do + if c == "kong.sync.v2" then + ret_n = ret_n + 1 + ret[ret_n] = row.id + break + end + end + end + + return ret +end + + +function _M:notify_all_nodes() + local latest_version, err = self.strategy:get_latest_version() + if not latest_version then + ngx_log(ngx_ERR, "can not get the latest version: ", err) + return + end + + local msg = { default = { new_version = latest_version, }, } + + for _, node in ipairs(get_all_nodes_with_sync_cap()) do + local res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg) + if not res then + if not err:find("requested capability does not exist", nil, true) then + ngx_log(ngx_ERR, "unable to notify new version: ", err) + end + + else + ngx_log(ngx_DEBUG, "notified ", node, " ", latest_version) + end + end +end + + +function _M:entity_delta_writer(row, name, options, ws_id, is_delete) + local deltas = { + { + type = name, + id = row.id, + ws_id = ws_id, + row = is_delete and ngx_null or row, + }, + } + + local res, err = self.strategy:insert_delta(deltas) + if not res then + self.strategy:cancel_txn() + return nil, err + end + + res, err = self.strategy:commit_txn() + if not res then + self.strategy:cancel_txn() + return nil, err + end + + self:notify_all_nodes() + + return row -- for other hooks +end + + +-- only control plane has these delta operations +function _M:register_dao_hooks() + local function is_db_export(name) + local db_export = kong.db[name].schema.db_export + return db_export == nil or db_export == true + end + + -- common hook functions (pre/fail/post) + + local function pre_hook_func(entity, name, options) + if not is_db_export(name) then + return true + end + + return self.strategy:begin_txn() + end + + local function fail_hook_func(err, entity, name) + if not is_db_export(name) then + return + end + + local res, err = self.strategy:cancel_txn() + if not res then + ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err)) + end + end + + local function post_hook_writer_func(row, name, options, ws_id) + if not is_db_export(name) then + return row + end + + return self:entity_delta_writer(row, name, options, ws_id) + end + + local function post_hook_delete_func(row, name, options, ws_id, cascade_entries) + if not is_db_export(name) then + return row + end + + -- set lmdb value to ngx_null then return row + return self:entity_delta_writer(row, name, options, ws_id, true) + end + + local dao_hooks = { + -- dao:insert + ["dao:insert:pre"] = pre_hook_func, + ["dao:insert:fail"] = fail_hook_func, + ["dao:insert:post"] = post_hook_writer_func, + + -- dao:delete + ["dao:delete:pre"] = pre_hook_func, + ["dao:delete:fail"] = fail_hook_func, + ["dao:delete:post"] = post_hook_delete_func, + + -- dao:update + ["dao:update:pre"] = pre_hook_func, + ["dao:update:fail"] = fail_hook_func, + ["dao:update:post"] = post_hook_writer_func, + + -- dao:upsert + ["dao:upsert:pre"] = pre_hook_func, + ["dao:upsert:fail"] = fail_hook_func, + ["dao:upsert:post"] = post_hook_writer_func, + } + + for ev, func in pairs(dao_hooks) do + hooks.register_hook(ev, func) + end +end + + +return _M diff --git a/kong/clustering/services/sync/init.lua b/kong/clustering/services/sync/init.lua new file mode 100644 index 000000000000..40f1b836241b --- /dev/null +++ b/kong/clustering/services/sync/init.lua @@ -0,0 +1,63 @@ +local _M = {} +local _MT = { __index = _M, } + + +local events = 
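
For readers following the hook flow above: the tables handed to `strategy:insert_delta()` by `entity_delta_writer()` look roughly like the sketch below. This is illustrative only -- the UUIDs are the ones used in the example comment of the postgres strategy later in this patch, and the service fields are made up.

local ngx_null = ngx.null

-- an insert/update delta carries the full row ...
local upsert_deltas = {
  {
    type  = "services",                                -- schema name
    id    = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e",    -- entity primary key
    ws_id = "73478cf6-964f-412d-b1c4-8ac88d9e85e9",    -- workspace id
    row   = { id = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e", name = "example", host = "example.com", },
  },
}

-- ... while a delete delta carries ngx.null so the DP knows to drop the entity
local delete_deltas = {
  {
    type  = "services",
    id    = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e",
    ws_id = "73478cf6-964f-412d-b1c4-8ac88d9e85e9",
    row   = ngx_null,
  },
}
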
require("kong.clustering.events") +local strategy = require("kong.clustering.services.sync.strategies.postgres") +local rpc = require("kong.clustering.services.sync.rpc") + + +-- TODO: what is the proper value? +local FIRST_SYNC_DELAY = 0.5 -- seconds +local EACH_SYNC_DELAY = 30 -- seconds + + +function _M.new(db, is_cp) + local strategy = strategy.new(db) + + local self = { + db = db, + strategy = strategy, + rpc = rpc.new(strategy), + is_cp = is_cp, + } + + -- only cp needs hooks + if is_cp then + self.hooks = require("kong.clustering.services.sync.hooks").new(strategy) + end + + return setmetatable(self, _MT) +end + + +function _M:init(manager) + if self.hooks then + self.hooks:register_dao_hooks() + end + self.rpc:init(manager, self.is_cp) +end + + +function _M:init_worker() + -- is CP, enable clustering broadcasts + if self.is_cp then + events.init() + + self.strategy:init_worker() + return + end + + -- is DP, sync only in worker 0 + if ngx.worker.id() ~= 0 then + return + end + + -- sync to CP ASAP + assert(self.rpc:sync_once(FIRST_SYNC_DELAY)) + + assert(self.rpc:sync_every(EACH_SYNC_DELAY)) +end + + +return _M diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua new file mode 100644 index 000000000000..3d3ec5360890 --- /dev/null +++ b/kong/clustering/services/sync/rpc.lua @@ -0,0 +1,354 @@ +local _M = {} +local _MT = { __index = _M, } + + +local txn = require("resty.lmdb.transaction") +local declarative = require("kong.db.declarative") +local constants = require("kong.constants") +local concurrency = require("kong.concurrency") + + +local insert_entity_for_txn = declarative.insert_entity_for_txn +local delete_entity_for_txn = declarative.delete_entity_for_txn +local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY +local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS +local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } + + +local pairs = pairs +local ipairs = ipairs +local fmt = string.format +local ngx_null = ngx.null +local ngx_log = ngx.log +local ngx_ERR = ngx.ERR +local ngx_INFO = ngx.INFO +local ngx_DEBUG = ngx.DEBUG + + +-- number of versions behind before a full sync is forced +local FULL_SYNC_THRESHOLD = 512 + + +function _M.new(strategy) + local self = { + strategy = strategy, + } + + return setmetatable(self, _MT) +end + + +function _M:init_cp(manager) + -- CP + -- Method: kong.sync.v2.get_delta + -- Params: versions: list of current versions of the database + -- { { namespace = "default", version = 1000, }, } + local purge_delay = manager.conf.cluster_data_plane_purge_delay + + local function gen_delta_result(res, wipe) + return { default = { deltas = res, wipe = wipe, }, } + end + + manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions) + ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)") + + local rpc_peers + if kong.rpc then + rpc_peers = kong.rpc:get_peers() + end + + local default_namespace + for namespace, v in pairs(current_versions) do + if namespace == "default" then + default_namespace = v + break + end + end + + if not default_namespace then + return nil, "default namespace does not exist inside params" + end + + -- { { namespace = "default", version = 1000, }, } + local default_namespace_version = default_namespace.version + + -- XXX TODO: follow update_sync_status() in control_plane.lua + local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, { + last_seen = ngx.time(), + hostname = node_id, + ip = kong.rpc:get_peer_ip(node_id), 
-- try to get the correct ip + version = "3.8.0.0", -- XXX TODO: get from rpc call + sync_status = CLUSTERING_SYNC_STATUS.NORMAL, + config_hash = fmt("%032d", default_namespace_version), + rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, + }, { ttl = purge_delay }) + if not ok then + ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err) + end + + local latest_version, err = self.strategy:get_latest_version() + if not latest_version then + return nil, err + end + + -- is the node empty? If so, just do a full sync to bring it up to date faster + if default_namespace_version == 0 or + latest_version - default_namespace_version > FULL_SYNC_THRESHOLD + then + -- we need to full sync because holes are found + + ngx_log(ngx_INFO, + "[kong.sync.v2] database is empty or too far behind for node_id: ", node_id, + ", current_version: ", default_namespace_version, + ", forcing a full sync") + + + local deltas, err = declarative.export_config_sync() + if not deltas then + return nil, err + end + + -- wipe dp lmdb, full sync + return gen_delta_result(deltas, true) + end + + local res, err = self.strategy:get_delta(default_namespace_version) + if not res then + return nil, err + end + + if #res == 0 then + ngx_log(ngx_DEBUG, + "[kong.sync.v2] no delta for node_id: ", node_id, + ", current_version: ", default_namespace_version, + ", node is already up to date" ) + return gen_delta_result(res, false) + end + + -- some deltas are returned, are they contiguous? + if res[1].version == default_namespace.version + 1 then + -- doesn't wipe dp lmdb, incremental sync + return gen_delta_result(res, false) + end + + -- we need to full sync because holes are found + -- in the delta, meaning the oldest version is no longer + -- available + + ngx_log(ngx_INFO, + "[kong.sync.v2] delta for node_id no longer available: ", node_id, + ", current_version: ", default_namespace_version, + ", forcing a full sync") + + local deltas, err = declarative.export_config_sync() + if not deltas then + return nil, err + end + + -- wipe dp lmdb, full sync + return gen_delta_result(deltas, true) + end) +end + + +function _M:init_dp(manager) + -- DP + -- Method: kong.sync.v2.notify_new_version + -- Params: new_versions: list of namespaces and their new versions, like: + -- { { new_version = 1000, }, }, possible field: namespace = "default" + manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions) + -- TODO: currently only default is supported, and anything else is ignored + local default_new_version = new_versions.default + if not default_new_version then + return nil, "default namespace does not exist inside params" + end + + local version = default_new_version.new_version + if not version then + return nil, "'new_version' key does not exist" + end + + local lmdb_ver = tonumber(declarative.get_current_hash()) or 0 + if lmdb_ver < version then + return self:sync_once() + end + + return true + end) +end + + +function _M:init(manager, is_cp) + if is_cp then + self:init_cp(manager) + else + self:init_dp(manager) + end +end + + +local function do_sync() + local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", + { default = + { version = + tonumber(declarative.get_current_hash()) or 0, + }, + }) + if not ns_deltas then + ngx_log(ngx_ERR, "sync get_delta error: ", err) + return true + end + + local ns_delta + + for namespace, delta in pairs(ns_deltas) do + if namespace == "default" then + ns_delta = delta + break -- should we break here? 
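
The DP side of the protocol is intentionally small. A sketch of the broadcast sent by `notify_all_nodes()` and the comparison the handler above performs; the version number is illustrative.

-- CP broadcast, one entry per namespace (only "default" is supported so far)
local msg = { default = { new_version = 1005, }, }

-- the DP handler compares this against the version it last committed to LMDB
-- and only schedules a sync when it is behind
local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
if lmdb_ver < msg.default.new_version then
  -- sync_once() arms a one-shot timer; the actual work runs in sync_handler()
  -- under a worker mutex, so overlapping notifications do not race each other
end
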
+ end + end + + if not ns_delta then + return nil, "default namespace does not exist inside params" + end + + if #ns_delta.deltas == 0 then + ngx_log(ngx_DEBUG, "no delta to sync") + return true + end + + local t = txn.begin(512) + + local wipe = ns_delta.wipe + if wipe then + t:db_drop(false) + end + + local db = kong.db + + local version = 0 + local crud_events = {} + local crud_events_n = 0 + + for _, delta in ipairs(ns_delta.deltas) do + local delta_type = delta.type + local delta_row = delta.row + local ev + + if delta_row ~= ngx_null then + -- upsert the entity + -- does the entity already exists? + local old_entity, err = db[delta_type]:select(delta_row) + if err then + return nil, err + end + + local crud_event_type = old_entity and "update" or "create" + + -- If we will wipe lmdb, we don't need to delete it from lmdb. + if old_entity and not wipe then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + if not res then + return nil, err + end + end + + local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil) + if not res then + return nil, err + end + + ev = { delta_type, crud_event_type, delta_row, old_entity, } + + else + -- delete the entity + local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key + if err then + return nil, err + end + + -- If we will wipe lmdb, we don't need to delete it from lmdb. + if old_entity and not wipe then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + if not res then + return nil, err + end + end + + ev = { delta_type, "delete", old_entity, } + end + + crud_events_n = crud_events_n + 1 + crud_events[crud_events_n] = ev + + -- XXX TODO: could delta.version be nil or ngx.null + if type(delta.version) == "number" and delta.version ~= version then + version = delta.version + end + end -- for _, delta + + t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + local ok, err = t:commit() + if not ok then + return nil, err + end + + if wipe then + kong.core_cache:purge() + kong.cache:purge() + + else + for _, event in ipairs(crud_events) do + -- delta_type, crud_event_type, delta.row, old_entity + db[event[1]]:post_crud_event(event[2], event[3], event[4]) + end + end + + return true +end + + +local function sync_handler(premature) + if premature then + return + end + + local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() + -- here must be 2 times + for _ = 1, 2 do + local ok, err = do_sync() + if not ok then + return nil, err + end + end -- for + + return true + end) + if not res and err ~= "timeout" then + ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) + end +end + + +local function start_sync_timer(timer_func, delay) + local hdl, err = timer_func(delay, sync_handler) + + if not hdl then + return nil, err + end + + return true +end + + +function _M:sync_once(delay) + return start_sync_timer(ngx.timer.at, delay or 0) +end + + +function _M:sync_every(delay) + return start_sync_timer(ngx.timer.every, delay) +end + + +return _M diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua new file mode 100644 index 000000000000..39c550b8ffb0 --- /dev/null +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -0,0 +1,130 @@ +local _M = {} +local _MT = { __index = _M } + + +local cjson = require("cjson.safe") +local buffer = require("string.buffer") + + +local string_format = string.format +local cjson_encode = cjson.encode +local ngx_log = ngx.log +local 
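
Worth calling out from `do_sync()` above: on an incremental-sync DP the declarative "config hash" slot doubles as the sync version, committed in the same LMDB transaction as the deltas themselves. A small sketch of what that looks like when read back (the value is illustrative):

-- written by do_sync():  t:set(DECLARATIVE_HASH_KEY, string.format("%032d", version))
-- read back before the next get_delta call:
local current_version = tonumber(declarative.get_current_hash()) or 0
-- e.g. "00000000000000000000000000001002" -> 1002; a fresh node yields 0,
-- which the CP treats as "empty, send a full sync with wipe = true"
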
ngx_ERR = ngx.ERR +local ngx_DEBUG = ngx.DEBUG + + +local CLEANUP_VERSION_COUNT = 100 +local CLEANUP_TIME_DELAY = 3600 -- 1 hour + + +function _M.new(db) + local self = { + connector = db.connector, + } + + return setmetatable(self, _MT) +end + + +local PURGE_QUERY = [[ + DELETE FROM clustering_sync_version + WHERE "version" < ( + SELECT MAX("version") - %d + FROM clustering_sync_version + ); +]] + + +function _M:init_worker() + local function cleanup_handler(premature) + if premature then + ngx_log(ngx_DEBUG, "[incremental] worker exiting, killing incremental cleanup timer") + + return + end + + local res, err = self.connector:query(string_format(PURGE_QUERY, CLEANUP_VERSION_COUNT)) + if not res then + ngx_log(ngx_ERR, + "[incremental] unable to purge old data from incremental delta table, err: ", + err) + + return + end + + ngx_log(ngx_DEBUG, + "[incremental] successfully purged old data from incremental delta table") + end + + assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler)) +end + + +local NEW_VERSION_QUERY = [[ + DO $$ + DECLARE + new_version integer; + BEGIN + INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version; + INSERT INTO clustering_sync_delta (version, type, id, ws_id, row) VALUES %s; + END $$; +]] + + +-- deltas: { +-- { type = "service", "id" = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", } +-- { type = "route", "id" = "0a5bac5c-b795-4981-95d2-919ba3390b7e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", } +-- } +function _M:insert_delta(deltas) + local buf = buffer.new() + for _, d in ipairs(deltas) do + buf:putf("(new_version, %s, %s, %s, %s)", + self.connector:escape_literal(d.type), + self.connector:escape_literal(d.id), + self.connector:escape_literal(d.ws_id), + self.connector:escape_literal(cjson_encode(d.row))) + end + + local sql = string_format(NEW_VERSION_QUERY, buf:get()) + + return self.connector:query(sql) +end + + +function _M:get_latest_version() + local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version" + + local res, err = self.connector:query(sql) + if not res then + return nil, err + end + + return res[1] and res[1].max_version +end + + +function _M:get_delta(version) + local sql = "SELECT * FROM clustering_sync_delta" .. + " WHERE version > " .. self.connector:escape_literal(version) .. 
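
For clarity, a sketch of the SQL that `insert_delta()` generates from `NEW_VERSION_QUERY`: allocating the new version and writing its delta rows happen inside a single DO block, so they cannot be observed separately. The literal values below are illustrative.

--   DO $$
--   DECLARE
--     new_version integer;
--   BEGIN
--     INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
--     INSERT INTO clustering_sync_delta (version, type, id, ws_id, row) VALUES
--       (new_version, 'services', 'd78eb00f-8702-4d6a-bfd9-e005f904ae3e',
--        '73478cf6-964f-412d-b1c4-8ac88d9e85e9', '{"name":"example"}');
--   END $$;
--
-- invoked from the DAO hooks roughly as:
local res, err = strategy:insert_delta(deltas)   -- deltas shaped as in the hooks sketch above
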
+ " ORDER BY version ASC" + return self.connector:query(sql) +end + + +function _M:begin_txn() + return self.connector:query("BEGIN;") +end + + +function _M:commit_txn() + return self.connector:query("COMMIT;") +end + + +function _M:cancel_txn() + -- we will close the connection, not execute 'ROLLBACK' + return self.connector:close() +end + + +return _M diff --git a/kong/conf_loader/constants.lua b/kong/conf_loader/constants.lua index 21326a588e3e..76cbb36394cf 100644 --- a/kong/conf_loader/constants.lua +++ b/kong/conf_loader/constants.lua @@ -510,6 +510,7 @@ local CONF_PARSERS = { cluster_use_proxy = { typ = "boolean" }, cluster_dp_labels = { typ = "array" }, cluster_rpc = { typ = "boolean" }, + cluster_incremental_sync = { typ = "boolean" }, cluster_cjson = { typ = "boolean" }, kic = { typ = "boolean" }, diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 96ff04522ac2..51ea979d2cc9 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -953,10 +953,9 @@ local function load(path, custom_conf, opts) end end - -- TODO: remove this when cluster_rpc is ready for GA - if conf.cluster_rpc then - log.warn("Cluster RPC has been forcibly disabled") - conf.cluster_rpc = "off" + if not conf.cluster_rpc then + log.warn("Cluster incremental sync has been forcibly disabled") + conf.cluster_incremental_sync = false end -- initialize the dns client, so the globally patched tcp.connect method diff --git a/kong/constants.lua b/kong/constants.lua index 6de799f980b7..7a05f24cf530 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -206,8 +206,11 @@ local constants = { PROTOCOLS = protocols, PROTOCOLS_WITH_SUBSYSTEM = protocols_with_subsystem, + DECLARATIVE_DEFAULT_WORKSPACE_ID = "0dc6f45b-8f8d-40d2-a504-473544ee190b", + DECLARATIVE_LOAD_KEY = "declarative_config:loaded", DECLARATIVE_HASH_KEY = "declarative_config:hash", + DECLARATIVE_DEFAULT_WORKSPACE_KEY = "declarative_config:default_workspace", PLUGINS_REBUILD_COUNTER_KEY = "readiness_probe_config:plugins_rebuild_counter", ROUTERS_REBUILD_COUNTER_KEY = "readiness_probe_config:routers_rebuild_counter", DECLARATIVE_EMPTY_CONFIG_HASH = string.rep("0", 32), diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index 515e6c0c719c..4305be4a96f9 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -1156,12 +1156,14 @@ function DAO:insert(entity, options) local row, err_t = self.strategy:insert(entity_to_insert, options) if not row then + run_hook("dao:insert:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t end local ws_id = row.ws_id row, err, err_t = self:row_to_entity(row, options) if not row then + run_hook("dao:insert:fail", err, entity, self.schema.name, options) return nil, err, err_t end @@ -1209,12 +1211,14 @@ function DAO:update(pk_or_entity, entity, options) local row, err_t = self.strategy:update(primary_key, entity_to_update, options) if not row then + run_hook("dao:update:fail", err_t, entity_to_update, self.schema.name, options) return nil, tostring(err_t), err_t end local ws_id = row.ws_id row, err, err_t = self:row_to_entity(row, options) if not row then + run_hook("dao:update:fail", err_t, entity_to_update, self.schema.name, options) return nil, err, err_t end @@ -1337,9 +1341,11 @@ function DAO:delete(pk_or_entity, options) local rows_affected rows_affected, err_t = self.strategy:delete(primary_key, options) if err_t then + run_hook("dao:delete:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t elseif not 
rows_affected then + run_hook("dao:delete:post", nil, self.schema.name, options, ws_id, nil) return nil end diff --git a/kong/db/dao/workspaces.lua b/kong/db/dao/workspaces.lua index f42832014abb..f93531292467 100644 --- a/kong/db/dao/workspaces.lua +++ b/kong/db/dao/workspaces.lua @@ -1,6 +1,14 @@ local Workspaces = {} +local constants = require("kong.constants") +local lmdb = require("resty.lmdb") + + +local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY +local DECLARATIVE_DEFAULT_WORKSPACE_ID = constants.DECLARATIVE_DEFAULT_WORKSPACE_ID + + function Workspaces:truncate() self.super.truncate(self) if kong.configuration.database == "off" then @@ -18,4 +26,19 @@ function Workspaces:truncate() end +function Workspaces:select_by_name(key, options) + if kong.configuration.database == "off" and key == "default" then + -- TODO: Currently, only Kong workers load the declarative config into lmdb. + -- The Kong master doesn't get the default workspace from lmdb, so we + -- return the default constant value. It would be better to have the + -- Kong master load the declarative config into lmdb in the future. + -- + -- it should be a table, not a single string + return { id = lmdb.get(DECLARATIVE_DEFAULT_WORKSPACE_KEY) or DECLARATIVE_DEFAULT_WORKSPACE_ID, } + end + + return self.super.select_by_name(self, key, options) +end + + return Workspaces diff --git a/kong/db/declarative/export.lua b/kong/db/declarative/export.lua index c3b6b8c1366b..6c1c66ede7fd 100644 --- a/kong/db/declarative/export.lua +++ b/kong/db/declarative/export.lua @@ -117,9 +117,20 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp return nil, err end + local sync_version + if emitter.want_sync_version then + ok, err = db.connector:query("SELECT max(version) from clustering_sync_version", "read") + if not ok then + return nil, err + end + + sync_version = assert(ok[1].max) + end + emitter:emit_toplevel({ _format_version = "3.0", _transform = false, + _sync_version = sync_version, -- only used by sync emitter, DP doesn't care about this }) local disabled_services = {} @@ -339,6 +350,34 @@ local function sanitize_output(entities) end +local sync_emitter = { + emit_toplevel = function(self, tbl) + self.out = {} + self.out_n = 0 + self.sync_version = tbl._sync_version + end, + + emit_entity = function(self, entity_name, entity_data) + self.out_n = self.out_n + 1 + self.out[self.out_n] = { type = entity_name , row = entity_data, version = self.sync_version, } + end, + + done = function(self) + return self.out + end, +} + + +function sync_emitter.new() + return setmetatable({ want_sync_version = true, }, { __index = sync_emitter }) +end + + +local function export_config_sync() + return export_from_db_impl(sync_emitter.new(), false, false, true) +end + + return { convert_nulls = convert_nulls, to_yaml_string = to_yaml_string, @@ -347,6 +386,7 @@ return { export_from_db = export_from_db, export_config = export_config, export_config_proto = export_config_proto, + export_config_sync = export_config_sync, sanitize_output = sanitize_output, } diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index 454af9edb126..ea8f23a546d5 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -11,19 +11,57 @@ local yield = require("kong.tools.yield").yield local marshall = require("kong.db.declarative.marshaller").marshall local schema_topological_sort = require("kong.db.schema.topological_sort") local nkeys = require("table.nkeys") 
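
A sketch of what `export_config_sync()` hands back for a full sync: `sync_emitter` flattens the whole database into a list of deltas that all carry the current MAX(version) of `clustering_sync_version`. The entities and version number below are illustrative.

local full_sync_deltas = {
  { type = "workspaces", row = { id = "...", name = "default" },       version = 1005, },
  { type = "services",   row = { id = "...", name = "example" },       version = 1005, },
  { type = "routes",     row = { id = "...", paths = { "/example" } }, version = 1005, },
  -- one entry per exported row; note these entries carry no id/ws_id field,
  -- unlike the deltas written by the CP-side DAO hooks
}
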
+local sha256_hex = require("kong.tools.sha256").sha256_hex +local pk_string = declarative_config.pk_string +local EMPTY = require("kong.tools.table").EMPTY local assert = assert -local sort = table.sort local type = type local pairs = pairs -local next = next local insert = table.insert +local string_format = string.format local null = ngx.null local get_phase = ngx.get_phase +local get_workspace_id = workspaces.get_workspace_id local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH +local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY + + +-- Generates the appropriate workspace ID for current operating context +-- depends on schema settings +-- +-- Non-workspaceable entities are always placed under the "default" +-- workspace +-- +-- If the query explicitly set options.workspace == null, then "default" +-- workspace shall be used +-- +-- If the query explicitly set options.workspace == "some UUID", then +-- it will be returned +-- +-- Otherwise, the current workspace ID will be returned +local function workspace_id(schema, options) + if not schema.workspaceable then + return kong.default_workspace + end + + -- options.workspace does not exist + if not options or not options.workspace then + return get_workspace_id() + end + + -- options.workspace == null must be handled by caller by querying + -- all available workspaces one by one + if options.workspace == null then + return kong.default_workspace + end + + -- options.workspace is a UUID + return options.workspace +end local function find_or_create_current_workspace(name) @@ -133,6 +171,7 @@ local function remove_nulls(tbl) return tbl end + --- Restore all nulls for declarative config. -- Declarative config is a huge table. Use iteration -- instead of recursion to improve performance. @@ -174,26 +213,42 @@ local function restore_nulls(original_tbl, transformed_tbl) return transformed_tbl end + local function get_current_hash() return lmdb.get(DECLARATIVE_HASH_KEY) end -local function find_default_ws(entities) - for _, v in pairs(entities.workspaces or {}) do - if v.name == "default" then +local function find_ws(entities, name) + for _, v in pairs(entities.workspaces or EMPTY) do + if v.name == name then return v.id end end end -local function unique_field_key(schema_name, ws_id, field, value, unique_across_ws) - if unique_across_ws then - ws_id = "" - end +local function unique_field_key(schema_name, ws_id, field, value) + return string_format("%s|%s|%s|%s", schema_name, ws_id, field, sha256_hex(value)) +end + + +local function foreign_field_key_prefix(schema_name, ws_id, field, foreign_id) + return string_format("%s|%s|%s|%s|", schema_name, ws_id, field, foreign_id) +end + + +local function foreign_field_key(schema_name, ws_id, field, foreign_id, pk) + return foreign_field_key_prefix(schema_name, ws_id, field, foreign_id) .. pk +end + +local function item_key_prefix(schema_name, ws_id) + return string_format("%s|%s|*|", schema_name, ws_id) +end + - return schema_name .. "|" .. ws_id .. "|" .. field .. ":" .. value +local function item_key(schema_name, ws_id, pk_str) + return item_key_prefix(schema_name, ws_id) .. 
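
To make the new key scheme concrete, here is the LMDB layout the helpers above produce for a service named "example" and a route that references it; the workspace and entity UUIDs are made up.

local ws  = "73478cf6-964f-412d-b1c4-8ac88d9e85e9"   -- workspace id
local svc = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e"   -- service primary key
local rt  = "0a5bac5c-b795-4981-95d2-919ba3390b7e"   -- route primary key

item_key("services", ws, svc)
--> "services|73478cf6-...|*|d78eb00f-..."                    (holds the marshalled entity)
unique_field_key("services", ws, "name", "example")
--> "services|73478cf6-...|name|" .. sha256_hex("example")    (holds the item_key above)
foreign_field_key("routes", ws, "service", svc, rt)
--> "routes|73478cf6-...|service|d78eb00f-...|0a5bac5c-..."   (holds the route's item_key)
item_key_prefix("services", ws)
--> "services|73478cf6-...|*|"                                (prefix scanned by page())
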
pk_str end @@ -203,6 +258,122 @@ local function config_is_empty(entities) end +-- common implementation for +-- insert_entity_for_txn() and delete_entity_for_txn() +local function _set_entity_for_txn(t, entity_name, item, options, is_delete) + local dao = kong.db[entity_name] + local schema = dao.schema + local pk = pk_string(schema, item) + local ws_id = workspace_id(schema, options) + + local itm_key = item_key(entity_name, ws_id, pk) + + -- if we are deleting, item_value and idx_value should be nil + local itm_value, idx_value + + -- if we are inserting or updating + -- itm_value is serialized entity + -- idx_value is the lmdb item_key + if not is_delete then + local err + + -- serialize item with possible nulls + itm_value, err = marshall(item) + if not itm_value then + return nil, err + end + + idx_value = itm_key + end + + -- store serialized entity into lmdb + t:set(itm_key, itm_value) + + -- select_by_cache_key + if schema.cache_key then + local cache_key = dao:cache_key(item) + local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key) + + -- store item_key or nil into lmdb + t:set(key, idx_value) + end + + for fname, fdata in schema:each_field() do + local is_foreign = fdata.type == "foreign" + local fdata_reference = fdata.reference + local value = item[fname] + + -- value may be null, we should skip it + if not value or value == null then + goto continue + end + + -- value should be a string or table + + local value_str + + if fdata.unique then + -- unique and not a foreign key, or is a foreign key, but non-composite + -- see: validate_foreign_key_is_single_primary_key, composite foreign + -- key is currently unsupported by the DAO + if type(value) == "table" then + assert(is_foreign) + value_str = pk_string(kong.db[fdata_reference].schema, value) + end + + if fdata.unique_across_ws then + ws_id = kong.default_workspace + end + + local key = unique_field_key(entity_name, ws_id, fname, value_str or value) + + -- store item_key or nil into lmdb + t:set(key, idx_value) + end + + if is_foreign then + -- is foreign, generate page_for_foreign_field indexes + assert(type(value) == "table") + + value_str = pk_string(kong.db[fdata_reference].schema, value) + + local key = foreign_field_key(entity_name, ws_id, fname, value_str, pk) + + -- store item_key or nil into lmdb + t:set(key, idx_value) + end + + ::continue:: + end -- for fname, fdata in schema:each_field() + + return true +end + + +-- Serialize and set keys for a single validated entity into +-- the provided LMDB txn object, this operation is only safe +-- is the entity does not already exist inside the LMDB database +-- +-- This function sets the following: +-- * ||*| => serialized item +-- * |||sha256(field_value) => ||*| +-- * |||| -> ||*| +-- +-- DO NOT touch `item`, or else the entity will be changed +local function insert_entity_for_txn(t, entity_name, item, options) + return _set_entity_for_txn(t, entity_name, item, options, false) +end + + +-- Serialize and remove keys for a single validated entity into +-- the provided LMDB txn object, this operation is safe whether the provided +-- entity exists inside LMDB or not, but the provided entity must contains the +-- correct field value so indexes can be deleted correctly +local function delete_entity_for_txn(t, entity_name, item, options) + return _set_entity_for_txn(t, entity_name, item, options, true) +end + + -- entities format: -- { -- services: { @@ -217,35 +388,24 @@ end -- _transform: true, -- } local function load_into_cache(entities, meta, hash) - -- 
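
A minimal usage sketch of the two helpers above, mirroring how the DP sync code and the rewritten load_into_cache() call them; `old_service`, `new_service` and `removed_service` stand in for validated entity tables and error handling is trimmed.

local txn         = require("resty.lmdb.transaction")
local declarative = require("kong.db.declarative")

local t = txn.begin(512)

-- upsert: drop the previous copy's index keys first, then write the new copy
assert(declarative.delete_entity_for_txn(t, "services", old_service, nil))
assert(declarative.insert_entity_for_txn(t, "services", new_service, nil))

-- delete: same code path, it simply writes nil for the item and index keys
assert(declarative.delete_entity_for_txn(t, "services", removed_service, nil))

assert(t:commit())
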
Array of strings with this format: - -- "||". - -- For example, a service tagged "admin" would produce - -- "admin|services|" - local tags = {} - meta = meta or {} + local default_workspace_id = assert(find_ws(entities, "default")) + local should_transform = meta._transform == nil and true or meta._transform - local default_workspace = assert(find_default_ws(entities)) - local fallback_workspace = default_workspace + assert(type(default_workspace_id) == "string") - assert(type(fallback_workspace) == "string") + -- set it for insert_entity_for_txn() + kong.default_workspace = default_workspace_id if not hash or hash == "" or config_is_empty(entities) then hash = DECLARATIVE_EMPTY_CONFIG_HASH end - -- Keys: tag name, like "admin" - -- Values: array of encoded tags, similar to the `tags` variable, - -- but filtered for a given tag - local tags_by_name = {} - local db = kong.db - local t = txn.begin(128) + local t = txn.begin(512) t:db_drop(false) local phase = get_phase() - yield(false, phase) -- XXX - local transform = meta._transform == nil and true or meta._transform for entity_name, items in pairs(entities) do yield(true, phase) @@ -256,63 +416,14 @@ local function load_into_cache(entities, meta, hash) end local schema = dao.schema - -- Keys: tag_name, eg "admin" - -- Values: dictionary of keys associated to this tag, - -- for a specific entity type - -- i.e. "all the services associated to the 'admin' tag" - -- The ids are keys, and the values are `true` - local taggings = {} - - local uniques = {} - local page_for = {} - local foreign_fields = {} - for fname, fdata in schema:each_field() do - local is_foreign = fdata.type == "foreign" - local fdata_reference = fdata.reference - - if fdata.unique then - if is_foreign then - if #db[fdata_reference].schema.primary_key == 1 then - insert(uniques, fname) - end - - else - insert(uniques, fname) - end - end - if is_foreign then - page_for[fdata_reference] = {} - foreign_fields[fname] = fdata_reference - end - end - - local keys_by_ws = { - -- map of keys for global queries - ["*"] = {} - } - for id, item in pairs(items) do - -- When loading the entities, when we load the default_ws, we - -- set it to the current. 
But this only works in the worker that - -- is doing the loading (0), other ones still won't have it - - yield(true, phase) - - assert(type(fallback_workspace) == "string") - - local ws_id = "" - if schema.workspaceable then - local item_ws_id = item.ws_id - if item_ws_id == null or item_ws_id == nil then - item_ws_id = fallback_workspace - end - item.ws_id = item_ws_id - ws_id = item_ws_id + for _, item in pairs(items) do + if not schema.workspaceable or item.ws_id == null or item.ws_id == nil then + item.ws_id = default_workspace_id end - assert(type(ws_id) == "string") + assert(type(item.ws_id) == "string") - local cache_key = dao:cache_key(id, nil, nil, nil, nil, item.ws_id) - if transform and schema:has_transformations(item) then + if should_transform and schema:has_transformations(item) then local transformed_item = cycle_aware_deep_copy(item) remove_nulls(transformed_item) @@ -328,161 +439,16 @@ local function load_into_cache(entities, meta, hash) end end - local item_marshalled, err = marshall(item) - if not item_marshalled then - return nil, err - end - - t:set(cache_key, item_marshalled) - - local global_query_cache_key = dao:cache_key(id, nil, nil, nil, nil, "*") - t:set(global_query_cache_key, item_marshalled) - - -- insert individual entry for global query - insert(keys_by_ws["*"], cache_key) - - -- insert individual entry for workspaced query - if ws_id ~= "" then - keys_by_ws[ws_id] = keys_by_ws[ws_id] or {} - local keys = keys_by_ws[ws_id] - insert(keys, cache_key) - end - - if schema.cache_key then - local cache_key = dao:cache_key(item) - t:set(cache_key, item_marshalled) - end - - for i = 1, #uniques do - local unique = uniques[i] - local unique_key = item[unique] - if unique_key and unique_key ~= null then - if type(unique_key) == "table" then - local _ - -- this assumes that foreign keys are not composite - _, unique_key = next(unique_key) - end - - local key = unique_field_key(entity_name, ws_id, unique, unique_key, - schema.fields[unique].unique_across_ws) - - t:set(key, item_marshalled) - end - end - - for fname, ref in pairs(foreign_fields) do - local item_fname = item[fname] - if item_fname and item_fname ~= null then - local fschema = db[ref].schema - - local fid = declarative_config.pk_string(fschema, item_fname) - - -- insert paged search entry for global query - page_for[ref]["*"] = page_for[ref]["*"] or {} - page_for[ref]["*"][fid] = page_for[ref]["*"][fid] or {} - insert(page_for[ref]["*"][fid], cache_key) - - -- insert paged search entry for workspaced query - page_for[ref][ws_id] = page_for[ref][ws_id] or {} - page_for[ref][ws_id][fid] = page_for[ref][ws_id][fid] or {} - insert(page_for[ref][ws_id][fid], cache_key) - end - end - - local item_tags = item.tags - if item_tags and item_tags ~= null then - local ws = schema.workspaceable and ws_id or "" - for i = 1, #item_tags do - local tag_name = item_tags[i] - insert(tags, tag_name .. "|" .. entity_name .. "|" .. id) - - tags_by_name[tag_name] = tags_by_name[tag_name] or {} - insert(tags_by_name[tag_name], tag_name .. "|" .. entity_name .. "|" .. id) - - taggings[tag_name] = taggings[tag_name] or {} - taggings[tag_name][ws] = taggings[tag_name][ws] or {} - taggings[tag_name][ws][cache_key] = true - end - end - end - - for ws_id, keys in pairs(keys_by_ws) do - local entity_prefix = entity_name .. "|" .. 
(schema.workspaceable and ws_id or "") - - local keys, err = marshall(keys) - if not keys then + -- nil means no extra options + local res, err = insert_entity_for_txn(t, entity_name, item, nil) + if not res then return nil, err end + end -- for for _, item + end -- for entity_name, items - t:set(entity_prefix .. "|@list", keys) - - for ref, wss in pairs(page_for) do - local fids = wss[ws_id] - if fids then - for fid, entries in pairs(fids) do - local key = entity_prefix .. "|" .. ref .. "|" .. fid .. "|@list" - - local entries, err = marshall(entries) - if not entries then - return nil, err - end - - t:set(key, entries) - end - end - end - end - - -- taggings:admin|services|ws_id|@list -> uuids of services tagged "admin" on workspace ws_id - for tag_name, workspaces_dict in pairs(taggings) do - for ws_id, keys_dict in pairs(workspaces_dict) do - local key = "taggings:" .. tag_name .. "|" .. entity_name .. "|" .. ws_id .. "|@list" - - -- transform the dict into a sorted array - local arr = {} - local len = 0 - for id in pairs(keys_dict) do - len = len + 1 - arr[len] = id - end - -- stay consistent with pagination - sort(arr) - - local arr, err = marshall(arr) - if not arr then - return nil, err - end - - t:set(key, arr) - end - end - end - - for tag_name, tags in pairs(tags_by_name) do - yield(true, phase) - - -- tags:admin|@list -> all tags tagged "admin", regardless of the entity type - -- each tag is encoded as a string with the format "admin|services|uuid", where uuid is the service uuid - local key = "tags:" .. tag_name .. "|@list" - local tags, err = marshall(tags) - if not tags then - return nil, err - end - - t:set(key, tags) - end - - -- tags||@list -> all tags, with no distinction of tag name or entity type. - -- each tag is encoded as a string with the format "admin|services|uuid", where uuid is the service uuid - local tags, err = marshall(tags) - if not tags then - return nil, err - end - - t:set("tags||@list", tags) t:set(DECLARATIVE_HASH_KEY, hash) - - kong.default_workspace = default_workspace + t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, default_workspace_id) local ok, err = t:commit() if not ok then @@ -492,9 +458,7 @@ local function load_into_cache(entities, meta, hash) kong.core_cache:purge() kong.cache:purge() - yield(false, phase) - - return true, nil, default_workspace + return true, nil, default_workspace_id end @@ -599,8 +563,15 @@ end return { get_current_hash = get_current_hash, unique_field_key = unique_field_key, + foreign_field_key = foreign_field_key, + foreign_field_key_prefix = foreign_field_key_prefix, + item_key = item_key, + item_key_prefix = item_key_prefix, + workspace_id = workspace_id, load_into_db = load_into_db, load_into_cache = load_into_cache, load_into_cache_with_events = load_into_cache_with_events, + insert_entity_for_txn = insert_entity_for_txn, + delete_entity_for_txn = delete_entity_for_txn, } diff --git a/kong/db/declarative/init.lua b/kong/db/declarative/init.lua index a7dd6d2b0734..73a2704f51e9 100644 --- a/kong/db/declarative/init.lua +++ b/kong/db/declarative/init.lua @@ -245,15 +245,22 @@ _M.to_yaml_file = declarative_export.to_yaml_file _M.export_from_db = declarative_export.export_from_db _M.export_config = declarative_export.export_config _M.export_config_proto = declarative_export.export_config_proto +_M.export_config_sync = declarative_export.export_config_sync _M.sanitize_output = declarative_export.sanitize_output -- import _M.get_current_hash = declarative_import.get_current_hash _M.unique_field_key = 
declarative_import.unique_field_key +_M.item_key = declarative_import.item_key +_M.item_key_prefix = declarative_import.item_key_prefix +_M.foreign_field_key_prefix = declarative_import.foreign_field_key_prefix _M.load_into_db = declarative_import.load_into_db _M.load_into_cache = declarative_import.load_into_cache _M.load_into_cache_with_events = declarative_import.load_into_cache_with_events +_M.insert_entity_for_txn = declarative_import.insert_entity_for_txn +_M.delete_entity_for_txn = declarative_import.delete_entity_for_txn +_M.workspace_id = declarative_import.workspace_id return _M diff --git a/kong/db/migrations/core/024_370_to_380.lua b/kong/db/migrations/core/024_370_to_380.lua new file mode 100644 index 000000000000..9d78807962cf --- /dev/null +++ b/kong/db/migrations/core/024_370_to_380.lua @@ -0,0 +1,22 @@ +return { + postgres = { + up = [[ + DO $$ + BEGIN + CREATE TABLE IF NOT EXISTS clustering_sync_version ( + "version" SERIAL PRIMARY KEY + ); + CREATE TABLE IF NOT EXISTS clustering_sync_delta ( + "version" INT NOT NULL, + "type" TEXT NOT NULL, + "id" UUID NOT NULL, + "ws_id" UUID NOT NULL, + "row" JSON, + FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version); + END; + $$; + ]] + } +} diff --git a/kong/db/migrations/core/init.lua b/kong/db/migrations/core/init.lua index 2f18b1cb5f76..394f13bf382b 100644 --- a/kong/db/migrations/core/init.lua +++ b/kong/db/migrations/core/init.lua @@ -21,4 +21,5 @@ return { "021_340_to_350", "022_350_to_360", "023_360_to_370", + "024_370_to_380", } diff --git a/kong/db/schema/others/declarative_config.lua b/kong/db/schema/others/declarative_config.lua index 4e56cb24826f..6d7c47e4d50e 100644 --- a/kong/db/schema/others/declarative_config.lua +++ b/kong/db/schema/others/declarative_config.lua @@ -39,15 +39,33 @@ local foreign_references = {} local foreign_children = {} -function DeclarativeConfig.pk_string(schema, object) - if #schema.primary_key == 1 then - return tostring(object[schema.primary_key[1]]) - else - local out = {} - for _, k in ipairs(schema.primary_key) do - insert(out, tostring(object[k])) +do + local tb_nkeys = require("table.nkeys") + local request_aware_table = require("kong.tools.request_aware_table") + + local CACHED_OUT + + -- Generate a stable and unique string key from primary key defined inside + -- schema, supports both non-composite and composite primary keys + function DeclarativeConfig.pk_string(schema, object) + local primary_key = schema.primary_key + local count = tb_nkeys(primary_key) + + if count == 1 then + return tostring(object[primary_key[1]]) + end + + if not CACHED_OUT then + CACHED_OUT = request_aware_table.new() + end + + CACHED_OUT.clear() + for i = 1, count do + local k = primary_key[i] + insert(CACHED_OUT, tostring(object[k])) end - return concat(out, ":") + + return concat(CACHED_OUT, ":") end end @@ -725,7 +743,7 @@ end local function insert_default_workspace_if_not_given(_, entities) - local default_workspace = find_default_ws(entities) or "0dc6f45b-8f8d-40d2-a504-473544ee190b" + local default_workspace = find_default_ws(entities) or constants.DECLARATIVE_DEFAULT_WORKSPACE_ID if not entities.workspaces then entities.workspaces = {} diff --git a/kong/db/strategies/connector.lua b/kong/db/strategies/connector.lua index 3f03cddc11f7..719ef8078ca5 100644 --- a/kong/db/strategies/connector.lua +++ b/kong/db/strategies/connector.lua @@ -5,8 +5,8 @@ local fmt = 
string.format local Connector = { defaults = { pagination = { - page_size = 1000, - max_page_size = 50000, + page_size = 512, -- work with lmdb + max_page_size = 512, -- work with lmdb }, }, } diff --git a/kong/db/strategies/off/init.lua b/kong/db/strategies/off/init.lua index c984510877ab..1ab27cddf56e 100644 --- a/kong/db/strategies/off/init.lua +++ b/kong/db/strategies/off/init.lua @@ -1,26 +1,31 @@ -local declarative_config = require "kong.db.schema.others.declarative_config" -local workspaces = require "kong.workspaces" +local declarative_config = require("kong.db.schema.others.declarative_config") local lmdb = require("resty.lmdb") +local lmdb_prefix = require("resty.lmdb.prefix") local marshaller = require("kong.db.declarative.marshaller") -local yield = require("kong.tools.yield").yield -local unique_field_key = require("kong.db.declarative").unique_field_key +local declarative = require("kong.db.declarative") local kong = kong local fmt = string.format local type = type local next = next -local sort = table.sort -local pairs = pairs -local match = string.match local assert = assert -local tostring = tostring -local tonumber = tonumber local encode_base64 = ngx.encode_base64 local decode_base64 = ngx.decode_base64 local null = ngx.null local unmarshall = marshaller.unmarshall local lmdb_get = lmdb.get -local get_workspace_id = workspaces.get_workspace_id +local pk_string = declarative_config.pk_string +local unique_field_key = declarative.unique_field_key +local item_key = declarative.item_key +local item_key_prefix = declarative.item_key_prefix +local workspace_id = declarative.workspace_id +local foreign_field_key_prefix = declarative.foreign_field_key_prefix + + +local PROCESS_AUTO_FIELDS_OPTS = { + no_defaults = true, + show_ws_id = true, +} local off = {} @@ -30,21 +35,16 @@ local _mt = {} _mt.__index = _mt -local function ws(schema, options) - if not schema.workspaceable then - return "" - end +local UNINIT_WORKSPACE_ID = "00000000-0000-0000-0000-000000000000" - if options then - if options.workspace == null then - return "*" - end - if options.workspace then - return options.workspace - end + +local function get_default_workspace() + if kong.default_workspace == UNINIT_WORKSPACE_ID then + local res = kong.db.workspaces:select_by_name("default") + kong.default_workspace = assert(res and res.id) end - return get_workspace_id() + return kong.default_workspace end @@ -53,229 +53,163 @@ local function process_ttl_field(entity) local ttl_value = entity.ttl - ngx.time() if ttl_value > 0 then entity.ttl = ttl_value + else entity = nil -- do not return the expired entity end end + return entity end --- Returns a dict of entity_ids tagged according to the given criteria. --- Currently only the following kinds of keys are supported: --- * A key like `services||@list` will only return service keys --- @tparam string the key to be used when filtering --- @tparam table tag_names an array of tag names (strings) --- @tparam string|nil tags_cond either "or", "and". `nil` means "or" --- @treturn table|nil returns a table with entity_ids as values, and `true` as keys -local function get_entity_ids_tagged(key, tag_names, tags_cond) - local tag_name, list, err - local dict = {} -- keys are entity_ids, values are true - - for i = 1, #tag_names do - tag_name = tag_names[i] - list, err = unmarshall(lmdb_get("taggings:" .. tag_name .. "|" .. 
key)) - if err then - return nil, err - end - - yield(true) +local function construct_entity(schema, value) + local entity, err = unmarshall(value) + if not entity then + return nil, err + end - list = list or {} + if schema.ttl then + entity = process_ttl_field(entity) + if not entity then + return nil + end + end - if i > 1 and tags_cond == "and" then - local list_len = #list - -- optimization: exit early when tags_cond == "and" and one of the tags does not return any entities - if list_len == 0 then - return {} - end + entity = schema:process_auto_fields(entity, "select", true, PROCESS_AUTO_FIELDS_OPTS) - local and_dict = {} - local new_tag_id - for i = 1, list_len do - new_tag_id = list[i] - and_dict[new_tag_id] = dict[new_tag_id] -- either true or nil - end - dict = and_dict + return entity +end - -- optimization: exit early when tags_cond == "and" and current list is empty - if not next(dict) then - return {} - end - else -- tags_cond == "or" or first iteration - -- the first iteration is the same for both "or" and "and": put all ids into dict - for i = 1, #list do - dict[list[i]] = true - end +-- select item by primary key, if follow is true, then one indirection +-- will be followed indirection means the value of `key` is not the actual +-- serialized item, but rather the value is a pointer to the key where +-- actual serialized item is located. This way this function can be shared +-- by both primary key lookup as well as unique key lookup without needing +-- to duplicate the item content +local function select_by_key(schema, key, follow) + if follow then + local actual_key, err = lmdb_get(key) + if not actual_key then + return nil, err end + + return select_by_key(schema, actual_key, false) end - local arr = {} - local len = 0 - for entity_id in pairs(dict) do - len = len + 1 - arr[len] = entity_id + local entity, err = construct_entity(schema, lmdb_get(key)) + if not entity then + return nil, err end - sort(arr) -- consistency when paginating results - return arr + return entity end -local function page_for_key(self, key, size, offset, options) +local function page_for_prefix(self, prefix, size, offset, options, follow) if not size then size = self.connector:get_page_size(options) end - if offset then - local token = decode_base64(offset) - if not token then - return nil, self.errors:invalid_offset(offset, "bad base64 encoding") - end - - local number = tonumber(token) - if not number then - return nil, self.errors:invalid_offset(offset, "invalid offset") - end + offset = offset or prefix - offset = number - - else - offset = 1 + local res, err_or_more = lmdb_prefix.page(offset, prefix, nil, size) + if not res then + return nil, err_or_more end - local list, err - if options and options.tags then - list, err = get_entity_ids_tagged(key, options.tags, options.tags_cond) - if err then - return nil, err - end - - else - list, err = unmarshall(lmdb_get(key)) - if err then - return nil, err - end - - list = list or {} - end - - yield() - local ret = {} - local ret_idx = 1 + local ret_idx = 0 local schema = self.schema - local schema_name = schema.name - - local item - for i = offset, offset + size - 1 do - item = list[i] - if not item then - offset = nil - break - end + local last_key - -- Tags are stored in the cache entries "tags||@list" and "tags:|@list" - -- The contents of both of these entries is an array of strings - -- Each of these strings has the form "||" - -- For example "admin|services|" - -- This loop transforms each individual string into tables. 
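
The `follow` flag above is the heart of the new index layout: unique and foreign index keys store an item_key rather than a copy of the entity, so a unique-field lookup costs one extra lmdb get. A sketch, using the key shapes from the import module (`select_by_key` is the module-local function above):

-- select_by_field("name", "example") boils down to two reads:
--   "services|<ws>|name|<sha256("example")>"  ->  "services|<ws>|*|<uuid>"   (index -> item_key)
--   "services|<ws>|*|<uuid>"                  ->  marshalled entity          (item_key -> row)
local entity = select_by_key(schema, unique_field_key("services", ws_id, "name", "example"), true)
-- select() by primary key builds the item_key directly and passes follow = false
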
- if schema_name == "tags" then - local tag_name, entity_name, uuid = match(item, "^([^|]+)|([^|]+)|(.+)$") - if not tag_name then - return nil, "Could not parse tag from cache: " .. tostring(item) - end + for _, kv in ipairs(res) do + last_key = kv.key + local item, err - item = { tag = tag_name, entity_name = entity_name, entity_id = uuid } + if follow then + item, err = select_by_key(schema, kv.value, false) - -- The rest of entities' lists (i.e. "services||@list") only contain ids, so in order to - -- get the entities we must do an additional cache access per entry else - item, err = unmarshall(lmdb_get(item)) - if err then - return nil, err - end + item, err = construct_entity(schema, kv.value) end - if not item then - return nil, "stale data detected while paginating" - end - - if schema.ttl then - item = process_ttl_field(item) + if err then + return nil, err end - if item then - ret[ret_idx] = item - ret_idx = ret_idx + 1 - end + ret_idx = ret_idx + 1 + ret[ret_idx] = item end - if offset then - return ret, nil, encode_base64(tostring(offset + size), true) + -- more need to query + if err_or_more then + return ret, nil, encode_base64(last_key .. "\x00", true) end return ret end -local function select_by_key(schema, key) - local entity, err = unmarshall(lmdb_get(key)) - if not entity then - return nil, err - end +local function page(self, size, offset, options) + local schema = self.schema + local ws_id = workspace_id(schema, options) + local prefix = item_key_prefix(schema.name, ws_id) - if schema.ttl then - entity = process_ttl_field(entity) - if not entity then - return nil + if offset then + local token = decode_base64(offset) + if not token then + return nil, self.errors:invalid_offset(offset, "bad base64 encoding") end - end - - return entity -end + offset = token + end -local function page(self, size, offset, options) - local schema = self.schema - local ws_id = ws(schema, options) - local key = schema.name .. "|" .. ws_id .. "|@list" - return page_for_key(self, key, size, offset, options) + return page_for_prefix(self, prefix, size, offset, options, false) end +-- select by primary key local function select(self, pk, options) local schema = self.schema - local ws_id = ws(schema, options) - local id = declarative_config.pk_string(schema, pk) - local key = schema.name .. ":" .. id .. ":::::" .. 
ws_id - return select_by_key(schema, key) + local ws_id = workspace_id(schema, options) + local pk = pk_string(schema, pk) + local key = item_key(schema.name, ws_id, pk) + return select_by_key(schema, key, false) end +-- select by unique field (including select_by_cache_key) +-- the DAO guarantees this method only gets called for unique fields +-- see: validate_foreign_key_is_single_primary_key local function select_by_field(self, field, value, options) + local schema = self.schema + if type(value) == "table" then + -- select by foreign, DAO only support one key for now (no composites) + local fdata = schema.fields[field] + assert(fdata.type == "foreign") + assert(#kong.db[fdata.reference].schema.primary_key == 1) + local _ _, value = next(value) end - local schema = self.schema - local ws_id = ws(schema, options) + local ws_id = workspace_id(schema, options) local key - if field ~= "cache_key" then - local unique_across_ws = schema.fields[field].unique_across_ws - -- only accept global query by field if field is unique across workspaces - assert(not options or options.workspace ~= null or unique_across_ws) - - key = unique_field_key(schema.name, ws_id, field, value, unique_across_ws) - else - -- if select_by_cache_key, use the provided cache_key as key directly - key = value + local unique_across_ws = schema.fields[field].unique_across_ws + -- only accept global query by field if field is unique across workspaces + assert(not options or options.workspace ~= null or unique_across_ws) + + if unique_across_ws then + ws_id = get_default_workspace() end - return select_by_key(schema, key) + key = unique_field_key(schema.name, ws_id, field, value) + + return select_by_key(schema, key, true) end @@ -307,8 +241,6 @@ do _mt.upsert_by_field = unsupported_by("create or update") _mt.delete_by_field = unsupported_by("remove") _mt.truncate = function() return true end - -- off-strategy specific methods: - _mt.page_for_key = page_for_key end @@ -323,18 +255,17 @@ function off.new(connector, schema, errors) -- This is not the id for the default workspace in DB-less. -- This is a sentinel value for the init() phase before -- the declarative config is actually loaded. - kong.default_workspace = "00000000-0000-0000-0000-000000000000" + kong.default_workspace = UNINIT_WORKSPACE_ID end local name = schema.name for fname, fdata in schema:each_field() do if fdata.type == "foreign" then - local entity = fdata.reference local method = "page_for_" .. fname self[method] = function(_, foreign_key, size, offset, options) - local ws_id = ws(schema, options) - local key = name .. "|" .. ws_id .. "|" .. entity .. "|" .. foreign_key.id .. "|@list" - return page_for_key(self, key, size, offset, options) + local ws_id = workspace_id(schema, options) + local prefix = foreign_field_key_prefix(name, ws_id, fname, foreign_key.id) + return page_for_prefix(self, prefix, size, offset, options, true) end end end diff --git a/kong/db/strategies/off/tags.lua b/kong/db/strategies/off/tags.lua deleted file mode 100644 index 15498957df5c..000000000000 --- a/kong/db/strategies/off/tags.lua +++ /dev/null @@ -1,11 +0,0 @@ -local Tags = {} - --- Used by /tags/:tag endpoint --- @tparam string tag_pk the tag value --- @treturn table|nil,err,offset -function Tags:page_by_tag(tag, size, offset, options) - local key = "tags:" .. tag .. 
"|list" - return self:page_for_key(key, size, offset, options) -end - -return Tags diff --git a/kong/init.lua b/kong/init.lua index 70abad8b59c0..8fcfc5924739 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -695,6 +695,11 @@ function Kong.init() if config.cluster_rpc then kong.rpc = require("kong.clustering.rpc.manager").new(config, kong.node.get_id()) + if config.cluster_incremental_sync then + kong.sync = require("kong.clustering.services.sync").new(db, is_control_plane(config)) + kong.sync:init(kong.rpc) + end + if is_data_plane(config) then require("kong.clustering.services.debug").init(kong.rpc) end @@ -757,7 +762,7 @@ function Kong.init() require("resty.kong.var").patch_metatable() - if config.dedicated_config_processing and is_data_plane(config) then + if config.dedicated_config_processing and is_data_plane(config) and not kong.sync then -- TODO: figure out if there is better value than 4096 -- 4096 is for the cocurrency of the lua-resty-timer-ng local ok, err = process.enable_privileged_agent(4096) @@ -874,8 +879,9 @@ function Kong.init_worker() kong.cache:invalidate_local(constants.ADMIN_GUI_KCONFIG_CACHE_KEY) end - if process.type() == "privileged agent" then + if process.type() == "privileged agent" and not kong.sync then if kong.clustering then + -- full sync cp/dp kong.clustering:init_worker() end return @@ -910,6 +916,7 @@ function Kong.init_worker() end elseif declarative_entities then + ok, err = load_declarative_config(kong.configuration, declarative_entities, declarative_meta, @@ -975,11 +982,18 @@ function Kong.init_worker() end if kong.clustering then - kong.clustering:init_worker() - local cluster_tls = require("kong.clustering.tls") + -- full sync cp/dp + if not kong.sync then + kong.clustering:init_worker() + end + -- rpc and incremental sync if kong.rpc and is_http_module then + + -- only available in http subsystem + local cluster_tls = require("kong.clustering.tls") + if is_data_plane(kong.configuration) then ngx.timer.at(0, function(premature) kong.rpc:connect(premature, @@ -992,6 +1006,11 @@ function Kong.init_worker() else -- control_plane kong.rpc.concentrator:start() end + + -- init incremental sync + if kong.sync then + kong.sync:init_worker() + end end end diff --git a/kong/pdk/vault.lua b/kong/pdk/vault.lua index 14bf4f8bbbab..ed7f421b45a3 100644 --- a/kong/pdk/vault.lua +++ b/kong/pdk/vault.lua @@ -1439,11 +1439,21 @@ local function new(self) end + local function should_register_crud_event() + local conf = self.configuration + + local not_dbless = conf.database ~= "off" -- postgres + local dp_with_inc_sync = conf.role == "data_plane" and + conf.cluster_incremental_sync + + return not_dbless or dp_with_inc_sync + end + local initialized --- -- Initializes vault. -- - -- Registers event handlers (on non-dbless nodes) and starts a recurring secrets + -- Registers event handlers and starts a recurring secrets -- rotation timer. It does nothing on control planes. 
-- -- @local @@ -1455,7 +1465,7 @@ local function new(self) initialized = true - if self.configuration.database ~= "off" then + if should_register_crud_event() then self.worker_events.register(handle_vault_crud_event, "crud", "vaults") end diff --git a/kong/runloop/events.lua b/kong/runloop/events.lua index b7b64b161813..dfc9718af3c4 100644 --- a/kong/runloop/events.lua +++ b/kong/runloop/events.lua @@ -490,7 +490,11 @@ local function register_events(reconfigure_handler) if db.strategy == "off" then -- declarative config updates register_for_dbless(reconfigure_handler) - return + + -- dbless (not dataplane) has no other events + if not kong.sync then + return + end end register_for_db() diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 3f6b08afb109..11986d08d11d 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -358,7 +358,13 @@ local function new_router(version) end end - local detect_changes = db.strategy ~= "off" and kong.core_cache + local detect_changes = kong.core_cache and true + + -- for dbless we will not check changes when initing + if db.strategy == "off" and get_phase() == "init_worker" then + detect_changes = false + end + local counter = 0 local page_size = db.routes.pagination.max_page_size for route, err in db.routes:each(page_size, GLOBAL_QUERY_OPTS) do @@ -961,51 +967,87 @@ return { end end - if strategy ~= "off" then + do -- start some rebuild timers local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1 - local function rebuild_timer(premature) + local router_async_opts = { + name = "router", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_router_timer(premature) if premature then return end - local router_update_status, err = rebuild_router({ - name = "router", - timeout = 0, - on_timeout = "return_true", - }) - if not router_update_status then + -- Don't wait for the semaphore (timeout = 0) when updating via the + -- timer. + -- If the semaphore is locked, that means that the rebuild is + -- already ongoing. 
+ local ok, err = rebuild_router(router_async_opts) + if not ok then log(ERR, "could not rebuild router via timer: ", err) end + end - local plugins_iterator_update_status, err = rebuild_plugins_iterator({ - name = "plugins_iterator", - timeout = 0, - on_timeout = "return_true", - }) - if not plugins_iterator_update_status then + local _, err = kong.timer:named_every("router-rebuild", + worker_state_update_frequency, + rebuild_router_timer) + if err then + log(ERR, "could not schedule timer to rebuild router: ", err) + end + + local plugins_iterator_async_opts = { + name = "plugins_iterator", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_plugins_iterator_timer(premature) + if premature then + return + end + + local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts) + if err then log(ERR, "could not rebuild plugins iterator via timer: ", err) end + end - if wasm.enabled() then - local wasm_update_status, err = rebuild_wasm_state({ - name = "wasm", - timeout = 0, - on_timeout = "return_true", - }) - if not wasm_update_status then + local _, err = kong.timer:named_every("plugins-iterator-rebuild", + worker_state_update_frequency, + rebuild_plugins_iterator_timer) + if err then + log(ERR, "could not schedule timer to rebuild plugins iterator: ", err) + end + + if wasm.enabled() then + local wasm_async_opts = { + name = "wasm", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_wasm_filter_chains_timer(premature) + if premature then + return + end + + local _, err = rebuild_wasm_state(wasm_async_opts) + if err then log(ERR, "could not rebuild wasm filter chains via timer: ", err) end end - end - local _, err = kong.timer:named_every("rebuild", - worker_state_update_frequency, - rebuild_timer) - if err then - log(ERR, "could not schedule timer to rebuild: ", err) + local _, err = kong.timer:named_every("wasm-rebuild", + worker_state_update_frequency, + rebuild_wasm_filter_chains_timer) + if err then + log(ERR, "could not schedule timer to rebuild WASM filter chains: ", err) + end end - end + end -- rebuild timer do block end, }, preread = { diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index f2cc4e0f13a4..4ffb2b24adf5 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -41,7 +41,8 @@ cluster_ocsp = off cluster_max_payload = 16777216 cluster_use_proxy = off cluster_dp_labels = NONE -cluster_rpc = off +cluster_rpc = on +cluster_incremental_sync = off cluster_cjson = off lmdb_environment_path = dbless.lmdb diff --git a/spec/01-unit/01-db/04-dao_spec.lua b/spec/01-unit/01-db/04-dao_spec.lua index a7417805fd80..517db2964569 100644 --- a/spec/01-unit/01-db/04-dao_spec.lua +++ b/spec/01-unit/01-db/04-dao_spec.lua @@ -659,7 +659,7 @@ describe("DAO", function() dao:delete({ id = 1 }) dao:delete({ id = 1 }) - assert.spy(post_hook).was_called(1) + assert.spy(post_hook).was_called(2) end) end) diff --git a/spec/01-unit/01-db/10-declarative_spec.lua b/spec/01-unit/01-db/10-declarative_spec.lua index 59020becffe0..be683a2df37b 100644 --- a/spec/01-unit/01-db/10-declarative_spec.lua +++ b/spec/01-unit/01-db/10-declarative_spec.lua @@ -48,16 +48,19 @@ keyauth_credentials: describe("unique_field_key()", function() local unique_field_key = declarative.unique_field_key + local sha256_hex = require("kong.tools.sha256").sha256_hex it("utilizes the schema name, workspace id, field name, and checksum of the field value", function() local key = unique_field_key("services", "123", 
"fieldname", "test", false) assert.is_string(key) - assert.equals("services|123|fieldname:test", key) + assert.equals("services|123|fieldname|" .. sha256_hex("test"), key) end) - it("omits the workspace id when 'unique_across_ws' is 'true'", function() + -- since incremental sync the param `unique_across_ws` is useless + -- this test case is just for compatibility + it("does not omits the workspace id when 'unique_across_ws' is 'true'", function() local key = unique_field_key("services", "123", "fieldname", "test", true) - assert.equals("services||fieldname:test", key) + assert.equals("services|123|fieldname|" .. sha256_hex("test"), key) end) end) diff --git a/spec/01-unit/01-db/11-declarative_lmdb_spec.lua b/spec/01-unit/01-db/11-declarative_lmdb_spec.lua index 42756078b8ac..6fbe9181c967 100644 --- a/spec/01-unit/01-db/11-declarative_lmdb_spec.lua +++ b/spec/01-unit/01-db/11-declarative_lmdb_spec.lua @@ -201,11 +201,13 @@ describe("#off preserve nulls", function() assert(declarative.load_into_cache(entities, meta, current_hash)) local id, item = next(entities.basicauth_credentials) + + -- format changed after incremental sync local cache_key = concat({ - "basicauth_credentials:", - id, - ":::::", - item.ws_id + "basicauth_credentials|", + item.ws_id, + "|*|", + id }) local lmdb = require "resty.lmdb" @@ -222,17 +224,23 @@ describe("#off preserve nulls", function() for _, plugin in pairs(entities.plugins) do if plugin.name == PLUGIN_NAME then + + -- format changed after incremental sync cache_key = concat({ - "plugins:" .. PLUGIN_NAME .. ":", + "plugins|", + plugin.ws_id, + "|route|", plugin.route.id, - "::::", - plugin.ws_id + "|", + plugin.id }) value, err, hit_lvl = lmdb.get(cache_key) assert.is_nil(err) assert.are_equal(hit_lvl, 1) - cached_item = buffer.decode(value) + -- get value by the index key + cached_item = buffer.decode(lmdb.get(value)) + assert.are_same(cached_item, plugin) assert.are_equal(cached_item.config.large, null) assert.are_equal(cached_item.config.ttl, null) diff --git a/spec/01-unit/04-prefix_handler_spec.lua b/spec/01-unit/04-prefix_handler_spec.lua index c1e36f8060f0..226949ff8b96 100644 --- a/spec/01-unit/04-prefix_handler_spec.lua +++ b/spec/01-unit/04-prefix_handler_spec.lua @@ -309,8 +309,7 @@ describe("NGINX conf compiler", function() assert.not_matches("ssl_dhparam", kong_nginx_conf) end) - -- TODO: enable when cluster RPC is GA - pending("renders RPC server", function() + it("renders RPC server", function() local conf = assert(conf_loader(helpers.test_conf_path, { role = "control_plane", cluster_cert = "spec/fixtures/kong_clustering.crt", @@ -322,20 +321,6 @@ describe("NGINX conf compiler", function() assert.matches("location = /v2/outlet {", kong_nginx_conf) end) - -- TODO: remove when cluster RPC is GA - it("does not render RPC server, even when cluster_rpc enabled", function() - local conf = assert(conf_loader(helpers.test_conf_path, { - role = "control_plane", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - cluster_rpc = "on", - cluster_listen = "127.0.0.1:9005", - nginx_conf = "spec/fixtures/custom_nginx.template", - })) - local kong_nginx_conf = prefix_handler.compile_kong_conf(conf) - assert.matches("location = /v2/outlet {", kong_nginx_conf) - end) - it("does not renders RPC server when inert", function() local conf = assert(conf_loader(helpers.test_conf_path, { role = "control_plane", diff --git a/spec/02-integration/04-admin_api/15-off_spec.lua 
b/spec/02-integration/04-admin_api/15-off_spec.lua index cfc6102ed516..554b445fcedf 100644 --- a/spec/02-integration/04-admin_api/15-off_spec.lua +++ b/spec/02-integration/04-admin_api/15-off_spec.lua @@ -3100,15 +3100,37 @@ describe("Admin API #off with Unique Foreign #unique", function() assert.equal(references.data[1].note, "note") assert.equal(references.data[1].unique_foreign.id, foreigns.data[1].id) + -- get default workspace id in lmdb + local cmd = string.format( + [[resty --main-conf "lmdb_environment_path %s/%s;" spec/fixtures/dump_lmdb_key.lua %q]], + TEST_CONF.prefix, TEST_CONF.lmdb_environment_path, + require("kong.constants").DECLARATIVE_DEFAULT_WORKSPACE_KEY) + + local handle = io.popen(cmd) + local ws_id = handle:read("*a") + handle:close() + + -- get unique_field_key local declarative = require "kong.db.declarative" - local key = declarative.unique_field_key("unique_references", "", "unique_foreign", + local key = declarative.unique_field_key("unique_references", ws_id, "unique_foreign", foreigns.data[1].id, true) - local cmd = string.format( [[resty --main-conf "lmdb_environment_path %s/%s;" spec/fixtures/dump_lmdb_key.lua %q]], TEST_CONF.prefix, TEST_CONF.lmdb_environment_path, key) + local handle = io.popen(cmd) + local unique_field_key = handle:read("*a") + handle:close() + + assert.is_string(unique_field_key, "non-string result from unique lookup") + assert.not_equals("", unique_field_key, "empty result from unique lookup") + + -- get the entity value + local cmd = string.format( + [[resty --main-conf "lmdb_environment_path %s/%s;" spec/fixtures/dump_lmdb_key.lua %q]], + TEST_CONF.prefix, TEST_CONF.lmdb_environment_path, unique_field_key) + local handle = io.popen(cmd) local result = handle:read("*a") handle:close() @@ -3116,11 +3138,15 @@ describe("Admin API #off with Unique Foreign #unique", function() assert.not_equals("", result, "empty result from unique lookup") local cached_reference = assert(require("kong.db.declarative.marshaller").unmarshall(result)) + + -- NOTE: we have changed internl LDMB storage format, and dao does not has this field(ws_id) + cached_reference.ws_id = nil + assert.same(cached_reference, references.data[1]) local cache = { get = function(_, k) - if k ~= "unique_references||unique_foreign:" .. foreigns.data[1].id then + if k ~= "unique_references|" ..ws_id .. "|unique_foreign:" .. foreigns.data[1].id then return nil end diff --git a/spec/02-integration/07-sdk/03-cluster_spec.lua b/spec/02-integration/07-sdk/03-cluster_spec.lua index b7af4481cf53..55d27beb732b 100644 --- a/spec/02-integration/07-sdk/03-cluster_spec.lua +++ b/spec/02-integration/07-sdk/03-cluster_spec.lua @@ -40,8 +40,9 @@ fixtures_cp.http_mock.my_server_block = [[ } ]] +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("PDK: kong.cluster for #" .. strategy, function() + describe("PDK: kong.cluster for #" .. strategy .. " inc_sync=" .. 
inc_sync, function() local proxy_client lazy_setup(function() @@ -61,6 +62,7 @@ for _, strategy in helpers.each_strategy() do db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, }, nil, nil, fixtures_cp)) assert(helpers.start_kong({ @@ -72,6 +74,7 @@ for _, strategy in helpers.each_strategy() do cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, }, nil, nil, fixtures_dp)) end) @@ -108,4 +111,5 @@ for _, strategy in helpers.each_strategy() do end, 10) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index 71bb46ead473..a1d2f9b37641 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -12,9 +12,11 @@ local uuid = require("kong.tools.uuid").uuid local KEY_AUTH_PLUGIN +--- XXX FIXME: enable inc_sync = on +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do -describe("CP/DP communication #" .. strategy, function() +describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy) -- runs migrations @@ -27,6 +29,7 @@ describe("CP/DP communication #" .. strategy, function() db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -38,6 +41,7 @@ describe("CP/DP communication #" .. strategy, function() cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) for _, plugin in ipairs(helpers.get_plugins_list()) do @@ -263,7 +267,13 @@ describe("CP/DP communication #" .. strategy, function() method = "GET", path = "/soon-to-be-disabled", })) - assert.res_status(404, res) + + if inc_sync == "on" then + -- XXX incremental sync does not skip_disabled_services by default + assert.res_status(200, res) + else + assert.res_status(404, res) + end proxy_client:close() end) @@ -357,6 +367,7 @@ describe("CP/DP #version check #" .. strategy, function() cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", cluster_version_check = "major_minor", + cluster_incremental_sync = inc_sync, })) for _, plugin in ipairs(helpers.get_plugins_list()) do @@ -624,6 +635,7 @@ describe("CP/DP config sync #" .. strategy, function() database = strategy, db_update_frequency = 3, cluster_listen = "127.0.0.1:9005", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -634,6 +646,7 @@ describe("CP/DP config sync #" .. strategy, function() cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", + cluster_incremental_sync = inc_sync, })) end) @@ -736,6 +749,7 @@ describe("CP/DP labels #" .. strategy, function() db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -748,6 +762,7 @@ describe("CP/DP labels #" .. 
strategy, function() proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", cluster_dp_labels="deployment:mycloud,region:us-east-1", + cluster_incremental_sync = inc_sync, })) end) @@ -796,6 +811,7 @@ describe("CP/DP cert details(cluster_mtls = shared) #" .. strategy, function() db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -808,6 +824,7 @@ describe("CP/DP cert details(cluster_mtls = shared) #" .. strategy, function() proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", cluster_dp_labels="deployment:mycloud,region:us-east-1", + cluster_incremental_sync = inc_sync, })) end) @@ -854,6 +871,7 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/kong_clustering_ca.crt", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -869,6 +887,7 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/kong_clustering.crt", + cluster_incremental_sync = inc_sync, })) end) @@ -900,4 +919,5 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function() end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/03-pki_spec.lua b/spec/02-integration/09-hybrid_mode/03-pki_spec.lua index 00d8b483cdc1..ec6912d07603 100644 --- a/spec/02-integration/09-hybrid_mode/03-pki_spec.lua +++ b/spec/02-integration/09-hybrid_mode/03-pki_spec.lua @@ -2,9 +2,10 @@ local helpers = require "spec.helpers" local cjson = require "cjson.safe" +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do -describe("CP/DP PKI sync #" .. strategy, function() +describe("CP/DP PKI sync #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -25,6 +26,7 @@ describe("CP/DP PKI sync #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/kong_clustering_ca.crt", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -40,6 +42,7 @@ describe("CP/DP PKI sync #" .. strategy, function() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/kong_clustering.crt", + cluster_incremental_sync = inc_sync, })) end) @@ -88,9 +91,12 @@ describe("CP/DP PKI sync #" .. strategy, function() end) describe("sync works", function() + -- XXX FIXME + local skip_inc_sync = inc_sync == "on" and pending or it + local route_id - it("proxy on DP follows CP config", function() + skip_inc_sync("proxy on DP follows CP config", function() local admin_client = helpers.admin_client(10000) finally(function() admin_client:close() @@ -127,7 +133,7 @@ describe("CP/DP PKI sync #" .. strategy, function() end, 10) end) - it("cache invalidation works on config change", function() + skip_inc_sync("cache invalidation works on config change", function() local admin_client = helpers.admin_client() finally(function() admin_client:close() @@ -158,4 +164,5 @@ describe("CP/DP PKI sync #" .. 
strategy, function() end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua b/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua index 2e593dd885fd..da00efcf6d80 100644 --- a/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua @@ -19,8 +19,10 @@ local function find_in_file(filepath, pat) return found end + +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("CP/CP sync works with #" .. strategy .. " backend", function() + describe("CP/CP sync works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() lazy_setup(function() helpers.get_db_utils(strategy, { "routes", "services" }) @@ -34,6 +36,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -46,6 +49,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, + cluster_incremental_sync = inc_sync, })) end) @@ -77,4 +81,5 @@ for _, strategy in helpers.each_strategy() do end, 10) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua index d297a6ab6b81..254b09555f84 100644 --- a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua +++ b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua @@ -14,9 +14,10 @@ local function set_ocsp_status(status) end +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do -describe("cluster_ocsp = on works #" .. strategy, function() +describe("cluster_ocsp = on works #" .. strategy .. " inc_sync=" .. inc_sync, function() describe("DP certificate good", function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -40,6 +41,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("good") @@ -57,6 +59,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) end) @@ -110,6 +113,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("revoked") @@ -127,6 +131,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) end) @@ -178,6 +183,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("error") @@ -195,6 +201,7 @@ describe("cluster_ocsp = on works #" .. 
strategy, function() cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) end) @@ -225,7 +232,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() end) end) -describe("cluster_ocsp = off works with #" .. strategy .. " backend", function() +describe("cluster_ocsp = off works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() describe("DP certificate revoked, not checking for OCSP", function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -249,6 +256,7 @@ describe("cluster_ocsp = off works with #" .. strategy .. " backend", function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("revoked") @@ -266,6 +274,7 @@ describe("cluster_ocsp = off works with #" .. strategy .. " backend", function() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) end) @@ -297,7 +306,7 @@ describe("cluster_ocsp = off works with #" .. strategy .. " backend", function() end) end) -describe("cluster_ocsp = optional works with #" .. strategy .. " backend", function() +describe("cluster_ocsp = optional works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() describe("DP certificate revoked", function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -321,6 +330,7 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("revoked") @@ -338,6 +348,7 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) end) @@ -389,6 +400,7 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("error") @@ -406,6 +418,7 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) end) @@ -440,4 +453,5 @@ describe("cluster_ocsp = optional works with #" .. strategy .. 
" backend", funct end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua b/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua index 35a25a5b3ad0..b4fedacb0849 100644 --- a/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua +++ b/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua @@ -2,7 +2,7 @@ local helpers = require "spec.helpers" local admin_client -local function cp(strategy) +local function cp(strategy, inc_sync) helpers.get_db_utils(strategy) -- make sure the DB is fresh n' clean assert(helpers.start_kong({ role = "control_plane", @@ -14,6 +14,7 @@ local function cp(strategy) -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) admin_client = assert(helpers.admin_client()) end @@ -34,7 +35,7 @@ local function touch_config() })) end -local function json_dp() +local function json_dp(inc_sync) assert(helpers.start_kong({ role = "data_plane", database = "off", @@ -47,30 +48,37 @@ local function json_dp() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) end +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do -describe("lazy_export with #".. strategy, function() +describe("lazy_export with #".. strategy .. " inc_sync=" .. inc_sync, function() describe("no DP", function () setup(function() - cp(strategy) + cp(strategy, inc_sync) end) teardown(function () helpers.stop_kong() end) it("test", function () touch_config() - assert.logfile().has.line("[clustering] skipping config push (no connected clients)", true) + if inc_sync == "on" then + assert.logfile().has.no.line("[kong.sync.v2] config push (connected client)", true) + + else + assert.logfile().has.line("[clustering] skipping config push (no connected clients)", true) + end end) end) describe("only json DP", function() setup(function() - cp(strategy) - json_dp() + cp(strategy, inc_sync) + json_dp(inc_sync) end) teardown(function () helpers.stop_kong("dp1") @@ -79,11 +87,18 @@ describe("lazy_export with #".. strategy, function() it("test", function () touch_config() - assert.logfile().has.line("[clustering] exporting config", true) - assert.logfile().has.line("[clustering] config pushed to 1 data-plane nodes", true) + if inc_sync == "on" then + assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) + assert.logfile().has.line("[kong.sync.v2] database is empty or too far behind for node_id", true) + + else + assert.logfile().has.line("[clustering] exporting config", true) + assert.logfile().has.line("[clustering] config pushed to 1 data-plane nodes", true) + end end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua index 6d9f2842c6fe..8c2b26fba41a 100644 --- a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua @@ -76,6 +76,8 @@ local function get_sync_status(id) end +-- XXX TODO: helpers.clustering_client supports incremental sync +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do describe("CP/DP config compat transformations #" .. 
strategy, function() @@ -101,6 +103,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() cluster_listen = CP_HOST .. ":" .. CP_PORT, nginx_conf = "spec/fixtures/custom_nginx.template", plugins = "bundled", + cluster_incremental_sync = inc_sync, })) end) @@ -1198,3 +1201,4 @@ describe("CP/DP config compat transformations #" .. strategy, function() end) end -- each strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua b/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua index cd67cd502c27..02b67914c0f8 100644 --- a/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua @@ -83,8 +83,10 @@ local function start_kong_debug(env) end +--- XXX FIXME: enable inc_sync = on +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do - describe("node id persistence", function() + describe("node id persistence " .. " inc_sync=" .. inc_sync, function() local control_plane_config = { role = "control_plane", @@ -93,6 +95,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, } local data_plane_config = { @@ -107,6 +110,7 @@ for _, strategy in helpers.each_strategy() do database = "off", untrusted_lua = "on", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, } local admin_client @@ -322,4 +326,5 @@ for _, strategy in helpers.each_strategy() do end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua b/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua index f4f175550bb8..a7f11e41059e 100644 --- a/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua +++ b/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua @@ -71,9 +71,11 @@ local proxy_configs = { -- if existing lmdb data is set, the service/route exists and -- test run too fast before the proxy connection is established +-- XXX FIXME: enable inc_sync = on +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do for proxy_desc, proxy_opts in pairs(proxy_configs) do - describe("CP/DP sync through proxy (" .. proxy_desc .. ") works with #" .. strategy .. " backend", function() + describe("CP/DP sync through proxy (" .. proxy_desc .. ") works with #" .. strategy .. " inc_sync=" .. inc_sync .. 
" backend", function() lazy_setup(function() helpers.get_db_utils(strategy) -- runs migrations @@ -85,6 +87,7 @@ for _, strategy in helpers.each_strategy() do db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -105,6 +108,8 @@ for _, strategy in helpers.each_strategy() do proxy_server_ssl_verify = proxy_opts.proxy_server_ssl_verify, lua_ssl_trusted_certificate = proxy_opts.lua_ssl_trusted_certificate, + cluster_incremental_sync = inc_sync, + -- this is unused, but required for the template to include a stream {} block stream_listen = "0.0.0.0:5555", }, nil, nil, fixtures)) @@ -166,4 +171,5 @@ for _, strategy in helpers.each_strategy() do end) end -- proxy configs -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/11-status_spec.lua b/spec/02-integration/09-hybrid_mode/11-status_spec.lua index 02b2abff9c59..5c4386847705 100644 --- a/spec/02-integration/09-hybrid_mode/11-status_spec.lua +++ b/spec/02-integration/09-hybrid_mode/11-status_spec.lua @@ -4,9 +4,10 @@ local helpers = require "spec.helpers" local cp_status_port = helpers.get_available_port() local dp_status_port = 8100 +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("Hybrid Mode - status ready #" .. strategy, function() + describe("Hybrid Mode - status ready #" .. strategy .. " inc_sync=" .. inc_sync, function() helpers.get_db_utils(strategy, {}) @@ -21,6 +22,7 @@ for _, strategy in helpers.each_strategy() do proxy_listen = "127.0.0.1:9002", nginx_main_worker_processes = 8, status_listen = "127.0.0.1:" .. dp_status_port, + cluster_incremental_sync = inc_sync, }) end @@ -34,6 +36,7 @@ for _, strategy in helpers.each_strategy() do cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", status_listen = "127.0.0.1:" .. cp_status_port, + cluster_incremental_sync = inc_sync, }) end @@ -67,6 +70,8 @@ for _, strategy in helpers.each_strategy() do end) describe("dp status ready endpoint for no config", function() + -- XXX FIXME + local skip_inc_sync = inc_sync == "on" and pending or it lazy_setup(function() assert(start_kong_cp()) @@ -99,7 +104,7 @@ for _, strategy in helpers.each_strategy() do -- now dp receive config from cp, so dp should be ready - it("should return 200 on data plane after configuring", function() + skip_inc_sync("should return 200 on data plane after configuring", function() helpers.wait_until(function() local http_client = helpers.http_client('127.0.0.1', dp_status_port) @@ -156,4 +161,5 @@ for _, strategy in helpers.each_strategy() do end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/12-errors_spec.lua b/spec/02-integration/09-hybrid_mode/12-errors_spec.lua index 98755b6a9e14..fbbc3049cd55 100644 --- a/spec/02-integration/09-hybrid_mode/12-errors_spec.lua +++ b/spec/02-integration/09-hybrid_mode/12-errors_spec.lua @@ -69,8 +69,10 @@ local function get_error_report(client, msg) end +-- XXX TODO: mock_cp does not support incremental sync rpc +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do - describe("CP/DP sync error-reporting with #" .. strategy .. " backend", function() + describe("CP/DP sync error-reporting with #" .. strategy .. " inc_sync=" .. inc_sync .. 
" backend", function() local client local cluster_port local cluster_ssl_port @@ -100,6 +102,7 @@ for _, strategy in helpers.each_strategy() do -- use a small map size so that it's easy for us to max it out lmdb_map_size = "1m", plugins = "bundled,cluster-error-reporting", + cluster_incremental_sync = inc_sync, }, nil, nil, fixtures)) end) @@ -256,4 +259,5 @@ for _, strategy in helpers.each_strategy() do assert.equals("map full", e.error.message) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/13-deprecations_spec.lua b/spec/02-integration/09-hybrid_mode/13-deprecations_spec.lua index c19792ead117..8ddf85e80b36 100644 --- a/spec/02-integration/09-hybrid_mode/13-deprecations_spec.lua +++ b/spec/02-integration/09-hybrid_mode/13-deprecations_spec.lua @@ -3,8 +3,9 @@ local join = require("pl.stringx").join local ENABLED_PLUGINS = { "dummy" , "reconfiguration-completion"} +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy({"postgres"}) do - describe("deprecations are not reported on DP but on CP", function() + describe("deprecations are not reported on DP but on CP " .. " inc_sync=" .. inc_sync, function() local cp_prefix = "servroot1" local dp_prefix = "servroot2" local cp_logfile, dp_logfile, route @@ -41,6 +42,7 @@ for _, strategy in helpers.each_strategy({"postgres"}) do nginx_conf = "spec/fixtures/custom_nginx.template", admin_listen = "0.0.0.0:9001", proxy_listen = "off", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -55,6 +57,7 @@ for _, strategy in helpers.each_strategy({"postgres"}) do plugins = "bundled," .. join(",", ENABLED_PLUGINS), admin_listen = "off", proxy_listen = "0.0.0.0:9002", + cluster_incremental_sync = inc_sync, })) dp_logfile = helpers.get_running_conf(dp_prefix).nginx_err_logs cp_logfile = helpers.get_running_conf(cp_prefix).nginx_err_logs @@ -111,4 +114,5 @@ for _, strategy in helpers.each_strategy({"postgres"}) do end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua index c706b0824bca..21eda945c6d7 100644 --- a/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua @@ -1,8 +1,9 @@ local helpers = require "spec.helpers" local cjson = require("cjson.safe") +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("Hybrid Mode RPC #" .. strategy, function() + describe("Hybrid Mode RPC #" .. strategy .. " inc_sync=" .. 
inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -15,8 +16,8 @@ for _, strategy in helpers.each_strategy() do cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, cluster_listen = "127.0.0.1:9005", - cluster_rpc = "on", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) assert(helpers.start_kong({ @@ -26,9 +27,9 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_control_plane = "127.0.0.1:9005", - cluster_rpc = "on", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) end) @@ -38,32 +39,7 @@ for _, strategy in helpers.each_strategy() do end) describe("status API", function() - -- TODO: remove this test once cluster RPC is GA - it("no DR RPC capabilities exist", function() - -- This should time out, we expect no RPC capabilities - local status = pcall(helpers.wait_until, function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:get("/clustering/data-planes")) - local body = assert.res_status(200, res) - local json = cjson.decode(body) - - for _, v in pairs(json.data) do - if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then - table.sort(v.rpc_capabilities) - assert.near(14 * 86400, v.ttl, 3) - assert.same({ "kong.debug.log_level.v1", }, v.rpc_capabilities) - return true - end - end - end, 10) - assert.is_false(status) - end) - - pending("shows DP RPC capability status", function() + it("shows DP RPC capability status", function() helpers.wait_until(function() local admin_client = helpers.admin_client() finally(function() @@ -78,7 +54,8 @@ for _, strategy in helpers.each_strategy() do if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then table.sort(v.rpc_capabilities) assert.near(14 * 86400, v.ttl, 3) - assert.same({ "kong.debug.log_level.v1", }, v.rpc_capabilities) + -- kong.debug.log_level.v1 should be the first rpc service + assert.same("kong.debug.log_level.v1", v.rpc_capabilities[1]) return true end end @@ -86,4 +63,5 @@ for _, strategy in helpers.each_strategy() do end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua b/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua index 10ed37f52729..d53fc541dec0 100644 --- a/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua @@ -27,8 +27,9 @@ local function obtain_dp_node_id() end +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("Hybrid Mode RPC #" .. strategy, function() + describe("Hybrid Mode RPC #" .. strategy .. " inc_sync=" .. 
inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -41,8 +42,8 @@ for _, strategy in helpers.each_strategy() do cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, cluster_listen = "127.0.0.1:9005", - cluster_rpc = "on", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) assert(helpers.start_kong({ @@ -52,9 +53,9 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_control_plane = "127.0.0.1:9005", - cluster_rpc = "on", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) end) @@ -64,22 +65,7 @@ for _, strategy in helpers.each_strategy() do end) describe("Dynamic log level over RPC", function() - - -- TODO: remove when cluster RPC is GA - it("log level API is unavailable", function() - local dp_node_id = obtain_dp_node_id() - - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level")) - assert.res_status(404, res) - end) - - -- TODO: enable when cluster RPC is GA - pending("can get the current log level", function() + it("can get the current log level", function() local dp_node_id = obtain_dp_node_id() local admin_client = helpers.admin_client() @@ -95,8 +81,7 @@ for _, strategy in helpers.each_strategy() do assert.equal("debug", json.original_level) end) - -- TODO: enable when cluster RPC is GA - pending("can set the current log level", function() + it("can set the current log level", function() local dp_node_id = obtain_dp_node_id() local admin_client = helpers.admin_client() @@ -124,8 +109,7 @@ for _, strategy in helpers.each_strategy() do assert.equal("debug", json.original_level) end) - -- TODO: enable when cluster RPC is GA - pending("set current log level to original_level turns off feature", function() + it("set current log level to original_level turns off feature", function() local dp_node_id = obtain_dp_node_id() local admin_client = helpers.admin_client() @@ -165,8 +149,7 @@ for _, strategy in helpers.each_strategy() do assert.equal("debug", json.original_level) end) - -- TODO: enable when cluster RPC is GA - pending("DELETE turns off feature", function() + it("DELETE turns off feature", function() local dp_node_id = obtain_dp_node_id() local admin_client = helpers.admin_client() @@ -198,4 +181,5 @@ for _, strategy in helpers.each_strategy() do end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua b/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua index 4a6d73cf659c..d4ddac3a533f 100644 --- a/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua @@ -41,7 +41,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, cluster_listen = "127.0.0.1:9005", - cluster_rpc = "off", + cluster_rpc = "off", -- disable rpc nginx_conf = "spec/fixtures/custom_nginx.template", })) @@ -52,7 +52,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_control_plane = "127.0.0.1:9005", - cluster_rpc = "off", + cluster_rpc = "off", -- 
disable rpc proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", })) @@ -96,6 +96,21 @@ for _, strategy in helpers.each_strategy() do local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level")) assert.res_status(404, res) end) + + it("can not get DP RPC capability status", function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:get("/clustering/data-planes")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) + + for _, v in pairs(json.data) do + assert.equal(#v.rpc_capabilities, 0) + end + end) end) end) end diff --git a/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua b/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua index db7edcc5edb2..51c7a3b8a1a4 100644 --- a/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua @@ -27,8 +27,9 @@ local function obtain_dp_node_id() end +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("Hybrid Mode RPC over DB concentrator #" .. strategy, function() + describe("Hybrid Mode RPC over DB concentrator #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -43,6 +44,7 @@ for _, strategy in helpers.each_strategy() do cluster_listen = "127.0.0.1:9005", admin_listen = "127.0.0.1:" .. helpers.get_available_port(), nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) assert(helpers.start_kong({ @@ -53,6 +55,7 @@ for _, strategy in helpers.each_strategy() do database = strategy, cluster_listen = "127.0.0.1:" .. helpers.get_available_port(), nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) assert(helpers.start_kong({ @@ -64,6 +67,7 @@ for _, strategy in helpers.each_strategy() do cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) end) @@ -74,7 +78,7 @@ for _, strategy in helpers.each_strategy() do end) describe("Dynamic log level over RPC", function() - pending("can get the current log level", function() + it("can get the current log level", function() local dp_node_id = obtain_dp_node_id() -- this sleep is *not* needed for the below wait_until to succeed, @@ -103,4 +107,5 @@ for _, strategy in helpers.each_strategy() do end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/20-wasm/06-clustering_spec.lua b/spec/02-integration/20-wasm/06-clustering_spec.lua index 31e76bb3f1d7..9e36d4dce667 100644 --- a/spec/02-integration/20-wasm/06-clustering_spec.lua +++ b/spec/02-integration/20-wasm/06-clustering_spec.lua @@ -72,7 +72,9 @@ local function new_wasm_filter_directory() end -describe("#wasm - hybrid mode #postgres", function() +-- XXX TODO: enable inc_sync = "on" +for _, inc_sync in ipairs { "off" } do +describe("#wasm - hybrid mode #postgres" .. " inc_sync=" .. inc_sync, function() local cp_prefix = "cp" local cp_errlog = cp_prefix .. 
"/logs/error.log" local cp_filter_path @@ -113,6 +115,7 @@ describe("#wasm - hybrid mode #postgres", function() wasm_filters = "user", -- don't enable bundled filters for this test wasm_filters_path = cp_filter_path, nginx_main_worker_processes = 2, + cluster_incremental_sync = inc_sync, })) assert.logfile(cp_errlog).has.line([[successfully loaded "response_transformer" module]], true, 10) @@ -152,6 +155,7 @@ describe("#wasm - hybrid mode #postgres", function() wasm_filters_path = dp_filter_path, node_id = node_id, nginx_main_worker_processes = 2, + cluster_incremental_sync = inc_sync, })) assert.logfile(dp_errlog).has.line([[successfully loaded "response_transformer" module]], true, 10) @@ -307,6 +311,7 @@ describe("#wasm - hybrid mode #postgres", function() nginx_conf = "spec/fixtures/custom_nginx.template", wasm = "off", node_id = node_id, + cluster_incremental_sync = inc_sync, })) end) @@ -346,6 +351,7 @@ describe("#wasm - hybrid mode #postgres", function() wasm_filters = "user", -- don't enable bundled filters for this test wasm_filters_path = tmp_dir, node_id = node_id, + cluster_incremental_sync = inc_sync, })) end) @@ -364,3 +370,4 @@ describe("#wasm - hybrid mode #postgres", function() end) end) end) +end -- for inc_sync diff --git a/spec/02-integration/20-wasm/10-wasmtime_spec.lua b/spec/02-integration/20-wasm/10-wasmtime_spec.lua index 7a1ba07c185c..05a3f91d6a59 100644 --- a/spec/02-integration/20-wasm/10-wasmtime_spec.lua +++ b/spec/02-integration/20-wasm/10-wasmtime_spec.lua @@ -1,6 +1,7 @@ local helpers = require "spec.helpers" local fmt = string.format +for _, inc_sync in ipairs { "off", "on" } do for _, role in ipairs({"traditional", "control_plane", "data_plane"}) do describe("#wasm wasmtime (role: " .. role .. ")", function() @@ -18,9 +19,11 @@ describe("#wasm wasmtime (role: " .. role .. ")", function() role = role, cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_incremental_sync = inc_sync, })) conf = assert(helpers.get_running_conf(prefix)) + conf.cluster_incremental_sync = inc_sync == "on" end) lazy_teardown(function() @@ -90,9 +93,12 @@ describe("#wasm wasmtime (role: " .. role .. ")", function() status_listen = "127.0.0.1:" .. status_port, nginx_main_worker_processes = 2, + + cluster_incremental_sync = inc_sync, })) conf = assert(helpers.get_running_conf(prefix)) + conf.cluster_incremental_sync = inc_sync == "on" -- we need to briefly spin up a control plane, or else we will get -- error.log entries when our data plane tries to connect @@ -110,6 +116,7 @@ describe("#wasm wasmtime (role: " .. role .. ")", function() cluster_cert_key = "spec/fixtures/kong_clustering.key", status_listen = "off", nginx_main_worker_processes = 2, + cluster_incremental_sync = inc_sync, })) end end) @@ -167,3 +174,4 @@ describe("#wasm wasmtime (role: " .. role .. ")", function() end) -- wasmtime end -- each role +end -- for inc_sync diff --git a/spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua b/spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua index 7fb4bd9ed0b9..67cd9abc6f30 100644 --- a/spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua +++ b/spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua @@ -1,7 +1,10 @@ local helpers = require "spec.helpers" + +-- XXX FIXME: inc_sync = on flaky +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy({"postgres"}) do - describe("Plugin: key-auth (access) [#" .. strategy .. 
"] auto-expiring keys", function() + describe("Plugin: key-auth (access) [#" .. strategy .. " inc_sync=" .. inc_sync .. "] auto-expiring keys", function() -- Give a bit of time to reduce test flakyness on slow setups local ttl = 10 local inserted_at @@ -38,6 +41,7 @@ for _, strategy in helpers.each_strategy({"postgres"}) do cluster_listen = "127.0.0.1:9005", cluster_telemetry_listen = "127.0.0.1:9006", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -50,6 +54,7 @@ for _, strategy in helpers.each_strategy({"postgres"}) do cluster_control_plane = "127.0.0.1:9005", cluster_telemetry_endpoint = "127.0.0.1:9006", proxy_listen = "0.0.0.0:9002", + cluster_incremental_sync = inc_sync, })) end) @@ -120,4 +125,5 @@ for _, strategy in helpers.each_strategy({"postgres"}) do end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/03-plugins/11-correlation-id/02-schema_spec.lua b/spec/03-plugins/11-correlation-id/02-schema_spec.lua index 68a03b73a329..b02cc906505f 100644 --- a/spec/03-plugins/11-correlation-id/02-schema_spec.lua +++ b/spec/03-plugins/11-correlation-id/02-schema_spec.lua @@ -86,7 +86,9 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() end) end) - describe("in hybrid mode", function() + --- XXX FIXME: enable inc_sync = on + for _, inc_sync in ipairs { "off" } do + describe("in hybrid mode" .. " inc_sync=" .. inc_sync, function() local route lazy_setup(function() route = bp.routes:insert({ @@ -107,7 +109,8 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() COMMIT; ]], { ROUTE_ID = route.id, - CACHE_KEY = "plugins:correlation-id:"..route.id.."::::"..ws.id, + --CACHE_KEY = "plugins:correlation-id:"..route.id.."::::"..ws.id, + CACHE_KEY = "plugins|"..ws.id.."|route|"..route.id.."|"..plugin_id, ID = plugin_id, }) local _, err = db.connector:query(sql) @@ -121,6 +124,7 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() prefix = "servroot", cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -132,6 +136,7 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", status_listen = "127.0.0.1:9100", + cluster_incremental_sync = inc_sync, })) end) @@ -181,4 +186,5 @@ describe("Plugin: correlation-id (schema) #a [#" .. 
strategy .."]", function() proxy_client:close() end) end) + end -- for inc_sync end) diff --git a/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua b/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua new file mode 100644 index 000000000000..0563c4c83f80 --- /dev/null +++ b/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua @@ -0,0 +1,17 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function() + uh.old_after_up("has created the \"clustering_sync_version\" table", function() + assert.database_has_relation("clustering_sync_version") + assert.table_has_column("clustering_sync_version", "version", "integer") + end) + + uh.old_after_up("has created the \"clustering_sync_delta\" table", function() + assert.database_has_relation("clustering_sync_delta") + assert.table_has_column("clustering_sync_delta", "version", "integer") + assert.table_has_column("clustering_sync_delta", "type", "text") + assert.table_has_column("clustering_sync_delta", "id", "uuid") + assert.table_has_column("clustering_sync_delta", "ws_id", "uuid") + assert.table_has_column("clustering_sync_delta", "row", "json") + end) +end) diff --git a/spec/internal/db.lua b/spec/internal/db.lua index 9895181fdeb8..8fe43da18d71 100644 --- a/spec/internal/db.lua +++ b/spec/internal/db.lua @@ -277,6 +277,8 @@ end -- } local function get_db_utils(strategy, tables, plugins, vaults, skip_migrations) strategy = strategy or conf.database + conf.database = strategy -- overwrite kong.configuration.database + if tables ~= nil and type(tables) ~= "table" then error("arg #2 must be a list of tables to truncate", 2) end @@ -314,12 +316,6 @@ local function get_db_utils(strategy, tables, plugins, vaults, skip_migrations) bootstrap_database(db) end - do - local database = conf.database - conf.database = strategy - conf.database = database - end - db:truncate("plugins") assert(db.plugins:load_plugin_schemas(conf.loaded_plugins)) assert(db.vaults:load_vault_schemas(conf.loaded_vaults))