diff --git a/kong-3.6.0-0.rockspec b/kong-3.6.0-0.rockspec index b722cafb7507..ba02564e5448 100644 --- a/kong-3.6.0-0.rockspec +++ b/kong-3.6.0-0.rockspec @@ -215,6 +215,7 @@ build = { ["kong.db.dao.workspaces"] = "kong/db/dao/workspaces.lua", ["kong.db.dao.services"] = "kong/db/dao/services.lua", ["kong.db.dao.ca_certificates"] = "kong/db/dao/ca_certificates.lua", + ["kong.db.dao.invalidate"] = "kong/db/dao/invalidate.lua", ["kong.db.declarative"] = "kong/db/declarative/init.lua", ["kong.db.declarative.marshaller"] = "kong/db/declarative/marshaller.lua", ["kong.db.declarative.export"] = "kong/db/declarative/export.lua", diff --git a/kong/clustering/config_helper.lua b/kong/clustering/config_helper.lua index 82e94b357023..2da0a008b726 100644 --- a/kong/clustering/config_helper.lua +++ b/kong/clustering/config_helper.lua @@ -246,9 +246,7 @@ function _M.update(declarative_config, msg) return nil, err end - if kong.configuration.log_level == "debug" then - ngx_log(ngx.DEBUG, _log_prefix, "loaded configuration with transaction ID " .. msg.current_transaction_id) - end + ngx_log(ngx.DEBUG, _log_prefix, "loaded configuration with transaction ID " .. msg.current_transaction_id) return true end diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index fb66db3fbc9f..ce01bfe17363 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -160,9 +160,7 @@ function _M:export_deflated_reconfigure_payload() self.current_config_hash = config_hash self.deflated_reconfigure_payload = payload - if kong.configuration.log_level == "debug" then - ngx_log(ngx_DEBUG, _log_prefix, "exported configuration with transaction id " .. current_transaction_id) - end + ngx_log(ngx_DEBUG, _log_prefix, "exported configuration with transaction id " .. 
current_transaction_id) return payload, nil, config_hash end diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index 31f6414f65e6..026e7f125b80 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -4,6 +4,7 @@ local utils = require "kong.tools.utils" local defaults = require "kong.db.strategies.connector".defaults local hooks = require "kong.hooks" local workspaces = require "kong.workspaces" +local invalidate = require "kong.db.dao.invalidate" local new_tab = require "table.new" local DAO_MAX_TTL = require("kong.constants").DATABASE.DAO_MAX_TTL @@ -669,7 +670,7 @@ _M._find_cascade_delete_entities = find_cascade_delete_entities local function propagate_cascade_delete_events(entries, options) for i = 1, #entries do - entries[i].dao:post_crud_event("delete", entries[i].entity, nil, options) + entries[i].dao:invalidate_cache_and_post_crud_events("delete", entries[i].entity, nil, options) end end @@ -825,7 +826,7 @@ local function generate_foreign_key_methods(schema) return nil, tostring(err_t), err_t end - self:post_crud_event("update", row, rbw_entity, options) + self:invalidate_cache_and_post_crud_events("update", row, rbw_entity, options) return row end @@ -878,9 +879,9 @@ local function generate_foreign_key_methods(schema) end if rbw_entity then - self:post_crud_event("update", row, rbw_entity, options) + self:invalidate_cache_and_post_crud_events("update", row, rbw_entity, options) else - self:post_crud_event("create", row, nil, options) + self:invalidate_cache_and_post_crud_events("create", row, nil, options) end return row @@ -932,7 +933,7 @@ local function generate_foreign_key_methods(schema) return nil, tostring(err_t), err_t end - self:post_crud_event("delete", entity, nil, options) + self:invalidate_cache_and_post_crud_events("delete", entity, nil, options) propagate_cascade_delete_events(cascade_entries, options) return true @@ -1158,7 +1159,7 @@ function DAO:insert(entity, options) return nil, tostring(err_t), err_t end - self:post_crud_event("create", row, nil, options) + self:invalidate_cache_and_post_crud_events("create", row, nil, options) return row end @@ -1211,7 +1212,7 @@ function DAO:update(pk_or_entity, entity, options) return nil, tostring(err_t), err_t end - self:post_crud_event("update", row, rbw_entity, options) + self:invalidate_cache_and_post_crud_events("update", row, rbw_entity, options) return row end @@ -1266,9 +1267,9 @@ function DAO:upsert(pk_or_entity, entity, options) end if rbw_entity then - self:post_crud_event("update", row, rbw_entity, options) + self:invalidate_cache_and_post_crud_events("update", row, rbw_entity, options) else - self:post_crud_event("create", row, nil, options) + self:invalidate_cache_and_post_crud_events("create", row, nil, options) end return row @@ -1335,7 +1336,7 @@ function DAO:delete(pk_or_entity, options) return nil, tostring(err_t), err_t end - self:post_crud_event("delete", entity, nil, options) + self:invalidate_cache_and_post_crud_events("delete", entity, nil, options) propagate_cascade_delete_events(cascade_entries, options) return true @@ -1461,28 +1462,56 @@ function DAO:row_to_entity(row, options) end -function DAO:post_crud_event(operation, entity, old_entity, options) +local function make_clean_entity(entity) + if entity then + return remove_nulls(utils.cycle_aware_deep_copy(entity, true)) + else + return nil + end +end + + +function DAO:invalidate_cache_and_post_crud_events(operation, entity, old_entity, options) + -- Ease our lives and remove the nulls from the entities early, as we need to send 
them in an event later + -- on anyway. + entity = make_clean_entity(entity) + old_entity = make_clean_entity(old_entity) + + invalidate(operation, options and options.workspace or nil, self.schema.name, entity, old_entity) + if options and options.no_broadcast_crud_event then + -- This option is set when we write an audit record or when importing a database. return end if self.events then - local entity_without_nulls - if entity then - entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(entity, true)) + local event_data = { + operation = operation, + schema = self.schema, + entity = entity, + old_entity = old_entity, + } + + -- public worker events propagation + local schema = event_data.schema + local entity_channel = schema.table or schema.name + local entity_operation_channel = fmt("%s:%s", entity_channel, event_data.operation) + + -- crud:routes + local ok, err = self.events.post_local("crud", entity_channel, event_data) + if not ok then + log(ERR, "[events] could not broadcast crud event: ", err) + return end - local old_entity_without_nulls - if old_entity then - old_entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(old_entity, true)) + -- crud:routes:create + ok, err = self.events.post_local("crud", entity_operation_channel, event_data) + if not ok then + log(ERR, "[events] could not broadcast crud event: ", err) + return end - local ok, err = self.events.post_local("dao:crud", operation, { - operation = operation, - schema = self.schema, - entity = entity_without_nulls, - old_entity = old_entity_without_nulls, - }) + local ok, err = self.events.post_local("dao:crud", operation, event_data) if not ok then log(ERR, "[db] failed to propagate CRUD operation: ", err) end diff --git a/kong/db/dao/invalidate.lua b/kong/db/dao/invalidate.lua new file mode 100644 index 000000000000..45ece33500b5 --- /dev/null +++ b/kong/db/dao/invalidate.lua @@ -0,0 +1,198 @@ +local kong_global = require "kong.global" +local workspaces = require "kong.workspaces" +local kong_pdk_vault = require "kong.pdk.vault" +local constants = require "kong.constants" + +local null = ngx.null +local log = ngx.log +local ERR = ngx.ERR +local DEBUG = ngx.DEBUG +local ENTITY_CACHE_STORE = constants.ENTITY_CACHE_STORE + + +local function certificate() + -- Need to require kong.runloop.certificate late in the game to retain testability + return require "kong.runloop.certificate" +end + + +local function invalidate_wasm_filters(schema_name, operation) + -- cache is invalidated on service/route deletion to ensure we don't + -- have orphaned filter chain data cached + local is_delete = operation == "delete" + and (schema_name == "services" + or schema_name == "routes") + + local updated = schema_name == "filter_chains" or is_delete + + if updated then + log(DEBUG, "[events] wasm filter chains updated, invalidating cache") + kong.core_cache:invalidate("filter_chains:version") + end + + return updated +end + + +local function invalidate_ca_certificates(operation, ca) + if operation ~= "update" then + return + end + + local invalidated = false + + log(DEBUG, "[events] CA certificate updated, invalidating ca certificate store caches") + + local ca_id = ca.id + + local done_keys = {} + for _, entity in ipairs(certificate().get_ca_certificate_reference_entities()) do + local elements, err = kong.db[entity]:select_by_ca_certificate(ca_id) + if err then + log(ERR, "[events] failed to select ", entity, " by ca certificate ", ca_id, ": ", err) + return + end + + if elements then + for _, e in ipairs(elements) do 
+ local key = certificate().ca_ids_cache_key(e.ca_certificates) + + if not done_keys[key] then + done_keys[key] = true + kong.core_cache:invalidate(key) + invalidated = true + end + end + end + end + + local plugin_done_keys = {} + local plugins, err = kong.db.plugins:select_by_ca_certificate(ca_id, nil, + certificate().get_ca_certificate_reference_plugins()) + if err then + log(ERR, "[events] failed to select plugins by ca certificate ", ca_id, ": ", err) + return + end + + if plugins then + for _, e in ipairs(plugins) do + local key = certificate().ca_ids_cache_key(e.config.ca_certificates) + + if not plugin_done_keys[key] then + plugin_done_keys[key] = true + kong.cache:invalidate(key) + invalidated = true + end + end + end + + return invalidated +end + + +local function invalidate(operation, workspace, schema_name, entity, old_entity) + if not kong or not kong.core_cache or not kong.core_cache.invalidate then + return + end + + workspaces.set_workspace(workspace) + + local invalidated = false + local function invalidate_key(key) + local cache_obj = kong[ENTITY_CACHE_STORE[schema_name]] + cache_obj:invalidate(key) + invalidated = true + end + + -- invalidate this entity anywhere it is cached if it has a + -- caching key + + local cache_key = kong.db[schema_name]:cache_key(entity) + + if cache_key then + invalidate_key(cache_key) + end + + -- if we had an update, but the cache key was part of what was updated, + -- we need to invalidate the previous entity as well + + if old_entity then + local old_cache_key = kong.db[schema_name]:cache_key(old_entity) + if old_cache_key and cache_key ~= old_cache_key then + invalidate_key(old_cache_key) + end + end + + if schema_name == "routes" then + invalidate_key("router:version") + + elseif schema_name == "services" then + if operation == "update" then + + -- no need to rebuild the router if we just added a Service + -- since no Route is pointing to that Service yet. + -- ditto for deletion: if a Service if being deleted, it is + -- only allowed because no Route is pointing to it anymore. + invalidate_key("router:version") + end + + elseif schema_name == "snis" then + log(DEBUG, "[events] SNI updated, invalidating cached certificates") + + local sni = old_entity or entity + local sni_name = sni.name + local sni_wild_pref, sni_wild_suf = certificate().produce_wild_snis(sni_name) + invalidate_key("snis:" .. sni_name) + + if sni_wild_pref then + invalidate_key("snis:" .. sni_wild_pref) + end + + if sni_wild_suf then + invalidate_key("snis:" .. 
sni_wild_suf) + end + + elseif schema_name == "plugins" then + invalidate_key("plugins_iterator:version") + + elseif schema_name == "vaults" then + if kong_pdk_vault.invalidate_vault_entity(entity, old_entity) then + invalidated = true + end + + elseif schema_name == "consumers" then + -- As we support config.anonymous to be configured as Consumer.username, + -- so invalidate the extra cache in case of data inconsistency + local old_username + if old_entity then + old_username = old_entity.username + if old_username and old_username ~= null and old_username ~= "" then + invalidate_key(kong.db.consumers:cache_key(old_username)) + end + end + + if entity then + local username = entity.username + if username and username ~= null and username ~= "" and username ~= old_username then + invalidate_key(kong.db.consumers:cache_key(username)) + end + end + + elseif schema_name == "ca_certificates" then + if invalidate_ca_certificates(operation, entity) then + invalidated = true + end + end + + if invalidate_wasm_filters(schema_name, operation) then + invalidated = true + end + + if invalidated then + local transaction_id = kong_global.get_current_transaction_id() + ngx.ctx.transaction_id = transaction_id + ngx.shared.kong:set("test:current_transaction_id", transaction_id) + end +end + +return invalidate diff --git a/kong/db/dao/snis.lua b/kong/db/dao/snis.lua index e65e549dd9b9..2749bb41c8d0 100644 --- a/kong/db/dao/snis.lua +++ b/kong/db/dao/snis.lua @@ -13,7 +13,7 @@ local function invalidate_cache(self, old_entity, err, err_t) return nil, err, err_t end if old_entity then - self:post_crud_event("update", old_entity) + self:invalidate_cache_and_post_crud_events("update", old_entity) end end diff --git a/kong/init.lua b/kong/init.lua index 22bd31688e0b..0324bcc2c2d4 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -1831,10 +1831,6 @@ local function serve_content(module) ngx.header["Access-Control-Allow-Origin"] = ngx.req.get_headers()["Origin"] or "*" - if kong.configuration.log_level == "debug" then - ngx.header["Kong-Test-Transaction-Id"] = kong_global.get_current_transaction_id() - end - lapis.serve(module) ctx.KONG_ADMIN_CONTENT_ENDED_AT = get_updated_now_ms() @@ -1889,6 +1885,11 @@ function Kong.admin_header_filter() header[headers.SERVER] = nil end + if kong.configuration.log_level == "debug" and ngx.ctx.transaction_id then + kong.log.info("Reporting Kong-Test-Transaction-Id ", ngx.ctx.transaction_id) + ngx.header["Kong-Test-Transaction-Id"] = ngx.ctx.transaction_id + end + -- this is not used for now, but perhaps we need it later? 
--ctx.KONG_ADMIN_HEADER_FILTER_ENDED_AT = get_now_ms() --ctx.KONG_ADMIN_HEADER_FILTER_TIME = ctx.KONG_ADMIN_HEADER_FILTER_ENDED_AT - ctx.KONG_ADMIN_HEADER_FILTER_START diff --git a/kong/pdk/vault.lua b/kong/pdk/vault.lua index efc306d48915..72eced4008cd 100644 --- a/kong/pdk/vault.lua +++ b/kong/pdk/vault.lua @@ -1289,27 +1289,6 @@ local function new(self) -- @function handle_vault_crud_event -- @tparam table data event data local function handle_vault_crud_event(data) - local cache = self.core_cache - if cache then - local vaults = self.db.vaults - local old_entity = data.old_entity - local old_prefix - if old_entity then - old_prefix = old_entity.prefix - if old_prefix and old_prefix ~= ngx.null then - cache:invalidate(vaults:cache_key(old_prefix)) - end - end - - local entity = data.entity - if entity then - local prefix = entity.prefix - if prefix and prefix ~= ngx.null and prefix ~= old_prefix then - cache:invalidate(vaults:cache_key(prefix)) - end - end - end - LRU:flush_all() -- refresh all the secrets @@ -1603,8 +1582,40 @@ local function new(self) end +--- +-- Invalidate shared cache entries for the given vault entities +-- +-- @local +-- @function handle_vault_invalidation +-- @tparam table entity entity that was created/updated +-- @tparam table old_entity entity that was updated/deleted +local function invalidate_vault_entity(entity, old_entity) + local invalidated = false + local vaults = kong.db.vaults + local old_prefix + if old_entity then + old_prefix = old_entity.prefix + if old_prefix and old_prefix ~= ngx.null then + kong.core_cache:invalidate(vaults:cache_key(old_prefix)) + invalidated = true + end + end + + if entity then + local prefix = entity.prefix + if prefix and prefix ~= ngx.null and prefix ~= old_prefix then + kong.core_cache:invalidate(vaults:cache_key(prefix)) + invalidated = true + end + end + + return invalidated +end + + return { new = new, is_reference = is_reference, parse_reference = parse_reference, + invalidate_vault_entity = invalidate_vault_entity, } diff --git a/kong/plugins/acl/acls.lua b/kong/plugins/acl/acls.lua index 4551ccf91e47..c79208f8c5c6 100644 --- a/kong/plugins/acl/acls.lua +++ b/kong/plugins/acl/acls.lua @@ -26,13 +26,13 @@ end local _ACLs = {} -function _ACLs:post_crud_event(operation, entity, options) +function _ACLs:invalidate_cache_and_post_crud_events(operation, entity, options) local _, err, err_t = invalidate_cache(self, entity, options) if err then return nil, err, err_t end - return self.super.post_crud_event(self, operation, entity, options) + return self.super.invalidate_cache_and_post_crud_events(self, operation, entity, options) end diff --git a/kong/runloop/events.lua b/kong/runloop/events.lua index 1b0d177c0bcc..2a33ee805714 100644 --- a/kong/runloop/events.lua +++ b/kong/runloop/events.lua @@ -1,9 +1,6 @@ local utils = require "kong.tools.utils" -local constants = require "kong.constants" -local certificate = require "kong.runloop.certificate" local balancer = require "kong.runloop.balancer" local workspaces = require "kong.workspaces" -local wasm = require "kong.runloop.wasm" local kong = kong @@ -15,19 +12,14 @@ local utils_split = utils.split local ngx = ngx -local null = ngx.null local log = ngx.log local ERR = ngx.ERR local CRIT = ngx.CRIT local DEBUG = ngx.DEBUG -local ENTITY_CACHE_STORE = constants.ENTITY_CACHE_STORE - - -- init in register_events() local db -local kong_cache local core_cache local worker_events local cluster_events @@ -170,230 +162,6 @@ local function cluster_balancer_upstreams_handler(data) 
end -local function dao_crud_handler(data) - local schema = data.schema - if not schema then - log(ERR, "[events] missing schema in crud subscriber") - return - end - - local entity = data.entity - if not entity then - log(ERR, "[events] missing entity in crud subscriber") - return - end - - -- invalidate this entity anywhere it is cached if it has a - -- caching key - - local schema_name = schema.name - - local cache_key = db[schema_name]:cache_key(entity) - local cache_obj = kong[ENTITY_CACHE_STORE[schema_name]] - - if cache_key then - cache_obj:invalidate(cache_key) - end - - -- if we had an update, but the cache key was part of what was updated, - -- we need to invalidate the previous entity as well - - local old_entity = data.old_entity - if old_entity then - local old_cache_key = db[schema_name]:cache_key(old_entity) - if old_cache_key and cache_key ~= old_cache_key then - cache_obj:invalidate(old_cache_key) - end - end - - local operation = data.operation - if not operation then - log(ERR, "[events] missing operation in crud subscriber") - return - end - - -- public worker events propagation - - local entity_channel = schema.table or schema_name - local entity_operation_channel = fmt("%s:%s", entity_channel, operation) - - -- crud:routes - local ok, err = worker_events.post_local("crud", entity_channel, data) - if not ok then - log(ERR, "[events] could not broadcast crud event: ", err) - return - end - - -- crud:routes:create - ok, err = worker_events.post_local("crud", entity_operation_channel, data) - if not ok then - log(ERR, "[events] could not broadcast crud event: ", err) - return - end -end - - -local function crud_routes_handler() - log(DEBUG, "[events] Route updated, invalidating router") - core_cache:invalidate("router:version") -end - - -local function crud_services_handler(data) - if data.operation == "create" or data.operation == "delete" then - return - end - - -- no need to rebuild the router if we just added a Service - -- since no Route is pointing to that Service yet. - -- ditto for deletion: if a Service if being deleted, it is - -- only allowed because no Route is pointing to it anymore. - log(DEBUG, "[events] Service updated, invalidating router") - core_cache:invalidate("router:version") -end - - -local function crud_plugins_handler(data) - log(DEBUG, "[events] Plugin updated, invalidating plugins iterator") - core_cache:invalidate("plugins_iterator:version") -end - - -local function crud_snis_handler(data) - log(DEBUG, "[events] SNI updated, invalidating cached certificates") - - local sni = data.old_entity or data.entity - local sni_name = sni.name - local sni_wild_pref, sni_wild_suf = certificate.produce_wild_snis(sni_name) - core_cache:invalidate("snis:" .. sni_name) - - if sni_wild_pref then - core_cache:invalidate("snis:" .. sni_wild_pref) - end - - if sni_wild_suf then - core_cache:invalidate("snis:" .. 
sni_wild_suf) - end -end - - -local function crud_consumers_handler(data) - workspaces.set_workspace(data.workspace) - - local old_entity = data.old_entity - local old_username - if old_entity then - old_username = old_entity.username - if old_username and old_username ~= null and old_username ~= "" then - kong_cache:invalidate(db.consumers:cache_key(old_username)) - end - end - - local entity = data.entity - if entity then - local username = entity.username - if username and username ~= null and username ~= "" and username ~= old_username then - kong_cache:invalidate(db.consumers:cache_key(username)) - end - end -end - - -local function crud_wasm_handler(data, schema_name) - if not wasm.enabled() then - return - end - - -- cache is invalidated on service/route deletion to ensure we don't - -- have oprhaned filter chain data cached - local is_delete = data.operation == "delete" - and (schema_name == "services" - or schema_name == "routes") - - local updated = schema_name == "filter_chains" or is_delete - - if updated then - log(DEBUG, "[events] wasm filter chains updated, invalidating cache") - core_cache:invalidate("filter_chains:version") - end -end - - -local function crud_ca_certificates_handler(data) - if data.operation ~= "update" then - return - end - - log(DEBUG, "[events] CA certificate updated, invalidating ca certificate store caches") - - local ca_id = data.entity.id - - local done_keys = {} - for _, entity in ipairs(certificate.get_ca_certificate_reference_entities()) do - local elements, err = kong.db[entity]:select_by_ca_certificate(ca_id) - if err then - log(ERR, "[events] failed to select ", entity, " by ca certificate ", ca_id, ": ", err) - return - end - - if elements then - for _, e in ipairs(elements) do - local key = certificate.ca_ids_cache_key(e.ca_certificates) - - if not done_keys[key] then - done_keys[key] = true - kong.core_cache:invalidate(key) - end - end - end - end - - local plugin_done_keys = {} - local plugins, err = kong.db.plugins:select_by_ca_certificate(ca_id, nil, - certificate.get_ca_certificate_reference_plugins()) - if err then - log(ERR, "[events] failed to select plugins by ca certificate ", ca_id, ": ", err) - return - end - - if plugins then - for _, e in ipairs(plugins) do - local key = certificate.ca_ids_cache_key(e.config.ca_certificates) - - if not plugin_done_keys[key] then - plugin_done_keys[key] = true - kong.cache:invalidate(key) - end - end - end -end - - -local LOCAL_HANDLERS = { - { "dao:crud", nil , dao_crud_handler }, - - -- local events (same worker) - { "crud" , "routes" , crud_routes_handler }, - { "crud" , "services" , crud_services_handler }, - { "crud" , "plugins" , crud_plugins_handler }, - - -- SSL certs / SNIs invalidations - { "crud" , "snis" , crud_snis_handler }, - - -- Consumers invalidations - -- As we support conifg.anonymous to be configured as Consumer.username, - -- so add an event handler to invalidate the extra cache in case of data inconsistency - { "crud" , "consumers" , crud_consumers_handler }, - - { "crud" , "filter_chains" , crud_wasm_handler }, - { "crud" , "services" , crud_wasm_handler }, - { "crud" , "routes" , crud_wasm_handler }, - - -- ca certificate store caches invalidations - { "crud" , "ca_certificates" , crud_ca_certificates_handler }, -} - - local BALANCER_HANDLERS = { { "crud" , "targets" , crud_targets_handler }, { "crud" , "upstreams" , crud_upstreams_handler }, @@ -423,17 +191,6 @@ local function subscribe_cluster_events(source, handler) end -local function register_local_events() - for _, 
v in ipairs(LOCAL_HANDLERS) do - local source = v[1] - local event = v[2] - local handler = v[3] - - subscribe_worker_events(source, event, handler) - end -end - - local function register_balancer_events() for _, v in ipairs(BALANCER_HANDLERS) do local source = v[1] @@ -454,15 +211,11 @@ end local function register_for_db() -- initialize local local_events hooks - kong_cache = kong.cache core_cache = kong.core_cache worker_events = kong.worker_events cluster_events = kong.cluster_events -- events dispatcher - - register_local_events() - register_balancer_events() end diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 8d8630d94fdb..e5af39cb575f 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -972,10 +972,13 @@ return { -- Before rebuiding the internal structures, retrieve the current PostgreSQL transaction ID to make it the -- current transaction ID after the rebuild has finished. - local rebuild_transaction_id, err = global.get_current_transaction_id() - if not rebuild_transaction_id then + local rebuild_transaction_id, err = kong_shm:get("test:current_transaction_id") + if err then log(ERR, err) end + if rebuild_transaction_id then + log(DEBUG, "beginning configuration processing for transaction ID " .. rebuild_transaction_id) + end local router_update_status, err = rebuild_router({ name = "router", @@ -1006,10 +1009,7 @@ return { end end - if rebuild_transaction_id then - -- Yield to process any pending invalidations - utils.yield() - + if rebuild_transaction_id and global.CURRENT_TRANSACTION_ID ~= rebuild_transaction_id then log(DEBUG, "configuration processing completed for transaction ID " .. rebuild_transaction_id) global.CURRENT_TRANSACTION_ID = rebuild_transaction_id end @@ -1116,7 +1116,8 @@ return { local if_kong_transaction_id = kong.request and kong.request.get_header('if-kong-test-transaction-id') if if_kong_transaction_id then if_kong_transaction_id = tonumber(if_kong_transaction_id) - if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then + kong.log.info((if_kong_transaction_id > global.CURRENT_TRANSACTION_ID and "REJECTED" or "SUCCESSFUL"), " conditional request for transaction id ", if_kong_transaction_id, " global is ", global.CURRENT_TRANSACTION_ID) + if if_kong_transaction_id and if_kong_transaction_id > global.CURRENT_TRANSACTION_ID then return kong.response.error( 503, "Service Unavailable", diff --git a/spec/01-unit/01-db/09-no_broadcast_crud_event_spec.lua b/spec/01-unit/01-db/09-no_broadcast_crud_event_spec.lua index 4dd87535f248..13008f200b9e 100644 --- a/spec/01-unit/01-db/09-no_broadcast_crud_event_spec.lua +++ b/spec/01-unit/01-db/09-no_broadcast_crud_event_spec.lua @@ -36,7 +36,7 @@ describe("option no_broadcast_crud_event", function() local dao = DAO.new(mock_db, entity, strategy, errors) dao.events = { - post_local = spy.new(function() end) + post_local = spy.new(function() return true end) } local row, err = dao:update({ a = 42 }, { b = "world" }, { no_broadcast_crud_event = true }) @@ -68,20 +68,20 @@ describe("option no_broadcast_crud_event", function() local dao = DAO.new(mock_db, entity, strategy, errors) dao.events = { - post_local = spy.new(function() end) + post_local = spy.new(function() return true end) } local row, err = dao:update({ a = 42 }, { b = "three" }, { no_broadcast_crud_event = false }) assert.falsy(err) assert.same({ a = 42, b = "three" }, row) - assert.spy(dao.events.post_local).was_called(1) + assert.spy(dao.events.post_local).was_called(3) local row, err 
= dao:update({ a = 42 }, { b = "four" }) assert.falsy(err) assert.same({ a = 42, b = "four" }, row) - assert.spy(dao.events.post_local).was_called(2) + assert.spy(dao.events.post_local).was_called(6) end) end) diff --git a/spec/02-integration/03-db/12-dao_hooks_spec.lua b/spec/02-integration/03-db/12-dao_hooks_spec.lua index df0745226214..09141bf52424 100644 --- a/spec/02-integration/03-db/12-dao_hooks_spec.lua +++ b/spec/02-integration/03-db/12-dao_hooks_spec.lua @@ -30,13 +30,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:page_for:pre", function() + hooks.register_hook("dao:page_for:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:page_for:post", function() + hooks.register_hook("dao:page_for:post", function(...) post_hook() - return true + return (...) end) end) @@ -56,13 +56,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:select_by:pre", function() + hooks.register_hook("dao:select_by:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:select_by:post", function() + hooks.register_hook("dao:select_by:post", function(...) post_hook() - return true + return (...) end) end) @@ -82,13 +82,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:update_by:pre", function() + hooks.register_hook("dao:update_by:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:update_by:post", function() + hooks.register_hook("dao:update_by:post", function(...) post_hook() - return true + return (...) end) end) @@ -110,13 +110,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:upsert_by:pre", function() + hooks.register_hook("dao:upsert_by:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:upsert_by:post", function() + hooks.register_hook("dao:upsert_by:post", function(...) post_hook() - return true + return (...) end) end) @@ -142,13 +142,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:delete_by:pre", function() + hooks.register_hook("dao:delete_by:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:delete_by:post", function() + hooks.register_hook("dao:delete_by:post", function(...) post_hook() - return true + return (...) end) end) @@ -168,13 +168,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:select:pre", function() + hooks.register_hook("dao:select:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:select:post", function() + hooks.register_hook("dao:select:post", function(...) post_hook() - return true + return (...) end) end) @@ -194,13 +194,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:page:pre", function() + hooks.register_hook("dao:page:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:page:post", function() + hooks.register_hook("dao:page:post", function(...) 
post_hook() - return true + return (...) end) end) @@ -220,13 +220,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:insert:pre", function() + hooks.register_hook("dao:insert:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:insert:post", function() + hooks.register_hook("dao:insert:post", function(...) post_hook() - return true + return (...) end) end) @@ -251,13 +251,13 @@ for _, strategy in helpers.each_strategy() do local post_hook = spy.new(function() end) lazy_setup(function() - hooks.register_hook("dao:update:pre", function() + hooks.register_hook("dao:update:pre", function(...) pre_hook() - return true + return (...) end) - hooks.register_hook("dao:update:post", function() + hooks.register_hook("dao:update:post", function(...) post_hook() - return true + return (...) end) end) diff --git a/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua index 8f89d9c1d721..e4737546e3ab 100644 --- a/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua +++ b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua @@ -1,9 +1,31 @@ local helpers = require "spec.helpers" local cjson = require "cjson" -describe("Admin API - Reconfiguration Completion -", function() +local function get_log(typ, n) + local entries + helpers.wait_until(function() + local client = assert(helpers.http_client( + helpers.mock_upstream_host, + helpers.mock_upstream_port + )) + local res = client:get("/read_log/" .. typ, { + headers = { + Accept = "application/json" + } + }) + local raw = assert.res_status(200, res) + local body = cjson.decode(raw) + + entries = body.entries + return #entries > 0 + end, 10) + if n then + assert(#entries == n, "expected " .. n .. " log entries, but got " .. #entries) + end + return entries +end - local WORKER_STATE_UPDATE_FREQ = 1 +describe("Admin API - Reconfiguration Completion -", function() local admin_client local proxy_client @@ -25,7 +47,7 @@ describe("Admin API - Reconfiguration Completion -", function() res = admin_client:post("/services", { body = { name = "test-service", - url = "http://127.0.0.1", + url = helpers.mock_upstream_url, }, headers = { ["Content-Type"] = "application/json" }, }) @@ -35,19 +57,21 @@ describe("Admin API - Reconfiguration Completion -", function() -- We're running the route setup in `eventually` to cover for the unlikely case that reconfiguration completes -- between adding the route and requesting the path through the proxy path. - local next_path do + local next_path_suffix do local path_suffix = 0 - function next_path() + function next_path_suffix() path_suffix = path_suffix + 1 - return "/" .. tostring(path_suffix) + return tostring(path_suffix) end end + local path_suffix local service_path local kong_transaction_id assert.eventually(function() - service_path = next_path() + path_suffix = next_path_suffix() + service_path = "/" .. path_suffix res = admin_client:post("/services/" .. service.id .. "/routes", { body = { @@ -55,7 +79,26 @@ describe("Admin API - Reconfiguration Completion -", function() }, headers = { ["Content-Type"] = "application/json" }, }) + body = assert.res_status(201, res) + local route = cjson.decode(body) + + kong_transaction_id = res.headers['kong-test-transaction-id'] + assert.is_string(kong_transaction_id) + + res = admin_client:post("/routes/" .. route.id .. 
"/plugins", { + body = { + name = "http-log", + config = { + http_endpoint = "http://" .. helpers.mock_upstream_host + .. ":" + .. helpers.mock_upstream_port + .. "/post_log/reconf" .. path_suffix + } + }, + headers = { ["Content-Type"] = "application/json" }, + }) assert.res_status(201, res) + kong_transaction_id = res.headers['kong-test-transaction-id'] assert.is_string(kong_transaction_id) @@ -83,14 +126,19 @@ describe("Admin API - Reconfiguration Completion -", function() assert.equals("kong terminated the request", body) end) .has_no_error() + + get_log("reconf" .. path_suffix, 1) + end describe("#traditional mode -", function() lazy_setup(function() helpers.get_db_utils() assert(helpers.start_kong({ - worker_consistency = "eventual", - worker_state_update_frequency = WORKER_STATE_UPDATE_FREQ, + nginx_conf = "spec/fixtures/custom_nginx.template", + db_update_frequency = 0.05, + db_cache_neg_ttl = 0.01, + worker_state_update_frequency = 0.1, })) admin_client = helpers.admin_client() proxy_client = helpers.proxy_client() @@ -123,6 +171,10 @@ describe("Admin API - Reconfiguration Completion -", function() cluster_listen = "127.0.0.1:9005", cluster_telemetry_listen = "127.0.0.1:9006", nginx_conf = "spec/fixtures/custom_nginx.template", + db_update_frequency = 0.05, + db_cache_neg_ttl = 0.01, + worker_consistency = "eventual", + worker_state_update_frequency = 0.1, })) assert(helpers.start_kong({ @@ -135,6 +187,9 @@ describe("Admin API - Reconfiguration Completion -", function() cluster_control_plane = "127.0.0.1:9005", cluster_telemetry_endpoint = "127.0.0.1:9006", proxy_listen = "0.0.0.0:9002", + db_update_frequency = 0.05, + db_cache_neg_ttl = 0.01, + worker_state_update_frequency = 0.1, })) admin_client = helpers.admin_client() proxy_client = helpers.proxy_client("127.0.0.1", 9002)