Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tests): fix reconfiguration completion detection #12117

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kong-3.6.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ build = {
["kong.db.dao.workspaces"] = "kong/db/dao/workspaces.lua",
["kong.db.dao.services"] = "kong/db/dao/services.lua",
["kong.db.dao.ca_certificates"] = "kong/db/dao/ca_certificates.lua",
["kong.db.dao.invalidate"] = "kong/db/dao/invalidate.lua",
["kong.db.declarative"] = "kong/db/declarative/init.lua",
["kong.db.declarative.marshaller"] = "kong/db/declarative/marshaller.lua",
["kong.db.declarative.export"] = "kong/db/declarative/export.lua",
Expand Down
4 changes: 1 addition & 3 deletions kong/clustering/config_helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,7 @@ function _M.update(declarative_config, msg)
return nil, err
end

if kong.configuration.log_level == "debug" then
ngx_log(ngx.DEBUG, _log_prefix, "loaded configuration with transaction ID " .. msg.current_transaction_id)
end
ngx_log(ngx.DEBUG, _log_prefix, "loaded configuration with transaction ID " .. msg.current_transaction_id)

return true
end
Expand Down
4 changes: 1 addition & 3 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ function _M:export_deflated_reconfigure_payload()
self.current_config_hash = config_hash
self.deflated_reconfigure_payload = payload

if kong.configuration.log_level == "debug" then
ngx_log(ngx_DEBUG, _log_prefix, "exported configuration with transaction id " .. current_transaction_id)
end
ngx_log(ngx_DEBUG, _log_prefix, "exported configuration with transaction id " .. current_transaction_id)

return payload, nil, config_hash
end
Expand Down
75 changes: 52 additions & 23 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local utils = require "kong.tools.utils"
local defaults = require "kong.db.strategies.connector".defaults
local hooks = require "kong.hooks"
local workspaces = require "kong.workspaces"
local invalidate = require "kong.db.dao.invalidate"
local new_tab = require "table.new"
local DAO_MAX_TTL = require("kong.constants").DATABASE.DAO_MAX_TTL

Expand Down Expand Up @@ -669,7 +670,7 @@ _M._find_cascade_delete_entities = find_cascade_delete_entities

local function propagate_cascade_delete_events(entries, options)
for i = 1, #entries do
entries[i].dao:post_crud_event("delete", entries[i].entity, nil, options)
entries[i].dao:invalidate_cache_and_post_crud_events("delete", entries[i].entity, nil, options)
end
end

Expand Down Expand Up @@ -825,7 +826,7 @@ local function generate_foreign_key_methods(schema)
return nil, tostring(err_t), err_t
end

self:post_crud_event("update", row, rbw_entity, options)
self:invalidate_cache_and_post_crud_events("update", row, rbw_entity, options)

return row
end
Expand Down Expand Up @@ -878,9 +879,9 @@ local function generate_foreign_key_methods(schema)
end

if rbw_entity then
self:post_crud_event("update", row, rbw_entity, options)
self:invalidate_cache_and_post_crud_events("update", row, rbw_entity, options)
else
self:post_crud_event("create", row, nil, options)
self:invalidate_cache_and_post_crud_events("create", row, nil, options)
end

return row
Expand Down Expand Up @@ -932,7 +933,7 @@ local function generate_foreign_key_methods(schema)
return nil, tostring(err_t), err_t
end

self:post_crud_event("delete", entity, nil, options)
self:invalidate_cache_and_post_crud_events("delete", entity, nil, options)
propagate_cascade_delete_events(cascade_entries, options)

return true
Expand Down Expand Up @@ -1158,7 +1159,7 @@ function DAO:insert(entity, options)
return nil, tostring(err_t), err_t
end

self:post_crud_event("create", row, nil, options)
self:invalidate_cache_and_post_crud_events("create", row, nil, options)

return row
end
Expand Down Expand Up @@ -1211,7 +1212,7 @@ function DAO:update(pk_or_entity, entity, options)
return nil, tostring(err_t), err_t
end

self:post_crud_event("update", row, rbw_entity, options)
self:invalidate_cache_and_post_crud_events("update", row, rbw_entity, options)

return row
end
Expand Down Expand Up @@ -1266,9 +1267,9 @@ function DAO:upsert(pk_or_entity, entity, options)
end

if rbw_entity then
self:post_crud_event("update", row, rbw_entity, options)
self:invalidate_cache_and_post_crud_events("update", row, rbw_entity, options)
else
self:post_crud_event("create", row, nil, options)
self:invalidate_cache_and_post_crud_events("create", row, nil, options)
end

return row
Expand Down Expand Up @@ -1335,7 +1336,7 @@ function DAO:delete(pk_or_entity, options)
return nil, tostring(err_t), err_t
end

self:post_crud_event("delete", entity, nil, options)
self:invalidate_cache_and_post_crud_events("delete", entity, nil, options)
propagate_cascade_delete_events(cascade_entries, options)

return true
Expand Down Expand Up @@ -1461,28 +1462,56 @@ function DAO:row_to_entity(row, options)
end


function DAO:post_crud_event(operation, entity, old_entity, options)
local function make_clean_entity(entity)
if entity then
return remove_nulls(utils.cycle_aware_deep_copy(entity, true))
else
return nil
end
end


function DAO:invalidate_cache_and_post_crud_events(operation, entity, old_entity, options)
-- Ease our lives and remove the nulls from the entities early, as we need to send them in an event later
-- on anyway.
entity = make_clean_entity(entity)
old_entity = make_clean_entity(old_entity)

invalidate(operation, options and options.workspace or nil, self.schema.name, entity, old_entity)

if options and options.no_broadcast_crud_event then
-- This option is set when we write an audit record or when importing a database.
return
end

if self.events then
local entity_without_nulls
if entity then
entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(entity, true))
local event_data = {
operation = operation,
schema = self.schema,
entity = entity,
old_entity = old_entity,
}

