diff --git a/changelog/unreleased/kong/cp-dp-rpc.yml b/changelog/unreleased/kong/cp-dp-rpc.yml new file mode 100644 index 00000000000..cb8efa9d1bc --- /dev/null +++ b/changelog/unreleased/kong/cp-dp-rpc.yml @@ -0,0 +1,3 @@ +message: "Added a remote procedure call (RPC) framework for Hybrid mode deployments." +type: feature +scope: Clustering diff --git a/changelog/unreleased/kong/dynamic-log-level-rpc.yml b/changelog/unreleased/kong/dynamic-log-level-rpc.yml new file mode 100644 index 00000000000..69096eb0afe --- /dev/null +++ b/changelog/unreleased/kong/dynamic-log-level-rpc.yml @@ -0,0 +1,6 @@ +message: | + Dynamic log level over Hybrid mode RPC which allows setting DP log level + to a different level for specified duration before reverting back + to the `kong.conf` configured value. +type: feature +scope: Clustering diff --git a/kong-3.9.0-0.rockspec b/kong-3.9.0-0.rockspec index c0cc4c02d11..b9c9d121764 100644 --- a/kong-3.9.0-0.rockspec +++ b/kong-3.9.0-0.rockspec @@ -88,7 +88,6 @@ build = { ["kong.clustering.compat.checkers"] = "kong/clustering/compat/checkers.lua", ["kong.clustering.config_helper"] = "kong/clustering/config_helper.lua", ["kong.clustering.tls"] = "kong/clustering/tls.lua", - ["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua", ["kong.clustering.rpc.callbacks"] = "kong/clustering/rpc/callbacks.lua", ["kong.clustering.rpc.future"] = "kong/clustering/rpc/future.lua", @@ -99,6 +98,12 @@ build = { ["kong.clustering.rpc.utils"] = "kong/clustering/rpc/utils.lua", ["kong.clustering.rpc.concentrator"] = "kong/clustering/rpc/concentrator.lua", + ["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua", + ["kong.clustering.services.sync"] = "kong/clustering/services/sync/init.lua", + ["kong.clustering.services.sync.rpc"] = "kong/clustering/services/sync/rpc.lua", + ["kong.clustering.services.sync.hooks"] = "kong/clustering/services/sync/hooks.lua", + ["kong.clustering.services.sync.strategies.postgres"] = "kong/clustering/services/sync/strategies/postgres.lua", + ["kong.cluster_events"] = "kong/cluster_events/init.lua", ["kong.cluster_events.strategies.postgres"] = "kong/cluster_events/strategies/postgres.lua", ["kong.cluster_events.strategies.off"] = "kong/cluster_events/strategies/off.lua", @@ -289,7 +294,6 @@ build = { ["kong.db.strategies.postgres.plugins"] = "kong/db/strategies/postgres/plugins.lua", ["kong.db.strategies.off"] = "kong/db/strategies/off/init.lua", ["kong.db.strategies.off.connector"] = "kong/db/strategies/off/connector.lua", - ["kong.db.strategies.off.tags"] = "kong/db/strategies/off/tags.lua", ["kong.db.migrations.state"] = "kong/db/migrations/state.lua", ["kong.db.migrations.subsystems"] = "kong/db/migrations/subsystems.lua", @@ -316,6 +320,7 @@ build = { ["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua", ["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua", ["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua", + ["kong.db.migrations.core.024_370_to_380"] = "kong/db/migrations/core/024_370_to_380.lua", ["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua", ["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua", ["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua", diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 5104fdab723..7881b1661ff 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -42,6 +42,7 @@ function _M.new(conf, node_id) -- clients[node_id]: { socket1 => true, socket2 => true, ... } clients = {}, client_capabilities = {}, + client_ips = {}, -- store DP node's ip addr node_id = node_id, conf = conf, cluster_cert = assert(clustering_tls.get_cluster_cert(conf)), @@ -75,16 +76,18 @@ end function _M:_remove_socket(socket) - local sockets = assert(self.clients[socket.node_id]) + local node_id = socket.node_id + local sockets = assert(self.clients[node_id]) assert(sockets[socket]) sockets[socket] = nil if table_isempty(sockets) then - self.clients[socket.node_id] = nil - self.client_capabilities[socket.node_id] = nil - assert(self.concentrator:_enqueue_unsubscribe(socket.node_id)) + self.clients[node_id] = nil + self.client_ips[node_id] = nil + self.client_capabilities[node_id] = nil + assert(self.concentrator:_enqueue_unsubscribe(node_id)) end end @@ -255,6 +258,9 @@ function _M:handle_websocket() local s = socket.new(self, wb, node_id) self:_add_socket(s, rpc_capabilities) + -- store DP's ip addr + self.client_ips[node_id] = ngx_var.remote_addr + s:start() local res, err = s:join() self:_remove_socket(s) @@ -362,4 +368,9 @@ function _M:get_peers() end +function _M:get_peer_ip(node_id) + return self.client_ips[node_id] +end + + return _M diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua new file mode 100644 index 00000000000..7a3a1402558 --- /dev/null +++ b/kong/clustering/services/sync/hooks.lua @@ -0,0 +1,179 @@ +local _M = {} +local _MT = { __index = _M, } + + +local hooks = require("kong.hooks") +local EMPTY = require("kong.tools.table").EMPTY + + +local ipairs = ipairs +local ngx_null = ngx.null +local ngx_log = ngx.log +local ngx_ERR = ngx.ERR +local ngx_DEBUG = ngx.DEBUG + + +local DEFAULT_PAGE_SIZE = 512 + + +function _M.new(strategy) + local self = { + strategy = strategy, + } + + return setmetatable(self, _MT) +end + + +local function get_all_nodes_with_sync_cap() + local res, err = kong.db.clustering_data_planes:page(DEFAULT_PAGE_SIZE) + if err then + return nil, "unable to query DB " .. err + end + + if not res then + return EMPTY + end + + local ret = {} + local ret_n = 0 + + for _, row in ipairs(res) do + for _, c in ipairs(row.rpc_capabilities) do + if c == "kong.sync.v2" then + ret_n = ret_n + 1 + ret[ret_n] = row.id + break + end + end + end + + return ret +end + + +function _M:notify_all_nodes() + local latest_version, err = self.strategy:get_latest_version() + if not latest_version then + ngx_log(ngx_ERR, "can not get the latest version: ", err) + return + end + + local msg = { default = { new_version = latest_version, }, } + + for _, node in ipairs(get_all_nodes_with_sync_cap()) do + local res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg) + if not res then + if not err:find("requested capability does not exist", nil, true) then + ngx_log(ngx_ERR, "unable to notify new version: ", err) + end + + else + ngx_log(ngx_DEBUG, "notified ", node, " ", latest_version) + end + end +end + + +function _M:entity_delta_writer(row, name, options, ws_id, is_delete) + local deltas = { + { + type = name, + id = row.id, + ws_id = ws_id, + row = is_delete and ngx_null or row, + }, + } + + local res, err = self.strategy:insert_delta(deltas) + if not res then + self.strategy:cancel_txn() + return nil, err + end + + res, err = self.strategy:commit_txn() + if not res then + self.strategy:cancel_txn() + return nil, err + end + + self:notify_all_nodes() + + return row -- for other hooks +end + + +-- only control plane has these delta operations +function _M:register_dao_hooks() + local function is_db_export(name) + local db_export = kong.db[name].schema.db_export + return db_export == nil or db_export == true + end + + -- common hook functions (pre/fail/post) + + local function pre_hook_func(entity, name, options) + if not is_db_export(name) then + return true + end + + return self.strategy:begin_txn() + end + + local function fail_hook_func(err, entity, name) + if not is_db_export(name) then + return + end + + local res, err = self.strategy:cancel_txn() + if not res then + ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err)) + end + end + + local function post_hook_writer_func(row, name, options, ws_id) + if not is_db_export(name) then + return row + end + + return self:entity_delta_writer(row, name, options, ws_id) + end + + local function post_hook_delete_func(row, name, options, ws_id, cascade_entries) + if not is_db_export(name) then + return row + end + + -- set lmdb value to ngx_null then return row + return self:entity_delta_writer(row, name, options, ws_id, true) + end + + local dao_hooks = { + -- dao:insert + ["dao:insert:pre"] = pre_hook_func, + ["dao:insert:fail"] = fail_hook_func, + ["dao:insert:post"] = post_hook_writer_func, + + -- dao:delete + ["dao:delete:pre"] = pre_hook_func, + ["dao:delete:fail"] = fail_hook_func, + ["dao:delete:post"] = post_hook_delete_func, + + -- dao:update + ["dao:update:pre"] = pre_hook_func, + ["dao:update:fail"] = fail_hook_func, + ["dao:update:post"] = post_hook_writer_func, + + -- dao:upsert + ["dao:upsert:pre"] = pre_hook_func, + ["dao:upsert:fail"] = fail_hook_func, + ["dao:upsert:post"] = post_hook_writer_func, + } + + for ev, func in pairs(dao_hooks) do + hooks.register_hook(ev, func) + end +end + + +return _M diff --git a/kong/clustering/services/sync/init.lua b/kong/clustering/services/sync/init.lua new file mode 100644 index 00000000000..40f1b836241 --- /dev/null +++ b/kong/clustering/services/sync/init.lua @@ -0,0 +1,63 @@ +local _M = {} +local _MT = { __index = _M, } + + +local events = require("kong.clustering.events") +local strategy = require("kong.clustering.services.sync.strategies.postgres") +local rpc = require("kong.clustering.services.sync.rpc") + + +-- TODO: what is the proper value? +local FIRST_SYNC_DELAY = 0.5 -- seconds +local EACH_SYNC_DELAY = 30 -- seconds + + +function _M.new(db, is_cp) + local strategy = strategy.new(db) + + local self = { + db = db, + strategy = strategy, + rpc = rpc.new(strategy), + is_cp = is_cp, + } + + -- only cp needs hooks + if is_cp then + self.hooks = require("kong.clustering.services.sync.hooks").new(strategy) + end + + return setmetatable(self, _MT) +end + + +function _M:init(manager) + if self.hooks then + self.hooks:register_dao_hooks() + end + self.rpc:init(manager, self.is_cp) +end + + +function _M:init_worker() + -- is CP, enable clustering broadcasts + if self.is_cp then + events.init() + + self.strategy:init_worker() + return + end + + -- is DP, sync only in worker 0 + if ngx.worker.id() ~= 0 then + return + end + + -- sync to CP ASAP + assert(self.rpc:sync_once(FIRST_SYNC_DELAY)) + + assert(self.rpc:sync_every(EACH_SYNC_DELAY)) +end + + +return _M diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua new file mode 100644 index 00000000000..3d3ec536089 --- /dev/null +++ b/kong/clustering/services/sync/rpc.lua @@ -0,0 +1,354 @@ +local _M = {} +local _MT = { __index = _M, } + + +local txn = require("resty.lmdb.transaction") +local declarative = require("kong.db.declarative") +local constants = require("kong.constants") +local concurrency = require("kong.concurrency") + + +local insert_entity_for_txn = declarative.insert_entity_for_txn +local delete_entity_for_txn = declarative.delete_entity_for_txn +local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY +local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS +local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } + + +local pairs = pairs +local ipairs = ipairs +local fmt = string.format +local ngx_null = ngx.null +local ngx_log = ngx.log +local ngx_ERR = ngx.ERR +local ngx_INFO = ngx.INFO +local ngx_DEBUG = ngx.DEBUG + + +-- number of versions behind before a full sync is forced +local FULL_SYNC_THRESHOLD = 512 + + +function _M.new(strategy) + local self = { + strategy = strategy, + } + + return setmetatable(self, _MT) +end + + +function _M:init_cp(manager) + -- CP + -- Method: kong.sync.v2.get_delta + -- Params: versions: list of current versions of the database + -- { { namespace = "default", version = 1000, }, } + local purge_delay = manager.conf.cluster_data_plane_purge_delay + + local function gen_delta_result(res, wipe) + return { default = { deltas = res, wipe = wipe, }, } + end + + manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions) + ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)") + + local rpc_peers + if kong.rpc then + rpc_peers = kong.rpc:get_peers() + end + + local default_namespace + for namespace, v in pairs(current_versions) do + if namespace == "default" then + default_namespace = v + break + end + end + + if not default_namespace then + return nil, "default namespace does not exist inside params" + end + + -- { { namespace = "default", version = 1000, }, } + local default_namespace_version = default_namespace.version + + -- XXX TODO: follow update_sync_status() in control_plane.lua + local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, { + last_seen = ngx.time(), + hostname = node_id, + ip = kong.rpc:get_peer_ip(node_id), -- try to get the correct ip + version = "3.8.0.0", -- XXX TODO: get from rpc call + sync_status = CLUSTERING_SYNC_STATUS.NORMAL, + config_hash = fmt("%032d", default_namespace_version), + rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, + }, { ttl = purge_delay }) + if not ok then + ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err) + end + + local latest_version, err = self.strategy:get_latest_version() + if not latest_version then + return nil, err + end + + -- is the node empty? If so, just do a full sync to bring it up to date faster + if default_namespace_version == 0 or + latest_version - default_namespace_version > FULL_SYNC_THRESHOLD + then + -- we need to full sync because holes are found + + ngx_log(ngx_INFO, + "[kong.sync.v2] database is empty or too far behind for node_id: ", node_id, + ", current_version: ", default_namespace_version, + ", forcing a full sync") + + + local deltas, err = declarative.export_config_sync() + if not deltas then + return nil, err + end + + -- wipe dp lmdb, full sync + return gen_delta_result(deltas, true) + end + + local res, err = self.strategy:get_delta(default_namespace_version) + if not res then + return nil, err + end + + if #res == 0 then + ngx_log(ngx_DEBUG, + "[kong.sync.v2] no delta for node_id: ", node_id, + ", current_version: ", default_namespace_version, + ", node is already up to date" ) + return gen_delta_result(res, false) + end + + -- some deltas are returned, are they contiguous? + if res[1].version == default_namespace.version + 1 then + -- doesn't wipe dp lmdb, incremental sync + return gen_delta_result(res, false) + end + + -- we need to full sync because holes are found + -- in the delta, meaning the oldest version is no longer + -- available + + ngx_log(ngx_INFO, + "[kong.sync.v2] delta for node_id no longer available: ", node_id, + ", current_version: ", default_namespace_version, + ", forcing a full sync") + + local deltas, err = declarative.export_config_sync() + if not deltas then + return nil, err + end + + -- wipe dp lmdb, full sync + return gen_delta_result(deltas, true) + end) +end + + +function _M:init_dp(manager) + -- DP + -- Method: kong.sync.v2.notify_new_version + -- Params: new_versions: list of namespaces and their new versions, like: + -- { { new_version = 1000, }, }, possible field: namespace = "default" + manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions) + -- TODO: currently only default is supported, and anything else is ignored + local default_new_version = new_versions.default + if not default_new_version then + return nil, "default namespace does not exist inside params" + end + + local version = default_new_version.new_version + if not version then + return nil, "'new_version' key does not exist" + end + + local lmdb_ver = tonumber(declarative.get_current_hash()) or 0 + if lmdb_ver < version then + return self:sync_once() + end + + return true + end) +end + + +function _M:init(manager, is_cp) + if is_cp then + self:init_cp(manager) + else + self:init_dp(manager) + end +end + + +local function do_sync() + local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", + { default = + { version = + tonumber(declarative.get_current_hash()) or 0, + }, + }) + if not ns_deltas then + ngx_log(ngx_ERR, "sync get_delta error: ", err) + return true + end + + local ns_delta + + for namespace, delta in pairs(ns_deltas) do + if namespace == "default" then + ns_delta = delta + break -- should we break here? + end + end + + if not ns_delta then + return nil, "default namespace does not exist inside params" + end + + if #ns_delta.deltas == 0 then + ngx_log(ngx_DEBUG, "no delta to sync") + return true + end + + local t = txn.begin(512) + + local wipe = ns_delta.wipe + if wipe then + t:db_drop(false) + end + + local db = kong.db + + local version = 0 + local crud_events = {} + local crud_events_n = 0 + + for _, delta in ipairs(ns_delta.deltas) do + local delta_type = delta.type + local delta_row = delta.row + local ev + + if delta_row ~= ngx_null then + -- upsert the entity + -- does the entity already exists? + local old_entity, err = db[delta_type]:select(delta_row) + if err then + return nil, err + end + + local crud_event_type = old_entity and "update" or "create" + + -- If we will wipe lmdb, we don't need to delete it from lmdb. + if old_entity and not wipe then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + if not res then + return nil, err + end + end + + local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil) + if not res then + return nil, err + end + + ev = { delta_type, crud_event_type, delta_row, old_entity, } + + else + -- delete the entity + local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key + if err then + return nil, err + end + + -- If we will wipe lmdb, we don't need to delete it from lmdb. + if old_entity and not wipe then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + if not res then + return nil, err + end + end + + ev = { delta_type, "delete", old_entity, } + end + + crud_events_n = crud_events_n + 1 + crud_events[crud_events_n] = ev + + -- XXX TODO: could delta.version be nil or ngx.null + if type(delta.version) == "number" and delta.version ~= version then + version = delta.version + end + end -- for _, delta + + t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + local ok, err = t:commit() + if not ok then + return nil, err + end + + if wipe then + kong.core_cache:purge() + kong.cache:purge() + + else + for _, event in ipairs(crud_events) do + -- delta_type, crud_event_type, delta.row, old_entity + db[event[1]]:post_crud_event(event[2], event[3], event[4]) + end + end + + return true +end + + +local function sync_handler(premature) + if premature then + return + end + + local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() + -- here must be 2 times + for _ = 1, 2 do + local ok, err = do_sync() + if not ok then + return nil, err + end + end -- for + + return true + end) + if not res and err ~= "timeout" then + ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) + end +end + + +local function start_sync_timer(timer_func, delay) + local hdl, err = timer_func(delay, sync_handler) + + if not hdl then + return nil, err + end + + return true +end + + +function _M:sync_once(delay) + return start_sync_timer(ngx.timer.at, delay or 0) +end + + +function _M:sync_every(delay) + return start_sync_timer(ngx.timer.every, delay) +end + + +return _M diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua new file mode 100644 index 00000000000..39c550b8ffb --- /dev/null +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -0,0 +1,130 @@ +local _M = {} +local _MT = { __index = _M } + + +local cjson = require("cjson.safe") +local buffer = require("string.buffer") + + +local string_format = string.format +local cjson_encode = cjson.encode +local ngx_log = ngx.log +local ngx_ERR = ngx.ERR +local ngx_DEBUG = ngx.DEBUG + + +local CLEANUP_VERSION_COUNT = 100 +local CLEANUP_TIME_DELAY = 3600 -- 1 hour + + +function _M.new(db) + local self = { + connector = db.connector, + } + + return setmetatable(self, _MT) +end + + +local PURGE_QUERY = [[ + DELETE FROM clustering_sync_version + WHERE "version" < ( + SELECT MAX("version") - %d + FROM clustering_sync_version + ); +]] + + +function _M:init_worker() + local function cleanup_handler(premature) + if premature then + ngx_log(ngx_DEBUG, "[incremental] worker exiting, killing incremental cleanup timer") + + return + end + + local res, err = self.connector:query(string_format(PURGE_QUERY, CLEANUP_VERSION_COUNT)) + if not res then + ngx_log(ngx_ERR, + "[incremental] unable to purge old data from incremental delta table, err: ", + err) + + return + end + + ngx_log(ngx_DEBUG, + "[incremental] successfully purged old data from incremental delta table") + end + + assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler)) +end + + +local NEW_VERSION_QUERY = [[ + DO $$ + DECLARE + new_version integer; + BEGIN + INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version; + INSERT INTO clustering_sync_delta (version, type, id, ws_id, row) VALUES %s; + END $$; +]] + + +-- deltas: { +-- { type = "service", "id" = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", } +-- { type = "route", "id" = "0a5bac5c-b795-4981-95d2-919ba3390b7e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", } +-- } +function _M:insert_delta(deltas) + local buf = buffer.new() + for _, d in ipairs(deltas) do + buf:putf("(new_version, %s, %s, %s, %s)", + self.connector:escape_literal(d.type), + self.connector:escape_literal(d.id), + self.connector:escape_literal(d.ws_id), + self.connector:escape_literal(cjson_encode(d.row))) + end + + local sql = string_format(NEW_VERSION_QUERY, buf:get()) + + return self.connector:query(sql) +end + + +function _M:get_latest_version() + local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version" + + local res, err = self.connector:query(sql) + if not res then + return nil, err + end + + return res[1] and res[1].max_version +end + + +function _M:get_delta(version) + local sql = "SELECT * FROM clustering_sync_delta" .. + " WHERE version > " .. self.connector:escape_literal(version) .. + " ORDER BY version ASC" + return self.connector:query(sql) +end + + +function _M:begin_txn() + return self.connector:query("BEGIN;") +end + + +function _M:commit_txn() + return self.connector:query("COMMIT;") +end + + +function _M:cancel_txn() + -- we will close the connection, not execute 'ROLLBACK' + return self.connector:close() +end + + +return _M diff --git a/kong/conf_loader/constants.lua b/kong/conf_loader/constants.lua index 21326a588e3..76cbb36394c 100644 --- a/kong/conf_loader/constants.lua +++ b/kong/conf_loader/constants.lua @@ -510,6 +510,7 @@ local CONF_PARSERS = { cluster_use_proxy = { typ = "boolean" }, cluster_dp_labels = { typ = "array" }, cluster_rpc = { typ = "boolean" }, + cluster_incremental_sync = { typ = "boolean" }, cluster_cjson = { typ = "boolean" }, kic = { typ = "boolean" }, diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 96ff04522ac..51ea979d2cc 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -953,10 +953,9 @@ local function load(path, custom_conf, opts) end end - -- TODO: remove this when cluster_rpc is ready for GA - if conf.cluster_rpc then - log.warn("Cluster RPC has been forcibly disabled") - conf.cluster_rpc = "off" + if not conf.cluster_rpc then + log.warn("Cluster incremental sync has been forcibly disabled") + conf.cluster_incremental_sync = false end -- initialize the dns client, so the globally patched tcp.connect method diff --git a/kong/constants.lua b/kong/constants.lua index 6de799f980b..7a05f24cf53 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -206,8 +206,11 @@ local constants = { PROTOCOLS = protocols, PROTOCOLS_WITH_SUBSYSTEM = protocols_with_subsystem, + DECLARATIVE_DEFAULT_WORKSPACE_ID = "0dc6f45b-8f8d-40d2-a504-473544ee190b", + DECLARATIVE_LOAD_KEY = "declarative_config:loaded", DECLARATIVE_HASH_KEY = "declarative_config:hash", + DECLARATIVE_DEFAULT_WORKSPACE_KEY = "declarative_config:default_workspace", PLUGINS_REBUILD_COUNTER_KEY = "readiness_probe_config:plugins_rebuild_counter", ROUTERS_REBUILD_COUNTER_KEY = "readiness_probe_config:routers_rebuild_counter", DECLARATIVE_EMPTY_CONFIG_HASH = string.rep("0", 32), diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index 515e6c0c719..4305be4a96f 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -1156,12 +1156,14 @@ function DAO:insert(entity, options) local row, err_t = self.strategy:insert(entity_to_insert, options) if not row then + run_hook("dao:insert:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t end local ws_id = row.ws_id row, err, err_t = self:row_to_entity(row, options) if not row then + run_hook("dao:insert:fail", err, entity, self.schema.name, options) return nil, err, err_t end @@ -1209,12 +1211,14 @@ function DAO:update(pk_or_entity, entity, options) local row, err_t = self.strategy:update(primary_key, entity_to_update, options) if not row then + run_hook("dao:update:fail", err_t, entity_to_update, self.schema.name, options) return nil, tostring(err_t), err_t end local ws_id = row.ws_id row, err, err_t = self:row_to_entity(row, options) if not row then + run_hook("dao:update:fail", err_t, entity_to_update, self.schema.name, options) return nil, err, err_t end @@ -1337,9 +1341,11 @@ function DAO:delete(pk_or_entity, options) local rows_affected rows_affected, err_t = self.strategy:delete(primary_key, options) if err_t then + run_hook("dao:delete:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t elseif not rows_affected then + run_hook("dao:delete:post", nil, self.schema.name, options, ws_id, nil) return nil end diff --git a/kong/db/dao/workspaces.lua b/kong/db/dao/workspaces.lua index f42832014ab..f9353129246 100644 --- a/kong/db/dao/workspaces.lua +++ b/kong/db/dao/workspaces.lua @@ -1,6 +1,14 @@ local Workspaces = {} +local constants = require("kong.constants") +local lmdb = require("resty.lmdb") + + +local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY +local DECLARATIVE_DEFAULT_WORKSPACE_ID = constants.DECLARATIVE_DEFAULT_WORKSPACE_ID + + function Workspaces:truncate() self.super.truncate(self) if kong.configuration.database == "off" then @@ -18,4 +26,19 @@ function Workspaces:truncate() end +function Workspaces:select_by_name(key, options) + if kong.configuration.database == "off" and key == "default" then + -- TODO: Currently, only Kong workers load the declarative config into lmdb. + -- The Kong master doesn't get the default workspace from lmdb, so we + -- return the default constant value. It would be better to have the + -- Kong master load the declarative config into lmdb in the future. + -- + -- it should be a table, not a single string + return { id = lmdb.get(DECLARATIVE_DEFAULT_WORKSPACE_KEY) or DECLARATIVE_DEFAULT_WORKSPACE_ID, } + end + + return self.super.select_by_name(self, key, options) +end + + return Workspaces diff --git a/kong/db/declarative/export.lua b/kong/db/declarative/export.lua index c3b6b8c1366..6c1c66ede7f 100644 --- a/kong/db/declarative/export.lua +++ b/kong/db/declarative/export.lua @@ -117,9 +117,20 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp return nil, err end + local sync_version + if emitter.want_sync_version then + ok, err = db.connector:query("SELECT max(version) from clustering_sync_version", "read") + if not ok then + return nil, err + end + + sync_version = assert(ok[1].max) + end + emitter:emit_toplevel({ _format_version = "3.0", _transform = false, + _sync_version = sync_version, -- only used by sync emitter, DP doesn't care about this }) local disabled_services = {} @@ -339,6 +350,34 @@ local function sanitize_output(entities) end +local sync_emitter = { + emit_toplevel = function(self, tbl) + self.out = {} + self.out_n = 0 + self.sync_version = tbl._sync_version + end, + + emit_entity = function(self, entity_name, entity_data) + self.out_n = self.out_n + 1 + self.out[self.out_n] = { type = entity_name , row = entity_data, version = self.sync_version, } + end, + + done = function(self) + return self.out + end, +} + + +function sync_emitter.new() + return setmetatable({ want_sync_version = true, }, { __index = sync_emitter }) +end + + +local function export_config_sync() + return export_from_db_impl(sync_emitter.new(), false, false, true) +end + + return { convert_nulls = convert_nulls, to_yaml_string = to_yaml_string, @@ -347,6 +386,7 @@ return { export_from_db = export_from_db, export_config = export_config, export_config_proto = export_config_proto, + export_config_sync = export_config_sync, sanitize_output = sanitize_output, } diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index 454af9edb12..ea8f23a546d 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -11,19 +11,57 @@ local yield = require("kong.tools.yield").yield local marshall = require("kong.db.declarative.marshaller").marshall local schema_topological_sort = require("kong.db.schema.topological_sort") local nkeys = require("table.nkeys") +local sha256_hex = require("kong.tools.sha256").sha256_hex +local pk_string = declarative_config.pk_string +local EMPTY = require("kong.tools.table").EMPTY local assert = assert -local sort = table.sort local type = type local pairs = pairs -local next = next local insert = table.insert +local string_format = string.format local null = ngx.null local get_phase = ngx.get_phase +local get_workspace_id = workspaces.get_workspace_id local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH +local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY + + +-- Generates the appropriate workspace ID for current operating context +-- depends on schema settings +-- +-- Non-workspaceable entities are always placed under the "default" +-- workspace +-- +-- If the query explicitly set options.workspace == null, then "default" +-- workspace shall be used +-- +-- If the query explicitly set options.workspace == "some UUID", then +-- it will be returned +-- +-- Otherwise, the current workspace ID will be returned +local function workspace_id(schema, options) + if not schema.workspaceable then + return kong.default_workspace + end + + -- options.workspace does not exist + if not options or not options.workspace then + return get_workspace_id() + end + + -- options.workspace == null must be handled by caller by querying + -- all available workspaces one by one + if options.workspace == null then + return kong.default_workspace + end + + -- options.workspace is a UUID + return options.workspace +end local function find_or_create_current_workspace(name) @@ -133,6 +171,7 @@ local function remove_nulls(tbl) return tbl end + --- Restore all nulls for declarative config. -- Declarative config is a huge table. Use iteration -- instead of recursion to improve performance. @@ -174,26 +213,42 @@ local function restore_nulls(original_tbl, transformed_tbl) return transformed_tbl end + local function get_current_hash() return lmdb.get(DECLARATIVE_HASH_KEY) end -local function find_default_ws(entities) - for _, v in pairs(entities.workspaces or {}) do - if v.name == "default" then +local function find_ws(entities, name) + for _, v in pairs(entities.workspaces or EMPTY) do + if v.name == name then return v.id end end end -local function unique_field_key(schema_name, ws_id, field, value, unique_across_ws) - if unique_across_ws then - ws_id = "" - end +local function unique_field_key(schema_name, ws_id, field, value) + return string_format("%s|%s|%s|%s", schema_name, ws_id, field, sha256_hex(value)) +end + + +local function foreign_field_key_prefix(schema_name, ws_id, field, foreign_id) + return string_format("%s|%s|%s|%s|", schema_name, ws_id, field, foreign_id) +end + + +local function foreign_field_key(schema_name, ws_id, field, foreign_id, pk) + return foreign_field_key_prefix(schema_name, ws_id, field, foreign_id) .. pk +end + +local function item_key_prefix(schema_name, ws_id) + return string_format("%s|%s|*|", schema_name, ws_id) +end + - return schema_name .. "|" .. ws_id .. "|" .. field .. ":" .. value +local function item_key(schema_name, ws_id, pk_str) + return item_key_prefix(schema_name, ws_id) .. pk_str end @@ -203,6 +258,122 @@ local function config_is_empty(entities) end +-- common implementation for +-- insert_entity_for_txn() and delete_entity_for_txn() +local function _set_entity_for_txn(t, entity_name, item, options, is_delete) + local dao = kong.db[entity_name] + local schema = dao.schema + local pk = pk_string(schema, item) + local ws_id = workspace_id(schema, options) + + local itm_key = item_key(entity_name, ws_id, pk) + + -- if we are deleting, item_value and idx_value should be nil + local itm_value, idx_value + + -- if we are inserting or updating + -- itm_value is serialized entity + -- idx_value is the lmdb item_key + if not is_delete then + local err + + -- serialize item with possible nulls + itm_value, err = marshall(item) + if not itm_value then + return nil, err + end + + idx_value = itm_key + end + + -- store serialized entity into lmdb + t:set(itm_key, itm_value) + + -- select_by_cache_key + if schema.cache_key then + local cache_key = dao:cache_key(item) + local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key) + + -- store item_key or nil into lmdb + t:set(key, idx_value) + end + + for fname, fdata in schema:each_field() do + local is_foreign = fdata.type == "foreign" + local fdata_reference = fdata.reference + local value = item[fname] + + -- value may be null, we should skip it + if not value or value == null then + goto continue + end + + -- value should be a string or table + + local value_str + + if fdata.unique then + -- unique and not a foreign key, or is a foreign key, but non-composite + -- see: validate_foreign_key_is_single_primary_key, composite foreign + -- key is currently unsupported by the DAO + if type(value) == "table" then + assert(is_foreign) + value_str = pk_string(kong.db[fdata_reference].schema, value) + end + + if fdata.unique_across_ws then + ws_id = kong.default_workspace + end + + local key = unique_field_key(entity_name, ws_id, fname, value_str or value) + + -- store item_key or nil into lmdb + t:set(key, idx_value) + end + + if is_foreign then + -- is foreign, generate page_for_foreign_field indexes + assert(type(value) == "table") + + value_str = pk_string(kong.db[fdata_reference].schema, value) + + local key = foreign_field_key(entity_name, ws_id, fname, value_str, pk) + + -- store item_key or nil into lmdb + t:set(key, idx_value) + end + + ::continue:: + end -- for fname, fdata in schema:each_field() + + return true +end + + +-- Serialize and set keys for a single validated entity into +-- the provided LMDB txn object, this operation is only safe +-- is the entity does not already exist inside the LMDB database +-- +-- This function sets the following: +-- * ||*| => serialized item +-- * |||sha256(field_value) => ||*| +-- * |||| -> ||*| +-- +-- DO NOT touch `item`, or else the entity will be changed +local function insert_entity_for_txn(t, entity_name, item, options) + return _set_entity_for_txn(t, entity_name, item, options, false) +end + + +-- Serialize and remove keys for a single validated entity into +-- the provided LMDB txn object, this operation is safe whether the provided +-- entity exists inside LMDB or not, but the provided entity must contains the +-- correct field value so indexes can be deleted correctly +local function delete_entity_for_txn(t, entity_name, item, options) + return _set_entity_for_txn(t, entity_name, item, options, true) +end + + -- entities format: -- { -- services: { @@ -217,35 +388,24 @@ end -- _transform: true, -- } local function load_into_cache(entities, meta, hash) - -- Array of strings with this format: - -- "||". - -- For example, a service tagged "admin" would produce - -- "admin|services|" - local tags = {} - meta = meta or {} + local default_workspace_id = assert(find_ws(entities, "default")) + local should_transform = meta._transform == nil and true or meta._transform - local default_workspace = assert(find_default_ws(entities)) - local fallback_workspace = default_workspace + assert(type(default_workspace_id) == "string") - assert(type(fallback_workspace) == "string") + -- set it for insert_entity_for_txn() + kong.default_workspace = default_workspace_id if not hash or hash == "" or config_is_empty(entities) then hash = DECLARATIVE_EMPTY_CONFIG_HASH end - -- Keys: tag name, like "admin" - -- Values: array of encoded tags, similar to the `tags` variable, - -- but filtered for a given tag - local tags_by_name = {} - local db = kong.db - local t = txn.begin(128) + local t = txn.begin(512) t:db_drop(false) local phase = get_phase() - yield(false, phase) -- XXX - local transform = meta._transform == nil and true or meta._transform for entity_name, items in pairs(entities) do yield(true, phase) @@ -256,63 +416,14 @@ local function load_into_cache(entities, meta, hash) end local schema = dao.schema - -- Keys: tag_name, eg "admin" - -- Values: dictionary of keys associated to this tag, - -- for a specific entity type - -- i.e. "all the services associated to the 'admin' tag" - -- The ids are keys, and the values are `true` - local taggings = {} - - local uniques = {} - local page_for = {} - local foreign_fields = {} - for fname, fdata in schema:each_field() do - local is_foreign = fdata.type == "foreign" - local fdata_reference = fdata.reference - - if fdata.unique then - if is_foreign then - if #db[fdata_reference].schema.primary_key == 1 then - insert(uniques, fname) - end - - else - insert(uniques, fname) - end - end - if is_foreign then - page_for[fdata_reference] = {} - foreign_fields[fname] = fdata_reference - end - end - - local keys_by_ws = { - -- map of keys for global queries - ["*"] = {} - } - for id, item in pairs(items) do - -- When loading the entities, when we load the default_ws, we - -- set it to the current. But this only works in the worker that - -- is doing the loading (0), other ones still won't have it - - yield(true, phase) - - assert(type(fallback_workspace) == "string") - - local ws_id = "" - if schema.workspaceable then - local item_ws_id = item.ws_id - if item_ws_id == null or item_ws_id == nil then - item_ws_id = fallback_workspace - end - item.ws_id = item_ws_id - ws_id = item_ws_id + for _, item in pairs(items) do + if not schema.workspaceable or item.ws_id == null or item.ws_id == nil then + item.ws_id = default_workspace_id end - assert(type(ws_id) == "string") + assert(type(item.ws_id) == "string") - local cache_key = dao:cache_key(id, nil, nil, nil, nil, item.ws_id) - if transform and schema:has_transformations(item) then + if should_transform and schema:has_transformations(item) then local transformed_item = cycle_aware_deep_copy(item) remove_nulls(transformed_item) @@ -328,161 +439,16 @@ local function load_into_cache(entities, meta, hash) end end - local item_marshalled, err = marshall(item) - if not item_marshalled then - return nil, err - end - - t:set(cache_key, item_marshalled) - - local global_query_cache_key = dao:cache_key(id, nil, nil, nil, nil, "*") - t:set(global_query_cache_key, item_marshalled) - - -- insert individual entry for global query - insert(keys_by_ws["*"], cache_key) - - -- insert individual entry for workspaced query - if ws_id ~= "" then - keys_by_ws[ws_id] = keys_by_ws[ws_id] or {} - local keys = keys_by_ws[ws_id] - insert(keys, cache_key) - end - - if schema.cache_key then - local cache_key = dao:cache_key(item) - t:set(cache_key, item_marshalled) - end - - for i = 1, #uniques do - local unique = uniques[i] - local unique_key = item[unique] - if unique_key and unique_key ~= null then - if type(unique_key) == "table" then - local _ - -- this assumes that foreign keys are not composite - _, unique_key = next(unique_key) - end - - local key = unique_field_key(entity_name, ws_id, unique, unique_key, - schema.fields[unique].unique_across_ws) - - t:set(key, item_marshalled) - end - end - - for fname, ref in pairs(foreign_fields) do - local item_fname = item[fname] - if item_fname and item_fname ~= null then - local fschema = db[ref].schema - - local fid = declarative_config.pk_string(fschema, item_fname) - - -- insert paged search entry for global query - page_for[ref]["*"] = page_for[ref]["*"] or {} - page_for[ref]["*"][fid] = page_for[ref]["*"][fid] or {} - insert(page_for[ref]["*"][fid], cache_key) - - -- insert paged search entry for workspaced query - page_for[ref][ws_id] = page_for[ref][ws_id] or {} - page_for[ref][ws_id][fid] = page_for[ref][ws_id][fid] or {} - insert(page_for[ref][ws_id][fid], cache_key) - end - end - - local item_tags = item.tags - if item_tags and item_tags ~= null then - local ws = schema.workspaceable and ws_id or "" - for i = 1, #item_tags do - local tag_name = item_tags[i] - insert(tags, tag_name .. "|" .. entity_name .. "|" .. id) - - tags_by_name[tag_name] = tags_by_name[tag_name] or {} - insert(tags_by_name[tag_name], tag_name .. "|" .. entity_name .. "|" .. id) - - taggings[tag_name] = taggings[tag_name] or {} - taggings[tag_name][ws] = taggings[tag_name][ws] or {} - taggings[tag_name][ws][cache_key] = true - end - end - end - - for ws_id, keys in pairs(keys_by_ws) do - local entity_prefix = entity_name .. "|" .. (schema.workspaceable and ws_id or "") - - local keys, err = marshall(keys) - if not keys then + -- nil means no extra options + local res, err = insert_entity_for_txn(t, entity_name, item, nil) + if not res then return nil, err end + end -- for for _, item + end -- for entity_name, items - t:set(entity_prefix .. "|@list", keys) - - for ref, wss in pairs(page_for) do - local fids = wss[ws_id] - if fids then - for fid, entries in pairs(fids) do - local key = entity_prefix .. "|" .. ref .. "|" .. fid .. "|@list" - - local entries, err = marshall(entries) - if not entries then - return nil, err - end - - t:set(key, entries) - end - end - end - end - - -- taggings:admin|services|ws_id|@list -> uuids of services tagged "admin" on workspace ws_id - for tag_name, workspaces_dict in pairs(taggings) do - for ws_id, keys_dict in pairs(workspaces_dict) do - local key = "taggings:" .. tag_name .. "|" .. entity_name .. "|" .. ws_id .. "|@list" - - -- transform the dict into a sorted array - local arr = {} - local len = 0 - for id in pairs(keys_dict) do - len = len + 1 - arr[len] = id - end - -- stay consistent with pagination - sort(arr) - - local arr, err = marshall(arr) - if not arr then - return nil, err - end - - t:set(key, arr) - end - end - end - - for tag_name, tags in pairs(tags_by_name) do - yield(true, phase) - - -- tags:admin|@list -> all tags tagged "admin", regardless of the entity type - -- each tag is encoded as a string with the format "admin|services|uuid", where uuid is the service uuid - local key = "tags:" .. tag_name .. "|@list" - local tags, err = marshall(tags) - if not tags then - return nil, err - end - - t:set(key, tags) - end - - -- tags||@list -> all tags, with no distinction of tag name or entity type. - -- each tag is encoded as a string with the format "admin|services|uuid", where uuid is the service uuid - local tags, err = marshall(tags) - if not tags then - return nil, err - end - - t:set("tags||@list", tags) t:set(DECLARATIVE_HASH_KEY, hash) - - kong.default_workspace = default_workspace + t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, default_workspace_id) local ok, err = t:commit() if not ok then @@ -492,9 +458,7 @@ local function load_into_cache(entities, meta, hash) kong.core_cache:purge() kong.cache:purge() - yield(false, phase) - - return true, nil, default_workspace + return true, nil, default_workspace_id end @@ -599,8 +563,15 @@ end return { get_current_hash = get_current_hash, unique_field_key = unique_field_key, + foreign_field_key = foreign_field_key, + foreign_field_key_prefix = foreign_field_key_prefix, + item_key = item_key, + item_key_prefix = item_key_prefix, + workspace_id = workspace_id, load_into_db = load_into_db, load_into_cache = load_into_cache, load_into_cache_with_events = load_into_cache_with_events, + insert_entity_for_txn = insert_entity_for_txn, + delete_entity_for_txn = delete_entity_for_txn, } diff --git a/kong/db/declarative/init.lua b/kong/db/declarative/init.lua index a7dd6d2b073..73a2704f51e 100644 --- a/kong/db/declarative/init.lua +++ b/kong/db/declarative/init.lua @@ -245,15 +245,22 @@ _M.to_yaml_file = declarative_export.to_yaml_file _M.export_from_db = declarative_export.export_from_db _M.export_config = declarative_export.export_config _M.export_config_proto = declarative_export.export_config_proto +_M.export_config_sync = declarative_export.export_config_sync _M.sanitize_output = declarative_export.sanitize_output -- import _M.get_current_hash = declarative_import.get_current_hash _M.unique_field_key = declarative_import.unique_field_key +_M.item_key = declarative_import.item_key +_M.item_key_prefix = declarative_import.item_key_prefix +_M.foreign_field_key_prefix = declarative_import.foreign_field_key_prefix _M.load_into_db = declarative_import.load_into_db _M.load_into_cache = declarative_import.load_into_cache _M.load_into_cache_with_events = declarative_import.load_into_cache_with_events +_M.insert_entity_for_txn = declarative_import.insert_entity_for_txn +_M.delete_entity_for_txn = declarative_import.delete_entity_for_txn +_M.workspace_id = declarative_import.workspace_id return _M diff --git a/kong/db/migrations/core/024_370_to_380.lua b/kong/db/migrations/core/024_370_to_380.lua new file mode 100644 index 00000000000..9d78807962c --- /dev/null +++ b/kong/db/migrations/core/024_370_to_380.lua @@ -0,0 +1,22 @@ +return { + postgres = { + up = [[ + DO $$ + BEGIN + CREATE TABLE IF NOT EXISTS clustering_sync_version ( + "version" SERIAL PRIMARY KEY + ); + CREATE TABLE IF NOT EXISTS clustering_sync_delta ( + "version" INT NOT NULL, + "type" TEXT NOT NULL, + "id" UUID NOT NULL, + "ws_id" UUID NOT NULL, + "row" JSON, + FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version); + END; + $$; + ]] + } +} diff --git a/kong/db/migrations/core/init.lua b/kong/db/migrations/core/init.lua index 2f18b1cb5f7..394f13bf382 100644 --- a/kong/db/migrations/core/init.lua +++ b/kong/db/migrations/core/init.lua @@ -21,4 +21,5 @@ return { "021_340_to_350", "022_350_to_360", "023_360_to_370", + "024_370_to_380", } diff --git a/kong/db/schema/others/declarative_config.lua b/kong/db/schema/others/declarative_config.lua index 4e56cb24826..6d7c47e4d50 100644 --- a/kong/db/schema/others/declarative_config.lua +++ b/kong/db/schema/others/declarative_config.lua @@ -39,15 +39,33 @@ local foreign_references = {} local foreign_children = {} -function DeclarativeConfig.pk_string(schema, object) - if #schema.primary_key == 1 then - return tostring(object[schema.primary_key[1]]) - else - local out = {} - for _, k in ipairs(schema.primary_key) do - insert(out, tostring(object[k])) +do + local tb_nkeys = require("table.nkeys") + local request_aware_table = require("kong.tools.request_aware_table") + + local CACHED_OUT + + -- Generate a stable and unique string key from primary key defined inside + -- schema, supports both non-composite and composite primary keys + function DeclarativeConfig.pk_string(schema, object) + local primary_key = schema.primary_key + local count = tb_nkeys(primary_key) + + if count == 1 then + return tostring(object[primary_key[1]]) + end + + if not CACHED_OUT then + CACHED_OUT = request_aware_table.new() + end + + CACHED_OUT.clear() + for i = 1, count do + local k = primary_key[i] + insert(CACHED_OUT, tostring(object[k])) end - return concat(out, ":") + + return concat(CACHED_OUT, ":") end end @@ -725,7 +743,7 @@ end local function insert_default_workspace_if_not_given(_, entities) - local default_workspace = find_default_ws(entities) or "0dc6f45b-8f8d-40d2-a504-473544ee190b" + local default_workspace = find_default_ws(entities) or constants.DECLARATIVE_DEFAULT_WORKSPACE_ID if not entities.workspaces then entities.workspaces = {} diff --git a/kong/db/strategies/connector.lua b/kong/db/strategies/connector.lua index 3f03cddc11f..719ef8078ca 100644 --- a/kong/db/strategies/connector.lua +++ b/kong/db/strategies/connector.lua @@ -5,8 +5,8 @@ local fmt = string.format local Connector = { defaults = { pagination = { - page_size = 1000, - max_page_size = 50000, + page_size = 512, -- work with lmdb + max_page_size = 512, -- work with lmdb }, }, } diff --git a/kong/db/strategies/off/init.lua b/kong/db/strategies/off/init.lua index c984510877a..1ab27cddf56 100644 --- a/kong/db/strategies/off/init.lua +++ b/kong/db/strategies/off/init.lua @@ -1,26 +1,31 @@ -local declarative_config = require "kong.db.schema.others.declarative_config" -local workspaces = require "kong.workspaces" +local declarative_config = require("kong.db.schema.others.declarative_config") local lmdb = require("resty.lmdb") +local lmdb_prefix = require("resty.lmdb.prefix") local marshaller = require("kong.db.declarative.marshaller") -local yield = require("kong.tools.yield").yield -local unique_field_key = require("kong.db.declarative").unique_field_key +local declarative = require("kong.db.declarative") local kong = kong local fmt = string.format local type = type local next = next -local sort = table.sort -local pairs = pairs -local match = string.match local assert = assert -local tostring = tostring -local tonumber = tonumber local encode_base64 = ngx.encode_base64 local decode_base64 = ngx.decode_base64 local null = ngx.null local unmarshall = marshaller.unmarshall local lmdb_get = lmdb.get -local get_workspace_id = workspaces.get_workspace_id +local pk_string = declarative_config.pk_string +local unique_field_key = declarative.unique_field_key +local item_key = declarative.item_key +local item_key_prefix = declarative.item_key_prefix +local workspace_id = declarative.workspace_id +local foreign_field_key_prefix = declarative.foreign_field_key_prefix + + +local PROCESS_AUTO_FIELDS_OPTS = { + no_defaults = true, + show_ws_id = true, +} local off = {} @@ -30,21 +35,16 @@ local _mt = {} _mt.__index = _mt -local function ws(schema, options) - if not schema.workspaceable then - return "" - end +local UNINIT_WORKSPACE_ID = "00000000-0000-0000-0000-000000000000" - if options then - if options.workspace == null then - return "*" - end - if options.workspace then - return options.workspace - end + +local function get_default_workspace() + if kong.default_workspace == UNINIT_WORKSPACE_ID then + local res = kong.db.workspaces:select_by_name("default") + kong.default_workspace = assert(res and res.id) end - return get_workspace_id() + return kong.default_workspace end @@ -53,229 +53,163 @@ local function process_ttl_field(entity) local ttl_value = entity.ttl - ngx.time() if ttl_value > 0 then entity.ttl = ttl_value + else entity = nil -- do not return the expired entity end end + return entity end --- Returns a dict of entity_ids tagged according to the given criteria. --- Currently only the following kinds of keys are supported: --- * A key like `services||@list` will only return service keys --- @tparam string the key to be used when filtering --- @tparam table tag_names an array of tag names (strings) --- @tparam string|nil tags_cond either "or", "and". `nil` means "or" --- @treturn table|nil returns a table with entity_ids as values, and `true` as keys -local function get_entity_ids_tagged(key, tag_names, tags_cond) - local tag_name, list, err - local dict = {} -- keys are entity_ids, values are true - - for i = 1, #tag_names do - tag_name = tag_names[i] - list, err = unmarshall(lmdb_get("taggings:" .. tag_name .. "|" .. key)) - if err then - return nil, err - end - - yield(true) +local function construct_entity(schema, value) + local entity, err = unmarshall(value) + if not entity then + return nil, err + end - list = list or {} + if schema.ttl then + entity = process_ttl_field(entity) + if not entity then + return nil + end + end - if i > 1 and tags_cond == "and" then - local list_len = #list - -- optimization: exit early when tags_cond == "and" and one of the tags does not return any entities - if list_len == 0 then - return {} - end + entity = schema:process_auto_fields(entity, "select", true, PROCESS_AUTO_FIELDS_OPTS) - local and_dict = {} - local new_tag_id - for i = 1, list_len do - new_tag_id = list[i] - and_dict[new_tag_id] = dict[new_tag_id] -- either true or nil - end - dict = and_dict + return entity +end - -- optimization: exit early when tags_cond == "and" and current list is empty - if not next(dict) then - return {} - end - else -- tags_cond == "or" or first iteration - -- the first iteration is the same for both "or" and "and": put all ids into dict - for i = 1, #list do - dict[list[i]] = true - end +-- select item by primary key, if follow is true, then one indirection +-- will be followed indirection means the value of `key` is not the actual +-- serialized item, but rather the value is a pointer to the key where +-- actual serialized item is located. This way this function can be shared +-- by both primary key lookup as well as unique key lookup without needing +-- to duplicate the item content +local function select_by_key(schema, key, follow) + if follow then + local actual_key, err = lmdb_get(key) + if not actual_key then + return nil, err end + + return select_by_key(schema, actual_key, false) end - local arr = {} - local len = 0 - for entity_id in pairs(dict) do - len = len + 1 - arr[len] = entity_id + local entity, err = construct_entity(schema, lmdb_get(key)) + if not entity then + return nil, err end - sort(arr) -- consistency when paginating results - return arr + return entity end -local function page_for_key(self, key, size, offset, options) +local function page_for_prefix(self, prefix, size, offset, options, follow) if not size then size = self.connector:get_page_size(options) end - if offset then - local token = decode_base64(offset) - if not token then - return nil, self.errors:invalid_offset(offset, "bad base64 encoding") - end - - local number = tonumber(token) - if not number then - return nil, self.errors:invalid_offset(offset, "invalid offset") - end + offset = offset or prefix - offset = number - - else - offset = 1 + local res, err_or_more = lmdb_prefix.page(offset, prefix, nil, size) + if not res then + return nil, err_or_more end - local list, err - if options and options.tags then - list, err = get_entity_ids_tagged(key, options.tags, options.tags_cond) - if err then - return nil, err - end - - else - list, err = unmarshall(lmdb_get(key)) - if err then - return nil, err - end - - list = list or {} - end - - yield() - local ret = {} - local ret_idx = 1 + local ret_idx = 0 local schema = self.schema - local schema_name = schema.name - - local item - for i = offset, offset + size - 1 do - item = list[i] - if not item then - offset = nil - break - end + local last_key - -- Tags are stored in the cache entries "tags||@list" and "tags:|@list" - -- The contents of both of these entries is an array of strings - -- Each of these strings has the form "||" - -- For example "admin|services|" - -- This loop transforms each individual string into tables. - if schema_name == "tags" then - local tag_name, entity_name, uuid = match(item, "^([^|]+)|([^|]+)|(.+)$") - if not tag_name then - return nil, "Could not parse tag from cache: " .. tostring(item) - end + for _, kv in ipairs(res) do + last_key = kv.key + local item, err - item = { tag = tag_name, entity_name = entity_name, entity_id = uuid } + if follow then + item, err = select_by_key(schema, kv.value, false) - -- The rest of entities' lists (i.e. "services||@list") only contain ids, so in order to - -- get the entities we must do an additional cache access per entry else - item, err = unmarshall(lmdb_get(item)) - if err then - return nil, err - end + item, err = construct_entity(schema, kv.value) end - if not item then - return nil, "stale data detected while paginating" - end - - if schema.ttl then - item = process_ttl_field(item) + if err then + return nil, err end - if item then - ret[ret_idx] = item - ret_idx = ret_idx + 1 - end + ret_idx = ret_idx + 1 + ret[ret_idx] = item end - if offset then - return ret, nil, encode_base64(tostring(offset + size), true) + -- more need to query + if err_or_more then + return ret, nil, encode_base64(last_key .. "\x00", true) end return ret end -local function select_by_key(schema, key) - local entity, err = unmarshall(lmdb_get(key)) - if not entity then - return nil, err - end +local function page(self, size, offset, options) + local schema = self.schema + local ws_id = workspace_id(schema, options) + local prefix = item_key_prefix(schema.name, ws_id) - if schema.ttl then - entity = process_ttl_field(entity) - if not entity then - return nil + if offset then + local token = decode_base64(offset) + if not token then + return nil, self.errors:invalid_offset(offset, "bad base64 encoding") end - end - - return entity -end + offset = token + end -local function page(self, size, offset, options) - local schema = self.schema - local ws_id = ws(schema, options) - local key = schema.name .. "|" .. ws_id .. "|@list" - return page_for_key(self, key, size, offset, options) + return page_for_prefix(self, prefix, size, offset, options, false) end +-- select by primary key local function select(self, pk, options) local schema = self.schema - local ws_id = ws(schema, options) - local id = declarative_config.pk_string(schema, pk) - local key = schema.name .. ":" .. id .. ":::::" .. ws_id - return select_by_key(schema, key) + local ws_id = workspace_id(schema, options) + local pk = pk_string(schema, pk) + local key = item_key(schema.name, ws_id, pk) + return select_by_key(schema, key, false) end +-- select by unique field (including select_by_cache_key) +-- the DAO guarantees this method only gets called for unique fields +-- see: validate_foreign_key_is_single_primary_key local function select_by_field(self, field, value, options) + local schema = self.schema + if type(value) == "table" then + -- select by foreign, DAO only support one key for now (no composites) + local fdata = schema.fields[field] + assert(fdata.type == "foreign") + assert(#kong.db[fdata.reference].schema.primary_key == 1) + local _ _, value = next(value) end - local schema = self.schema - local ws_id = ws(schema, options) + local ws_id = workspace_id(schema, options) local key - if field ~= "cache_key" then - local unique_across_ws = schema.fields[field].unique_across_ws - -- only accept global query by field if field is unique across workspaces - assert(not options or options.workspace ~= null or unique_across_ws) - - key = unique_field_key(schema.name, ws_id, field, value, unique_across_ws) - else - -- if select_by_cache_key, use the provided cache_key as key directly - key = value + local unique_across_ws = schema.fields[field].unique_across_ws + -- only accept global query by field if field is unique across workspaces + assert(not options or options.workspace ~= null or unique_across_ws) + + if unique_across_ws then + ws_id = get_default_workspace() end - return select_by_key(schema, key) + key = unique_field_key(schema.name, ws_id, field, value) + + return select_by_key(schema, key, true) end @@ -307,8 +241,6 @@ do _mt.upsert_by_field = unsupported_by("create or update") _mt.delete_by_field = unsupported_by("remove") _mt.truncate = function() return true end - -- off-strategy specific methods: - _mt.page_for_key = page_for_key end @@ -323,18 +255,17 @@ function off.new(connector, schema, errors) -- This is not the id for the default workspace in DB-less. -- This is a sentinel value for the init() phase before -- the declarative config is actually loaded. - kong.default_workspace = "00000000-0000-0000-0000-000000000000" + kong.default_workspace = UNINIT_WORKSPACE_ID end local name = schema.name for fname, fdata in schema:each_field() do if fdata.type == "foreign" then - local entity = fdata.reference local method = "page_for_" .. fname self[method] = function(_, foreign_key, size, offset, options) - local ws_id = ws(schema, options) - local key = name .. "|" .. ws_id .. "|" .. entity .. "|" .. foreign_key.id .. "|@list" - return page_for_key(self, key, size, offset, options) + local ws_id = workspace_id(schema, options) + local prefix = foreign_field_key_prefix(name, ws_id, fname, foreign_key.id) + return page_for_prefix(self, prefix, size, offset, options, true) end end end diff --git a/kong/db/strategies/off/tags.lua b/kong/db/strategies/off/tags.lua deleted file mode 100644 index 15498957df5..00000000000 --- a/kong/db/strategies/off/tags.lua +++ /dev/null @@ -1,11 +0,0 @@ -local Tags = {} - --- Used by /tags/:tag endpoint --- @tparam string tag_pk the tag value --- @treturn table|nil,err,offset -function Tags:page_by_tag(tag, size, offset, options) - local key = "tags:" .. tag .. "|list" - return self:page_for_key(key, size, offset, options) -end - -return Tags diff --git a/kong/init.lua b/kong/init.lua index 70abad8b59c..8fcfc592473 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -695,6 +695,11 @@ function Kong.init() if config.cluster_rpc then kong.rpc = require("kong.clustering.rpc.manager").new(config, kong.node.get_id()) + if config.cluster_incremental_sync then + kong.sync = require("kong.clustering.services.sync").new(db, is_control_plane(config)) + kong.sync:init(kong.rpc) + end + if is_data_plane(config) then require("kong.clustering.services.debug").init(kong.rpc) end @@ -757,7 +762,7 @@ function Kong.init() require("resty.kong.var").patch_metatable() - if config.dedicated_config_processing and is_data_plane(config) then + if config.dedicated_config_processing and is_data_plane(config) and not kong.sync then -- TODO: figure out if there is better value than 4096 -- 4096 is for the cocurrency of the lua-resty-timer-ng local ok, err = process.enable_privileged_agent(4096) @@ -874,8 +879,9 @@ function Kong.init_worker() kong.cache:invalidate_local(constants.ADMIN_GUI_KCONFIG_CACHE_KEY) end - if process.type() == "privileged agent" then + if process.type() == "privileged agent" and not kong.sync then if kong.clustering then + -- full sync cp/dp kong.clustering:init_worker() end return @@ -910,6 +916,7 @@ function Kong.init_worker() end elseif declarative_entities then + ok, err = load_declarative_config(kong.configuration, declarative_entities, declarative_meta, @@ -975,11 +982,18 @@ function Kong.init_worker() end if kong.clustering then - kong.clustering:init_worker() - local cluster_tls = require("kong.clustering.tls") + -- full sync cp/dp + if not kong.sync then + kong.clustering:init_worker() + end + -- rpc and incremental sync if kong.rpc and is_http_module then + + -- only available in http subsystem + local cluster_tls = require("kong.clustering.tls") + if is_data_plane(kong.configuration) then ngx.timer.at(0, function(premature) kong.rpc:connect(premature, @@ -992,6 +1006,11 @@ function Kong.init_worker() else -- control_plane kong.rpc.concentrator:start() end + + -- init incremental sync + if kong.sync then + kong.sync:init_worker() + end end end diff --git a/kong/pdk/vault.lua b/kong/pdk/vault.lua index 14bf4f8bbba..ed7f421b45a 100644 --- a/kong/pdk/vault.lua +++ b/kong/pdk/vault.lua @@ -1439,11 +1439,21 @@ local function new(self) end + local function should_register_crud_event() + local conf = self.configuration + + local not_dbless = conf.database ~= "off" -- postgres + local dp_with_inc_sync = conf.role == "data_plane" and + conf.cluster_incremental_sync + + return not_dbless or dp_with_inc_sync + end + local initialized --- -- Initializes vault. -- - -- Registers event handlers (on non-dbless nodes) and starts a recurring secrets + -- Registers event handlers and starts a recurring secrets -- rotation timer. It does nothing on control planes. -- -- @local @@ -1455,7 +1465,7 @@ local function new(self) initialized = true - if self.configuration.database ~= "off" then + if should_register_crud_event() then self.worker_events.register(handle_vault_crud_event, "crud", "vaults") end diff --git a/kong/runloop/events.lua b/kong/runloop/events.lua index b7b64b16181..dfc9718af3c 100644 --- a/kong/runloop/events.lua +++ b/kong/runloop/events.lua @@ -490,7 +490,11 @@ local function register_events(reconfigure_handler) if db.strategy == "off" then -- declarative config updates register_for_dbless(reconfigure_handler) - return + + -- dbless (not dataplane) has no other events + if not kong.sync then + return + end end register_for_db() diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 3f6b08afb10..11986d08d11 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -358,7 +358,13 @@ local function new_router(version) end end - local detect_changes = db.strategy ~= "off" and kong.core_cache + local detect_changes = kong.core_cache and true + + -- for dbless we will not check changes when initing + if db.strategy == "off" and get_phase() == "init_worker" then + detect_changes = false + end + local counter = 0 local page_size = db.routes.pagination.max_page_size for route, err in db.routes:each(page_size, GLOBAL_QUERY_OPTS) do @@ -961,51 +967,87 @@ return { end end - if strategy ~= "off" then + do -- start some rebuild timers local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1 - local function rebuild_timer(premature) + local router_async_opts = { + name = "router", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_router_timer(premature) if premature then return end - local router_update_status, err = rebuild_router({ - name = "router", - timeout = 0, - on_timeout = "return_true", - }) - if not router_update_status then + -- Don't wait for the semaphore (timeout = 0) when updating via the + -- timer. + -- If the semaphore is locked, that means that the rebuild is + -- already ongoing. + local ok, err = rebuild_router(router_async_opts) + if not ok then log(ERR, "could not rebuild router via timer: ", err) end + end - local plugins_iterator_update_status, err = rebuild_plugins_iterator({ - name = "plugins_iterator", - timeout = 0, - on_timeout = "return_true", - }) - if not plugins_iterator_update_status then + local _, err = kong.timer:named_every("router-rebuild", + worker_state_update_frequency, + rebuild_router_timer) + if err then + log(ERR, "could not schedule timer to rebuild router: ", err) + end + + local plugins_iterator_async_opts = { + name = "plugins_iterator", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_plugins_iterator_timer(premature) + if premature then + return + end + + local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts) + if err then log(ERR, "could not rebuild plugins iterator via timer: ", err) end + end - if wasm.enabled() then - local wasm_update_status, err = rebuild_wasm_state({ - name = "wasm", - timeout = 0, - on_timeout = "return_true", - }) - if not wasm_update_status then + local _, err = kong.timer:named_every("plugins-iterator-rebuild", + worker_state_update_frequency, + rebuild_plugins_iterator_timer) + if err then + log(ERR, "could not schedule timer to rebuild plugins iterator: ", err) + end + + if wasm.enabled() then + local wasm_async_opts = { + name = "wasm", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_wasm_filter_chains_timer(premature) + if premature then + return + end + + local _, err = rebuild_wasm_state(wasm_async_opts) + if err then log(ERR, "could not rebuild wasm filter chains via timer: ", err) end end - end - local _, err = kong.timer:named_every("rebuild", - worker_state_update_frequency, - rebuild_timer) - if err then - log(ERR, "could not schedule timer to rebuild: ", err) + local _, err = kong.timer:named_every("wasm-rebuild", + worker_state_update_frequency, + rebuild_wasm_filter_chains_timer) + if err then + log(ERR, "could not schedule timer to rebuild WASM filter chains: ", err) + end end - end + end -- rebuild timer do block end, }, preread = { diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index f2cc4e0f13a..4ffb2b24adf 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -41,7 +41,8 @@ cluster_ocsp = off cluster_max_payload = 16777216 cluster_use_proxy = off cluster_dp_labels = NONE -cluster_rpc = off +cluster_rpc = on +cluster_incremental_sync = off cluster_cjson = off lmdb_environment_path = dbless.lmdb diff --git a/spec/01-unit/01-db/04-dao_spec.lua b/spec/01-unit/01-db/04-dao_spec.lua index a7417805fd8..517db296456 100644 --- a/spec/01-unit/01-db/04-dao_spec.lua +++ b/spec/01-unit/01-db/04-dao_spec.lua @@ -659,7 +659,7 @@ describe("DAO", function() dao:delete({ id = 1 }) dao:delete({ id = 1 }) - assert.spy(post_hook).was_called(1) + assert.spy(post_hook).was_called(2) end) end) diff --git a/spec/01-unit/01-db/10-declarative_spec.lua b/spec/01-unit/01-db/10-declarative_spec.lua index 59020becffe..be683a2df37 100644 --- a/spec/01-unit/01-db/10-declarative_spec.lua +++ b/spec/01-unit/01-db/10-declarative_spec.lua @@ -48,16 +48,19 @@ keyauth_credentials: describe("unique_field_key()", function() local unique_field_key = declarative.unique_field_key + local sha256_hex = require("kong.tools.sha256").sha256_hex it("utilizes the schema name, workspace id, field name, and checksum of the field value", function() local key = unique_field_key("services", "123", "fieldname", "test", false) assert.is_string(key) - assert.equals("services|123|fieldname:test", key) + assert.equals("services|123|fieldname|" .. sha256_hex("test"), key) end) - it("omits the workspace id when 'unique_across_ws' is 'true'", function() + -- since incremental sync the param `unique_across_ws` is useless + -- this test case is just for compatibility + it("does not omits the workspace id when 'unique_across_ws' is 'true'", function() local key = unique_field_key("services", "123", "fieldname", "test", true) - assert.equals("services||fieldname:test", key) + assert.equals("services|123|fieldname|" .. sha256_hex("test"), key) end) end) diff --git a/spec/01-unit/01-db/11-declarative_lmdb_spec.lua b/spec/01-unit/01-db/11-declarative_lmdb_spec.lua index 42756078b8a..6fbe9181c96 100644 --- a/spec/01-unit/01-db/11-declarative_lmdb_spec.lua +++ b/spec/01-unit/01-db/11-declarative_lmdb_spec.lua @@ -201,11 +201,13 @@ describe("#off preserve nulls", function() assert(declarative.load_into_cache(entities, meta, current_hash)) local id, item = next(entities.basicauth_credentials) + + -- format changed after incremental sync local cache_key = concat({ - "basicauth_credentials:", - id, - ":::::", - item.ws_id + "basicauth_credentials|", + item.ws_id, + "|*|", + id }) local lmdb = require "resty.lmdb" @@ -222,17 +224,23 @@ describe("#off preserve nulls", function() for _, plugin in pairs(entities.plugins) do if plugin.name == PLUGIN_NAME then + + -- format changed after incremental sync cache_key = concat({ - "plugins:" .. PLUGIN_NAME .. ":", + "plugins|", + plugin.ws_id, + "|route|", plugin.route.id, - "::::", - plugin.ws_id + "|", + plugin.id }) value, err, hit_lvl = lmdb.get(cache_key) assert.is_nil(err) assert.are_equal(hit_lvl, 1) - cached_item = buffer.decode(value) + -- get value by the index key + cached_item = buffer.decode(lmdb.get(value)) + assert.are_same(cached_item, plugin) assert.are_equal(cached_item.config.large, null) assert.are_equal(cached_item.config.ttl, null) diff --git a/spec/01-unit/04-prefix_handler_spec.lua b/spec/01-unit/04-prefix_handler_spec.lua index c1e36f8060f..226949ff8b9 100644 --- a/spec/01-unit/04-prefix_handler_spec.lua +++ b/spec/01-unit/04-prefix_handler_spec.lua @@ -309,8 +309,7 @@ describe("NGINX conf compiler", function() assert.not_matches("ssl_dhparam", kong_nginx_conf) end) - -- TODO: enable when cluster RPC is GA - pending("renders RPC server", function() + it("renders RPC server", function() local conf = assert(conf_loader(helpers.test_conf_path, { role = "control_plane", cluster_cert = "spec/fixtures/kong_clustering.crt", @@ -322,20 +321,6 @@ describe("NGINX conf compiler", function() assert.matches("location = /v2/outlet {", kong_nginx_conf) end) - -- TODO: remove when cluster RPC is GA - it("does not render RPC server, even when cluster_rpc enabled", function() - local conf = assert(conf_loader(helpers.test_conf_path, { - role = "control_plane", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - cluster_rpc = "on", - cluster_listen = "127.0.0.1:9005", - nginx_conf = "spec/fixtures/custom_nginx.template", - })) - local kong_nginx_conf = prefix_handler.compile_kong_conf(conf) - assert.matches("location = /v2/outlet {", kong_nginx_conf) - end) - it("does not renders RPC server when inert", function() local conf = assert(conf_loader(helpers.test_conf_path, { role = "control_plane", diff --git a/spec/02-integration/04-admin_api/15-off_spec.lua b/spec/02-integration/04-admin_api/15-off_spec.lua index cfc6102ed51..554b445fced 100644 --- a/spec/02-integration/04-admin_api/15-off_spec.lua +++ b/spec/02-integration/04-admin_api/15-off_spec.lua @@ -3100,15 +3100,37 @@ describe("Admin API #off with Unique Foreign #unique", function() assert.equal(references.data[1].note, "note") assert.equal(references.data[1].unique_foreign.id, foreigns.data[1].id) + -- get default workspace id in lmdb + local cmd = string.format( + [[resty --main-conf "lmdb_environment_path %s/%s;" spec/fixtures/dump_lmdb_key.lua %q]], + TEST_CONF.prefix, TEST_CONF.lmdb_environment_path, + require("kong.constants").DECLARATIVE_DEFAULT_WORKSPACE_KEY) + + local handle = io.popen(cmd) + local ws_id = handle:read("*a") + handle:close() + + -- get unique_field_key local declarative = require "kong.db.declarative" - local key = declarative.unique_field_key("unique_references", "", "unique_foreign", + local key = declarative.unique_field_key("unique_references", ws_id, "unique_foreign", foreigns.data[1].id, true) - local cmd = string.format( [[resty --main-conf "lmdb_environment_path %s/%s;" spec/fixtures/dump_lmdb_key.lua %q]], TEST_CONF.prefix, TEST_CONF.lmdb_environment_path, key) + local handle = io.popen(cmd) + local unique_field_key = handle:read("*a") + handle:close() + + assert.is_string(unique_field_key, "non-string result from unique lookup") + assert.not_equals("", unique_field_key, "empty result from unique lookup") + + -- get the entity value + local cmd = string.format( + [[resty --main-conf "lmdb_environment_path %s/%s;" spec/fixtures/dump_lmdb_key.lua %q]], + TEST_CONF.prefix, TEST_CONF.lmdb_environment_path, unique_field_key) + local handle = io.popen(cmd) local result = handle:read("*a") handle:close() @@ -3116,11 +3138,15 @@ describe("Admin API #off with Unique Foreign #unique", function() assert.not_equals("", result, "empty result from unique lookup") local cached_reference = assert(require("kong.db.declarative.marshaller").unmarshall(result)) + + -- NOTE: we have changed internl LDMB storage format, and dao does not has this field(ws_id) + cached_reference.ws_id = nil + assert.same(cached_reference, references.data[1]) local cache = { get = function(_, k) - if k ~= "unique_references||unique_foreign:" .. foreigns.data[1].id then + if k ~= "unique_references|" ..ws_id .. "|unique_foreign:" .. foreigns.data[1].id then return nil end diff --git a/spec/02-integration/07-sdk/03-cluster_spec.lua b/spec/02-integration/07-sdk/03-cluster_spec.lua index b7af4481cf5..55d27beb732 100644 --- a/spec/02-integration/07-sdk/03-cluster_spec.lua +++ b/spec/02-integration/07-sdk/03-cluster_spec.lua @@ -40,8 +40,9 @@ fixtures_cp.http_mock.my_server_block = [[ } ]] +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("PDK: kong.cluster for #" .. strategy, function() + describe("PDK: kong.cluster for #" .. strategy .. " inc_sync=" .. inc_sync, function() local proxy_client lazy_setup(function() @@ -61,6 +62,7 @@ for _, strategy in helpers.each_strategy() do db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, }, nil, nil, fixtures_cp)) assert(helpers.start_kong({ @@ -72,6 +74,7 @@ for _, strategy in helpers.each_strategy() do cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, }, nil, nil, fixtures_dp)) end) @@ -108,4 +111,5 @@ for _, strategy in helpers.each_strategy() do end, 10) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index 71bb46ead47..a1d2f9b3764 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -12,9 +12,11 @@ local uuid = require("kong.tools.uuid").uuid local KEY_AUTH_PLUGIN +--- XXX FIXME: enable inc_sync = on +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do -describe("CP/DP communication #" .. strategy, function() +describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy) -- runs migrations @@ -27,6 +29,7 @@ describe("CP/DP communication #" .. strategy, function() db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -38,6 +41,7 @@ describe("CP/DP communication #" .. strategy, function() cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) for _, plugin in ipairs(helpers.get_plugins_list()) do @@ -263,7 +267,13 @@ describe("CP/DP communication #" .. strategy, function() method = "GET", path = "/soon-to-be-disabled", })) - assert.res_status(404, res) + + if inc_sync == "on" then + -- XXX incremental sync does not skip_disabled_services by default + assert.res_status(200, res) + else + assert.res_status(404, res) + end proxy_client:close() end) @@ -357,6 +367,7 @@ describe("CP/DP #version check #" .. strategy, function() cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", cluster_version_check = "major_minor", + cluster_incremental_sync = inc_sync, })) for _, plugin in ipairs(helpers.get_plugins_list()) do @@ -624,6 +635,7 @@ describe("CP/DP config sync #" .. strategy, function() database = strategy, db_update_frequency = 3, cluster_listen = "127.0.0.1:9005", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -634,6 +646,7 @@ describe("CP/DP config sync #" .. strategy, function() cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", + cluster_incremental_sync = inc_sync, })) end) @@ -736,6 +749,7 @@ describe("CP/DP labels #" .. strategy, function() db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -748,6 +762,7 @@ describe("CP/DP labels #" .. strategy, function() proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", cluster_dp_labels="deployment:mycloud,region:us-east-1", + cluster_incremental_sync = inc_sync, })) end) @@ -796,6 +811,7 @@ describe("CP/DP cert details(cluster_mtls = shared) #" .. strategy, function() db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -808,6 +824,7 @@ describe("CP/DP cert details(cluster_mtls = shared) #" .. strategy, function() proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", cluster_dp_labels="deployment:mycloud,region:us-east-1", + cluster_incremental_sync = inc_sync, })) end) @@ -854,6 +871,7 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/kong_clustering_ca.crt", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -869,6 +887,7 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/kong_clustering.crt", + cluster_incremental_sync = inc_sync, })) end) @@ -900,4 +919,5 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function() end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/03-pki_spec.lua b/spec/02-integration/09-hybrid_mode/03-pki_spec.lua index 00d8b483cdc..ec6912d0760 100644 --- a/spec/02-integration/09-hybrid_mode/03-pki_spec.lua +++ b/spec/02-integration/09-hybrid_mode/03-pki_spec.lua @@ -2,9 +2,10 @@ local helpers = require "spec.helpers" local cjson = require "cjson.safe" +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do -describe("CP/DP PKI sync #" .. strategy, function() +describe("CP/DP PKI sync #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -25,6 +26,7 @@ describe("CP/DP PKI sync #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/kong_clustering_ca.crt", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -40,6 +42,7 @@ describe("CP/DP PKI sync #" .. strategy, function() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/kong_clustering.crt", + cluster_incremental_sync = inc_sync, })) end) @@ -88,9 +91,12 @@ describe("CP/DP PKI sync #" .. strategy, function() end) describe("sync works", function() + -- XXX FIXME + local skip_inc_sync = inc_sync == "on" and pending or it + local route_id - it("proxy on DP follows CP config", function() + skip_inc_sync("proxy on DP follows CP config", function() local admin_client = helpers.admin_client(10000) finally(function() admin_client:close() @@ -127,7 +133,7 @@ describe("CP/DP PKI sync #" .. strategy, function() end, 10) end) - it("cache invalidation works on config change", function() + skip_inc_sync("cache invalidation works on config change", function() local admin_client = helpers.admin_client() finally(function() admin_client:close() @@ -158,4 +164,5 @@ describe("CP/DP PKI sync #" .. strategy, function() end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua b/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua index 2e593dd885f..da00efcf6d8 100644 --- a/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua @@ -19,8 +19,10 @@ local function find_in_file(filepath, pat) return found end + +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("CP/CP sync works with #" .. strategy .. " backend", function() + describe("CP/CP sync works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() lazy_setup(function() helpers.get_db_utils(strategy, { "routes", "services" }) @@ -34,6 +36,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -46,6 +49,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, + cluster_incremental_sync = inc_sync, })) end) @@ -77,4 +81,5 @@ for _, strategy in helpers.each_strategy() do end, 10) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua index d297a6ab6b8..254b09555f8 100644 --- a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua +++ b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua @@ -14,9 +14,10 @@ local function set_ocsp_status(status) end +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do -describe("cluster_ocsp = on works #" .. strategy, function() +describe("cluster_ocsp = on works #" .. strategy .. " inc_sync=" .. inc_sync, function() describe("DP certificate good", function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -40,6 +41,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("good") @@ -57,6 +59,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) end) @@ -110,6 +113,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("revoked") @@ -127,6 +131,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) end) @@ -178,6 +183,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("error") @@ -195,6 +201,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) end) @@ -225,7 +232,7 @@ describe("cluster_ocsp = on works #" .. strategy, function() end) end) -describe("cluster_ocsp = off works with #" .. strategy .. " backend", function() +describe("cluster_ocsp = off works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() describe("DP certificate revoked, not checking for OCSP", function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -249,6 +256,7 @@ describe("cluster_ocsp = off works with #" .. strategy .. " backend", function() -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("revoked") @@ -266,6 +274,7 @@ describe("cluster_ocsp = off works with #" .. strategy .. " backend", function() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) end) @@ -297,7 +306,7 @@ describe("cluster_ocsp = off works with #" .. strategy .. " backend", function() end) end) -describe("cluster_ocsp = optional works with #" .. strategy .. " backend", function() +describe("cluster_ocsp = optional works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() describe("DP certificate revoked", function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -321,6 +330,7 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("revoked") @@ -338,6 +348,7 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) end) @@ -389,6 +400,7 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) set_ocsp_status("error") @@ -406,6 +418,7 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) end) @@ -440,4 +453,5 @@ describe("cluster_ocsp = optional works with #" .. strategy .. " backend", funct end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua b/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua index 35a25a5b3ad..b4fedacb084 100644 --- a/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua +++ b/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua @@ -2,7 +2,7 @@ local helpers = require "spec.helpers" local admin_client -local function cp(strategy) +local function cp(strategy, inc_sync) helpers.get_db_utils(strategy) -- make sure the DB is fresh n' clean assert(helpers.start_kong({ role = "control_plane", @@ -14,6 +14,7 @@ local function cp(strategy) -- additional attributes for PKI: cluster_mtls = "pki", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) admin_client = assert(helpers.admin_client()) end @@ -34,7 +35,7 @@ local function touch_config() })) end -local function json_dp() +local function json_dp(inc_sync) assert(helpers.start_kong({ role = "data_plane", database = "off", @@ -47,30 +48,37 @@ local function json_dp() cluster_mtls = "pki", cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", + cluster_incremental_sync = inc_sync, })) end +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do -describe("lazy_export with #".. strategy, function() +describe("lazy_export with #".. strategy .. " inc_sync=" .. inc_sync, function() describe("no DP", function () setup(function() - cp(strategy) + cp(strategy, inc_sync) end) teardown(function () helpers.stop_kong() end) it("test", function () touch_config() - assert.logfile().has.line("[clustering] skipping config push (no connected clients)", true) + if inc_sync == "on" then + assert.logfile().has.no.line("[kong.sync.v2] config push (connected client)", true) + + else + assert.logfile().has.line("[clustering] skipping config push (no connected clients)", true) + end end) end) describe("only json DP", function() setup(function() - cp(strategy) - json_dp() + cp(strategy, inc_sync) + json_dp(inc_sync) end) teardown(function () helpers.stop_kong("dp1") @@ -79,11 +87,18 @@ describe("lazy_export with #".. strategy, function() it("test", function () touch_config() - assert.logfile().has.line("[clustering] exporting config", true) - assert.logfile().has.line("[clustering] config pushed to 1 data-plane nodes", true) + if inc_sync == "on" then + assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) + assert.logfile().has.line("[kong.sync.v2] database is empty or too far behind for node_id", true) + + else + assert.logfile().has.line("[clustering] exporting config", true) + assert.logfile().has.line("[clustering] config pushed to 1 data-plane nodes", true) + end end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua index 6d9f2842c6f..8c2b26fba41 100644 --- a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua @@ -76,6 +76,8 @@ local function get_sync_status(id) end +-- XXX TODO: helpers.clustering_client supports incremental sync +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do describe("CP/DP config compat transformations #" .. strategy, function() @@ -101,6 +103,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() cluster_listen = CP_HOST .. ":" .. CP_PORT, nginx_conf = "spec/fixtures/custom_nginx.template", plugins = "bundled", + cluster_incremental_sync = inc_sync, })) end) @@ -1198,3 +1201,4 @@ describe("CP/DP config compat transformations #" .. strategy, function() end) end -- each strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua b/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua index cd67cd502c2..02b67914c0f 100644 --- a/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-node-id-persistence_spec.lua @@ -83,8 +83,10 @@ local function start_kong_debug(env) end +--- XXX FIXME: enable inc_sync = on +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do - describe("node id persistence", function() + describe("node id persistence " .. " inc_sync=" .. inc_sync, function() local control_plane_config = { role = "control_plane", @@ -93,6 +95,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, } local data_plane_config = { @@ -107,6 +110,7 @@ for _, strategy in helpers.each_strategy() do database = "off", untrusted_lua = "on", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, } local admin_client @@ -322,4 +326,5 @@ for _, strategy in helpers.each_strategy() do end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua b/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua index f4f175550bb..a7f11e41059 100644 --- a/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua +++ b/spec/02-integration/09-hybrid_mode/10-forward-proxy_spec.lua @@ -71,9 +71,11 @@ local proxy_configs = { -- if existing lmdb data is set, the service/route exists and -- test run too fast before the proxy connection is established +-- XXX FIXME: enable inc_sync = on +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do for proxy_desc, proxy_opts in pairs(proxy_configs) do - describe("CP/DP sync through proxy (" .. proxy_desc .. ") works with #" .. strategy .. " backend", function() + describe("CP/DP sync through proxy (" .. proxy_desc .. ") works with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() lazy_setup(function() helpers.get_db_utils(strategy) -- runs migrations @@ -85,6 +87,7 @@ for _, strategy in helpers.each_strategy() do db_update_frequency = 0.1, cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -105,6 +108,8 @@ for _, strategy in helpers.each_strategy() do proxy_server_ssl_verify = proxy_opts.proxy_server_ssl_verify, lua_ssl_trusted_certificate = proxy_opts.lua_ssl_trusted_certificate, + cluster_incremental_sync = inc_sync, + -- this is unused, but required for the template to include a stream {} block stream_listen = "0.0.0.0:5555", }, nil, nil, fixtures)) @@ -166,4 +171,5 @@ for _, strategy in helpers.each_strategy() do end) end -- proxy configs -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/11-status_spec.lua b/spec/02-integration/09-hybrid_mode/11-status_spec.lua index 02b2abff9c5..5c438684770 100644 --- a/spec/02-integration/09-hybrid_mode/11-status_spec.lua +++ b/spec/02-integration/09-hybrid_mode/11-status_spec.lua @@ -4,9 +4,10 @@ local helpers = require "spec.helpers" local cp_status_port = helpers.get_available_port() local dp_status_port = 8100 +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("Hybrid Mode - status ready #" .. strategy, function() + describe("Hybrid Mode - status ready #" .. strategy .. " inc_sync=" .. inc_sync, function() helpers.get_db_utils(strategy, {}) @@ -21,6 +22,7 @@ for _, strategy in helpers.each_strategy() do proxy_listen = "127.0.0.1:9002", nginx_main_worker_processes = 8, status_listen = "127.0.0.1:" .. dp_status_port, + cluster_incremental_sync = inc_sync, }) end @@ -34,6 +36,7 @@ for _, strategy in helpers.each_strategy() do cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", status_listen = "127.0.0.1:" .. cp_status_port, + cluster_incremental_sync = inc_sync, }) end @@ -67,6 +70,8 @@ for _, strategy in helpers.each_strategy() do end) describe("dp status ready endpoint for no config", function() + -- XXX FIXME + local skip_inc_sync = inc_sync == "on" and pending or it lazy_setup(function() assert(start_kong_cp()) @@ -99,7 +104,7 @@ for _, strategy in helpers.each_strategy() do -- now dp receive config from cp, so dp should be ready - it("should return 200 on data plane after configuring", function() + skip_inc_sync("should return 200 on data plane after configuring", function() helpers.wait_until(function() local http_client = helpers.http_client('127.0.0.1', dp_status_port) @@ -156,4 +161,5 @@ for _, strategy in helpers.each_strategy() do end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/12-errors_spec.lua b/spec/02-integration/09-hybrid_mode/12-errors_spec.lua index 98755b6a9e1..fbbc3049cd5 100644 --- a/spec/02-integration/09-hybrid_mode/12-errors_spec.lua +++ b/spec/02-integration/09-hybrid_mode/12-errors_spec.lua @@ -69,8 +69,10 @@ local function get_error_report(client, msg) end +-- XXX TODO: mock_cp does not support incremental sync rpc +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy() do - describe("CP/DP sync error-reporting with #" .. strategy .. " backend", function() + describe("CP/DP sync error-reporting with #" .. strategy .. " inc_sync=" .. inc_sync .. " backend", function() local client local cluster_port local cluster_ssl_port @@ -100,6 +102,7 @@ for _, strategy in helpers.each_strategy() do -- use a small map size so that it's easy for us to max it out lmdb_map_size = "1m", plugins = "bundled,cluster-error-reporting", + cluster_incremental_sync = inc_sync, }, nil, nil, fixtures)) end) @@ -256,4 +259,5 @@ for _, strategy in helpers.each_strategy() do assert.equals("map full", e.error.message) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/09-hybrid_mode/13-deprecations_spec.lua b/spec/02-integration/09-hybrid_mode/13-deprecations_spec.lua index c19792ead11..8ddf85e80b3 100644 --- a/spec/02-integration/09-hybrid_mode/13-deprecations_spec.lua +++ b/spec/02-integration/09-hybrid_mode/13-deprecations_spec.lua @@ -3,8 +3,9 @@ local join = require("pl.stringx").join local ENABLED_PLUGINS = { "dummy" , "reconfiguration-completion"} +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy({"postgres"}) do - describe("deprecations are not reported on DP but on CP", function() + describe("deprecations are not reported on DP but on CP " .. " inc_sync=" .. inc_sync, function() local cp_prefix = "servroot1" local dp_prefix = "servroot2" local cp_logfile, dp_logfile, route @@ -41,6 +42,7 @@ for _, strategy in helpers.each_strategy({"postgres"}) do nginx_conf = "spec/fixtures/custom_nginx.template", admin_listen = "0.0.0.0:9001", proxy_listen = "off", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -55,6 +57,7 @@ for _, strategy in helpers.each_strategy({"postgres"}) do plugins = "bundled," .. join(",", ENABLED_PLUGINS), admin_listen = "off", proxy_listen = "0.0.0.0:9002", + cluster_incremental_sync = inc_sync, })) dp_logfile = helpers.get_running_conf(dp_prefix).nginx_err_logs cp_logfile = helpers.get_running_conf(cp_prefix).nginx_err_logs @@ -111,4 +114,5 @@ for _, strategy in helpers.each_strategy({"postgres"}) do end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua index c706b0824bc..21eda945c6d 100644 --- a/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua @@ -1,8 +1,9 @@ local helpers = require "spec.helpers" local cjson = require("cjson.safe") +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("Hybrid Mode RPC #" .. strategy, function() + describe("Hybrid Mode RPC #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -15,8 +16,8 @@ for _, strategy in helpers.each_strategy() do cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, cluster_listen = "127.0.0.1:9005", - cluster_rpc = "on", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) assert(helpers.start_kong({ @@ -26,9 +27,9 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_control_plane = "127.0.0.1:9005", - cluster_rpc = "on", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) end) @@ -38,32 +39,7 @@ for _, strategy in helpers.each_strategy() do end) describe("status API", function() - -- TODO: remove this test once cluster RPC is GA - it("no DR RPC capabilities exist", function() - -- This should time out, we expect no RPC capabilities - local status = pcall(helpers.wait_until, function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:get("/clustering/data-planes")) - local body = assert.res_status(200, res) - local json = cjson.decode(body) - - for _, v in pairs(json.data) do - if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then - table.sort(v.rpc_capabilities) - assert.near(14 * 86400, v.ttl, 3) - assert.same({ "kong.debug.log_level.v1", }, v.rpc_capabilities) - return true - end - end - end, 10) - assert.is_false(status) - end) - - pending("shows DP RPC capability status", function() + it("shows DP RPC capability status", function() helpers.wait_until(function() local admin_client = helpers.admin_client() finally(function() @@ -78,7 +54,8 @@ for _, strategy in helpers.each_strategy() do if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then table.sort(v.rpc_capabilities) assert.near(14 * 86400, v.ttl, 3) - assert.same({ "kong.debug.log_level.v1", }, v.rpc_capabilities) + -- kong.debug.log_level.v1 should be the first rpc service + assert.same("kong.debug.log_level.v1", v.rpc_capabilities[1]) return true end end @@ -86,4 +63,5 @@ for _, strategy in helpers.each_strategy() do end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua b/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua index 10ed37f5272..d53fc541dec 100644 --- a/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua @@ -27,8 +27,9 @@ local function obtain_dp_node_id() end +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("Hybrid Mode RPC #" .. strategy, function() + describe("Hybrid Mode RPC #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -41,8 +42,8 @@ for _, strategy in helpers.each_strategy() do cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, cluster_listen = "127.0.0.1:9005", - cluster_rpc = "on", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) assert(helpers.start_kong({ @@ -52,9 +53,9 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_control_plane = "127.0.0.1:9005", - cluster_rpc = "on", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) end) @@ -64,22 +65,7 @@ for _, strategy in helpers.each_strategy() do end) describe("Dynamic log level over RPC", function() - - -- TODO: remove when cluster RPC is GA - it("log level API is unavailable", function() - local dp_node_id = obtain_dp_node_id() - - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level")) - assert.res_status(404, res) - end) - - -- TODO: enable when cluster RPC is GA - pending("can get the current log level", function() + it("can get the current log level", function() local dp_node_id = obtain_dp_node_id() local admin_client = helpers.admin_client() @@ -95,8 +81,7 @@ for _, strategy in helpers.each_strategy() do assert.equal("debug", json.original_level) end) - -- TODO: enable when cluster RPC is GA - pending("can set the current log level", function() + it("can set the current log level", function() local dp_node_id = obtain_dp_node_id() local admin_client = helpers.admin_client() @@ -124,8 +109,7 @@ for _, strategy in helpers.each_strategy() do assert.equal("debug", json.original_level) end) - -- TODO: enable when cluster RPC is GA - pending("set current log level to original_level turns off feature", function() + it("set current log level to original_level turns off feature", function() local dp_node_id = obtain_dp_node_id() local admin_client = helpers.admin_client() @@ -165,8 +149,7 @@ for _, strategy in helpers.each_strategy() do assert.equal("debug", json.original_level) end) - -- TODO: enable when cluster RPC is GA - pending("DELETE turns off feature", function() + it("DELETE turns off feature", function() local dp_node_id = obtain_dp_node_id() local admin_client = helpers.admin_client() @@ -198,4 +181,5 @@ for _, strategy in helpers.each_strategy() do end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua b/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua index 4a6d73cf659..d4ddac3a533 100644 --- a/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua @@ -41,7 +41,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert_key = "spec/fixtures/kong_clustering.key", database = strategy, cluster_listen = "127.0.0.1:9005", - cluster_rpc = "off", + cluster_rpc = "off", -- disable rpc nginx_conf = "spec/fixtures/custom_nginx.template", })) @@ -52,7 +52,7 @@ for _, strategy in helpers.each_strategy() do cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", cluster_control_plane = "127.0.0.1:9005", - cluster_rpc = "off", + cluster_rpc = "off", -- disable rpc proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", })) @@ -96,6 +96,21 @@ for _, strategy in helpers.each_strategy() do local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level")) assert.res_status(404, res) end) + + it("can not get DP RPC capability status", function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:get("/clustering/data-planes")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) + + for _, v in pairs(json.data) do + assert.equal(#v.rpc_capabilities, 0) + end + end) end) end) end diff --git a/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua b/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua index db7edcc5edb..51c7a3b8a1a 100644 --- a/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/04-concentrator_spec.lua @@ -27,8 +27,9 @@ local function obtain_dp_node_id() end +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do - describe("Hybrid Mode RPC over DB concentrator #" .. strategy, function() + describe("Hybrid Mode RPC over DB concentrator #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() helpers.get_db_utils(strategy, { @@ -43,6 +44,7 @@ for _, strategy in helpers.each_strategy() do cluster_listen = "127.0.0.1:9005", admin_listen = "127.0.0.1:" .. helpers.get_available_port(), nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) assert(helpers.start_kong({ @@ -53,6 +55,7 @@ for _, strategy in helpers.each_strategy() do database = strategy, cluster_listen = "127.0.0.1:" .. helpers.get_available_port(), nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) assert(helpers.start_kong({ @@ -64,6 +67,7 @@ for _, strategy in helpers.each_strategy() do cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, -- incremental sync })) end) @@ -74,7 +78,7 @@ for _, strategy in helpers.each_strategy() do end) describe("Dynamic log level over RPC", function() - pending("can get the current log level", function() + it("can get the current log level", function() local dp_node_id = obtain_dp_node_id() -- this sleep is *not* needed for the below wait_until to succeed, @@ -103,4 +107,5 @@ for _, strategy in helpers.each_strategy() do end) end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/02-integration/20-wasm/06-clustering_spec.lua b/spec/02-integration/20-wasm/06-clustering_spec.lua index 31e76bb3f1d..9e36d4dce66 100644 --- a/spec/02-integration/20-wasm/06-clustering_spec.lua +++ b/spec/02-integration/20-wasm/06-clustering_spec.lua @@ -72,7 +72,9 @@ local function new_wasm_filter_directory() end -describe("#wasm - hybrid mode #postgres", function() +-- XXX TODO: enable inc_sync = "on" +for _, inc_sync in ipairs { "off" } do +describe("#wasm - hybrid mode #postgres" .. " inc_sync=" .. inc_sync, function() local cp_prefix = "cp" local cp_errlog = cp_prefix .. "/logs/error.log" local cp_filter_path @@ -113,6 +115,7 @@ describe("#wasm - hybrid mode #postgres", function() wasm_filters = "user", -- don't enable bundled filters for this test wasm_filters_path = cp_filter_path, nginx_main_worker_processes = 2, + cluster_incremental_sync = inc_sync, })) assert.logfile(cp_errlog).has.line([[successfully loaded "response_transformer" module]], true, 10) @@ -152,6 +155,7 @@ describe("#wasm - hybrid mode #postgres", function() wasm_filters_path = dp_filter_path, node_id = node_id, nginx_main_worker_processes = 2, + cluster_incremental_sync = inc_sync, })) assert.logfile(dp_errlog).has.line([[successfully loaded "response_transformer" module]], true, 10) @@ -307,6 +311,7 @@ describe("#wasm - hybrid mode #postgres", function() nginx_conf = "spec/fixtures/custom_nginx.template", wasm = "off", node_id = node_id, + cluster_incremental_sync = inc_sync, })) end) @@ -346,6 +351,7 @@ describe("#wasm - hybrid mode #postgres", function() wasm_filters = "user", -- don't enable bundled filters for this test wasm_filters_path = tmp_dir, node_id = node_id, + cluster_incremental_sync = inc_sync, })) end) @@ -364,3 +370,4 @@ describe("#wasm - hybrid mode #postgres", function() end) end) end) +end -- for inc_sync diff --git a/spec/02-integration/20-wasm/10-wasmtime_spec.lua b/spec/02-integration/20-wasm/10-wasmtime_spec.lua index 7a1ba07c185..05a3f91d6a5 100644 --- a/spec/02-integration/20-wasm/10-wasmtime_spec.lua +++ b/spec/02-integration/20-wasm/10-wasmtime_spec.lua @@ -1,6 +1,7 @@ local helpers = require "spec.helpers" local fmt = string.format +for _, inc_sync in ipairs { "off", "on" } do for _, role in ipairs({"traditional", "control_plane", "data_plane"}) do describe("#wasm wasmtime (role: " .. role .. ")", function() @@ -18,9 +19,11 @@ describe("#wasm wasmtime (role: " .. role .. ")", function() role = role, cluster_cert = "spec/fixtures/kong_clustering.crt", cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_incremental_sync = inc_sync, })) conf = assert(helpers.get_running_conf(prefix)) + conf.cluster_incremental_sync = inc_sync == "on" end) lazy_teardown(function() @@ -90,9 +93,12 @@ describe("#wasm wasmtime (role: " .. role .. ")", function() status_listen = "127.0.0.1:" .. status_port, nginx_main_worker_processes = 2, + + cluster_incremental_sync = inc_sync, })) conf = assert(helpers.get_running_conf(prefix)) + conf.cluster_incremental_sync = inc_sync == "on" -- we need to briefly spin up a control plane, or else we will get -- error.log entries when our data plane tries to connect @@ -110,6 +116,7 @@ describe("#wasm wasmtime (role: " .. role .. ")", function() cluster_cert_key = "spec/fixtures/kong_clustering.key", status_listen = "off", nginx_main_worker_processes = 2, + cluster_incremental_sync = inc_sync, })) end end) @@ -167,3 +174,4 @@ describe("#wasm wasmtime (role: " .. role .. ")", function() end) -- wasmtime end -- each role +end -- for inc_sync diff --git a/spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua b/spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua index 7fb4bd9ed0b..67cd9abc6f3 100644 --- a/spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua +++ b/spec/03-plugins/09-key-auth/04-hybrid_mode_spec.lua @@ -1,7 +1,10 @@ local helpers = require "spec.helpers" + +-- XXX FIXME: inc_sync = on flaky +for _, inc_sync in ipairs { "off" } do for _, strategy in helpers.each_strategy({"postgres"}) do - describe("Plugin: key-auth (access) [#" .. strategy .. "] auto-expiring keys", function() + describe("Plugin: key-auth (access) [#" .. strategy .. " inc_sync=" .. inc_sync .. "] auto-expiring keys", function() -- Give a bit of time to reduce test flakyness on slow setups local ttl = 10 local inserted_at @@ -38,6 +41,7 @@ for _, strategy in helpers.each_strategy({"postgres"}) do cluster_listen = "127.0.0.1:9005", cluster_telemetry_listen = "127.0.0.1:9006", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -50,6 +54,7 @@ for _, strategy in helpers.each_strategy({"postgres"}) do cluster_control_plane = "127.0.0.1:9005", cluster_telemetry_endpoint = "127.0.0.1:9006", proxy_listen = "0.0.0.0:9002", + cluster_incremental_sync = inc_sync, })) end) @@ -120,4 +125,5 @@ for _, strategy in helpers.each_strategy({"postgres"}) do end) end) -end +end -- for _, strategy +end -- for inc_sync diff --git a/spec/03-plugins/11-correlation-id/02-schema_spec.lua b/spec/03-plugins/11-correlation-id/02-schema_spec.lua index 68a03b73a32..b02cc906505 100644 --- a/spec/03-plugins/11-correlation-id/02-schema_spec.lua +++ b/spec/03-plugins/11-correlation-id/02-schema_spec.lua @@ -86,7 +86,9 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() end) end) - describe("in hybrid mode", function() + --- XXX FIXME: enable inc_sync = on + for _, inc_sync in ipairs { "off" } do + describe("in hybrid mode" .. " inc_sync=" .. inc_sync, function() local route lazy_setup(function() route = bp.routes:insert({ @@ -107,7 +109,8 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() COMMIT; ]], { ROUTE_ID = route.id, - CACHE_KEY = "plugins:correlation-id:"..route.id.."::::"..ws.id, + --CACHE_KEY = "plugins:correlation-id:"..route.id.."::::"..ws.id, + CACHE_KEY = "plugins|"..ws.id.."|route|"..route.id.."|"..plugin_id, ID = plugin_id, }) local _, err = db.connector:query(sql) @@ -121,6 +124,7 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() prefix = "servroot", cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_incremental_sync = inc_sync, })) assert(helpers.start_kong({ @@ -132,6 +136,7 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", status_listen = "127.0.0.1:9100", + cluster_incremental_sync = inc_sync, })) end) @@ -181,4 +186,5 @@ describe("Plugin: correlation-id (schema) #a [#" .. strategy .."]", function() proxy_client:close() end) end) + end -- for inc_sync end) diff --git a/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua b/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua new file mode 100644 index 00000000000..0563c4c83f8 --- /dev/null +++ b/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua @@ -0,0 +1,17 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function() + uh.old_after_up("has created the \"clustering_sync_version\" table", function() + assert.database_has_relation("clustering_sync_version") + assert.table_has_column("clustering_sync_version", "version", "integer") + end) + + uh.old_after_up("has created the \"clustering_sync_delta\" table", function() + assert.database_has_relation("clustering_sync_delta") + assert.table_has_column("clustering_sync_delta", "version", "integer") + assert.table_has_column("clustering_sync_delta", "type", "text") + assert.table_has_column("clustering_sync_delta", "id", "uuid") + assert.table_has_column("clustering_sync_delta", "ws_id", "uuid") + assert.table_has_column("clustering_sync_delta", "row", "json") + end) +end) diff --git a/spec/internal/db.lua b/spec/internal/db.lua index 9895181fdeb..8fe43da18d7 100644 --- a/spec/internal/db.lua +++ b/spec/internal/db.lua @@ -277,6 +277,8 @@ end -- } local function get_db_utils(strategy, tables, plugins, vaults, skip_migrations) strategy = strategy or conf.database + conf.database = strategy -- overwrite kong.configuration.database + if tables ~= nil and type(tables) ~= "table" then error("arg #2 must be a list of tables to truncate", 2) end @@ -314,12 +316,6 @@ local function get_db_utils(strategy, tables, plugins, vaults, skip_migrations) bootstrap_database(db) end - do - local database = conf.database - conf.database = strategy - conf.database = database - end - db:truncate("plugins") assert(db.plugins:load_plugin_schemas(conf.loaded_plugins)) assert(db.vaults:load_vault_schemas(conf.loaded_vaults))