-- public worker events propagation
local schema = event_data.schema
local entity_channel = schema.table or schema.name
local entity_operation_channel = fmt("%s:%s", entity_channel, event_data.operation)

-- crud:routes
local ok, err = self.events.post_local("crud", entity_channel, event_data)
if not ok then
log(ERR, "[events] could not broadcast crud event: ", err)
return
end

local old_entity_without_nulls
if old_entity then
old_entity_without_nulls = remove_nulls(utils.cycle_aware_deep_copy(old_entity, true))
-- crud:routes:create
ok, err = self.events.post_local("crud", entity_operation_channel, event_data)
if not ok then
log(ERR, "[events] could not broadcast crud event: ", err)
return
end

local ok, err = self.events.post_local("dao:crud", operation, {
operation = operation,
schema = self.schema,
entity = entity_without_nulls,
old_entity = old_entity_without_nulls,
})
local ok, err = self.events.post_local("dao:crud", operation, event_data)
if not ok then
log(ERR, "[db] failed to propagate CRUD operation: ", err)
end
Expand Down
198 changes: 198 additions & 0 deletions kong/db/dao/invalidate.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
local kong_global = require "kong.global"
local workspaces = require "kong.workspaces"
local kong_pdk_vault = require "kong.pdk.vault"
local constants = require "kong.constants"

local null = ngx.null
local log = ngx.log
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG
local ENTITY_CACHE_STORE = constants.ENTITY_CACHE_STORE


local function certificate()
-- Need to require kong.runloop.certificate late in the game to retain testability
return require "kong.runloop.certificate"
end


local function invalidate_wasm_filters(schema_name, operation)
-- cache is invalidated on service/route deletion to ensure we don't
-- have orphaned filter chain data cached
local is_delete = operation == "delete"
and (schema_name == "services"
or schema_name == "routes")

local updated = schema_name == "filter_chains" or is_delete

if updated then
log(DEBUG, "[events] wasm filter chains updated, invalidating cache")
kong.core_cache:invalidate("filter_chains:version")
end

return updated
end


local function invalidate_ca_certificates(operation, ca)
if operation ~= "update" then
return
end

local invalidated = false

log(DEBUG, "[events] CA certificate updated, invalidating ca certificate store caches")

local ca_id = ca.id

local done_keys = {}
for _, entity in ipairs(certificate().get_ca_certificate_reference_entities()) do
local elements, err = kong.db[entity]:select_by_ca_certificate(ca_id)
if err then
log(ERR, "[events] failed to select ", entity, " by ca certificate ", ca_id, ": ", err)
return
end

if elements then
for _, e in ipairs(elements) do
local key = certificate().ca_ids_cache_key(e.ca_certificates)

if not done_keys[key] then
done_keys[key] = true
kong.core_cache:invalidate(key)
invalidated = true
end
end
end
end

local plugin_done_keys = {}
local plugins, err = kong.db.plugins:select_by_ca_certificate(ca_id, nil,
certificate().get_ca_certificate_reference_plugins())
if err then
log(ERR, "[events] failed to select plugins by ca certificate ", ca_id, ": ", err)
return
end

if plugins then
for _, e in ipairs(plugins) do
local key = certificate().ca_ids_cache_key(e.config.ca_certificates)

if not plugin_done_keys[key] then
plugin_done_keys[key] = true
kong.cache:invalidate(key)
invalidated = true
end
end
end

return invalidated
end


local function invalidate(operation, workspace, schema_name, entity, old_entity)
if not kong or not kong.core_cache or not kong.core_cache.invalidate then
return
end

workspaces.set_workspace(workspace)

local invalidated = false
local function invalidate_key(key)
local cache_obj = kong[ENTITY_CACHE_STORE[schema_name]]
cache_obj:invalidate(key)
invalidated = true
end

-- invalidate this entity anywhere it is cached if it has a
-- caching key

local cache_key = kong.db[schema_name]:cache_key(entity)

if cache_key then
invalidate_key(cache_key)
end

-- if we had an update, but the cache key was part of what was updated,
-- we need to invalidate the previous entity as well

if old_entity then
local old_cache_key = kong.db[schema_name]:cache_key(old_entity)
if old_cache_key and cache_key ~= old_cache_key then
invalidate_key(old_cache_key)
end
end

if schema_name == "routes" then
invalidate_key("router:version")

elseif schema_name == "services" then
if operation == "update" then

-- no need to rebuild the router if we just added a Service
-- since no Route is pointing to that Service yet.
-- ditto for deletion: if a Service if being deleted, it is
-- only allowed because no Route is pointing to it anymore.
invalidate_key("router:version")
end

elseif schema_name == "snis" then
log(DEBUG, "[events] SNI updated, invalidating cached certificates")

local sni = old_entity or entity
local sni_name = sni.name
local sni_wild_pref, sni_wild_suf = certificate().produce_wild_snis(sni_name)
invalidate_key("snis:" .. sni_name)

if sni_wild_pref then
invalidate_key("snis:" .. sni_wild_pref)
end

if sni_wild_suf then
invalidate_key("snis:" .. sni_wild_suf)
end

elseif schema_name == "plugins" then
invalidate_key("plugins_iterator:version")

elseif schema_name == "vaults" then
if kong_pdk_vault.invalidate_vault_entity(entity, old_entity) then
invalidated = true
end

elseif schema_name == "consumers" then
-- As we support config.anonymous to be configured as Consumer.username,
-- so invalidate the extra cache in case of data inconsistency
local old_username
if old_entity then
old_username = old_entity.username
if old_username and old_username ~= null and old_username ~= "" then
invalidate_key(kong.db.consumers:cache_key(old_username))
end
end

if entity then
local username = entity.username
if username and username ~= null and username ~= "" and username ~= old_username then
invalidate_key(kong.db.consumers:cache_key(username))
end
end

elseif schema_name == "ca_certificates" then
if invalidate_ca_certificates(operation, entity) then
invalidated = true
end
end

if invalidate_wasm_filters(schema_name, operation) then
invalidated = true
end

if invalidated then
local transaction_id = kong_global.get_current_transaction_id()
ngx.ctx.transaction_id = transaction_id
ngx.shared.kong:set("test:current_transaction_id", transaction_id)
end
end

return invalidate
2 changes: 1 addition & 1 deletion kong/db/dao/snis.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ local function invalidate_cache(self, old_entity, err, err_t)
return nil, err, err_t
end
if old_entity then
self:post_crud_event("update", old_entity)
self:invalidate_cache_and_post_crud_events("update", old_entity)
end
end

Expand Down
Loading
Loading