Skip to content

Commit

Permalink
Make all cache invalidations synchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
hanshuebner committed Nov 30, 2023
1 parent deb0b9d commit 2cee947
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 221 deletions.
184 changes: 174 additions & 10 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ local cjson = require "cjson"
local iteration = require "kong.db.iteration"
local utils = require "kong.tools.utils"
local kong_global = require "kong.global"
local constants = require "kong.constants"
local defaults = require "kong.db.strategies.connector".defaults
local hooks = require "kong.hooks"
local workspaces = require "kong.workspaces"
local kong_pdk_vault = require "kong.pdk.vault"
local certificate = require "kong.runloop.certificate"
local new_tab = require "table.new"
local DAO_MAX_TTL = require("kong.constants").DATABASE.DAO_MAX_TTL

Expand Down Expand Up @@ -1462,24 +1465,185 @@ function DAO:row_to_entity(row, options)
end


local function invalidate(schema_name)
local key
local function invalidate_wasm_filters(schema_name, operation)
-- cache is invalidated on service/route deletion to ensure we don't
-- have orphaned filter chain data cached
local is_delete = operation == "delete"
and (schema_name == "services"
or schema_name == "routes")

local updated = schema_name == "filter_chains" or is_delete

if updated then
log(DEBUG, "[events] wasm filter chains updated, invalidating cache")
core_cache:invalidate("filter_chains:version")
end

return updated
end


local function invalidate_ca_certificates(operation, ca)
if operation ~= "update" then
return
end

local invalidated = false

log(DEBUG, "[events] CA certificate updated, invalidating ca certificate store caches")

local ca_id = ca.id

local done_keys = {}
for _, entity in ipairs(certificate.get_ca_certificate_reference_entities()) do
local elements, err = kong.db[entity]:select_by_ca_certificate(ca_id)
if err then
log(ERR, "[events] failed to select ", entity, " by ca certificate ", ca_id, ": ", err)
return
end

if elements then
for _, e in ipairs(elements) do
local key = certificate.ca_ids_cache_key(e.ca_certificates)

if not done_keys[key] then
done_keys[key] = true
kong.core_cache:invalidate(key)
invalidated = true
end
end
end
end

local plugin_done_keys = {}
local plugins, err = kong.db.plugins:select_by_ca_certificate(ca_id, nil,
certificate.get_ca_certificate_reference_plugins())
if err then
log(ERR, "[events] failed to select plugins by ca certificate ", ca_id, ": ", err)
return
end

if plugins then
for _, e in ipairs(plugins) do
local key = certificate.ca_ids_cache_key(e.config.ca_certificates)

if not plugin_done_keys[key] then
plugin_done_keys[key] = true
kong.cache:invalidate(key)
invalidated = true
end
end
end

return invalidated
end



local function invalidate(operation, workspace, schema_name, entity, old_entity)
workspaces.set_workspace(workspace)

local invalidated = false
local function invalidate_key(key)
kong.core_cache:invalidate(key)
invalidated = true
end

-- invalidate this entity anywhere it is cached if it has a
-- caching key

local cache_key = kong.db[schema_name]:cache_key(entity)
local cache_obj = kong[constants.ENTITY_CACHE_STORE[schema_name]]

if cache_key then
invalidate_key(cache_key)
end

-- if we had an update, but the cache key was part of what was updated,
-- we need to invalidate the previous entity as well

if old_entity then
local old_cache_key = db[schema_name]:cache_key(old_entity)
if old_cache_key and cache_key ~= old_cache_key then
invalidate_key(old_cache_key)
end
end

if schema_name == "routes" then
key = "router:version"
invalidate_key("router:version")

elseif schema_name == "routes" then
if operation == "update" then

-- no need to rebuild the router if we just added a Service
-- since no Route is pointing to that Service yet.
-- ditto for deletion: if a Service if being deleted, it is
-- only allowed because no Route is pointing to it anymore.
invalidate_key("router:version")
end

elseif schema_name == "snis" then
log(DEBUG, "[events] SNI updated, invalidating cached certificates")

local sni = old_entity or entity
local sni_name = sni.name
local sni_wild_pref, sni_wild_suf = certificate.produce_wild_snis(sni_name)
invalidate_key("snis:" .. sni_name)

if sni_wild_pref then
invalidate_key("snis:" .. sni_wild_pref)
end

if sni_wild_suf then
invalidate_key("snis:" .. sni_wild_suf)
end

elseif schema_name == "plugins" then
key = "plugins_iterator:version"
invalidate_key("plugins_iterator:version")

elseif schema_name == "vaults" then
if kong_pdk_vault.invalidate_vault_entity(entity, old_entity) then
invalidated = true
end

elseif schema_name == "consumers" then
-- As we support config.anonymous to be configured as Consumer.username,
-- so invalidate the extra cache in case of data inconsistency
local old_username
if old_entity then
old_username = old_entity.username
if old_username and old_username ~= null and old_username ~= "" then
invalidate_key(db.consumers:cache_key(old_username))
end
end

if entity then
local username = entity.username
if username and username ~= null and username ~= "" and username ~= old_username then
invalidate_key(db.consumers:cache_key(username))
end
end

elseif schema_name == "ca_certificates" then
if invalidate_ca_certificates(operation, entity) then
invalidated = true
end
end
if key then
kong.core_cache:invalidate(key)

if invalidate_wasm_filters(schema_name, operation) then
invalidated = true
end

if invalidated then
local transaction_id = kong_global.get_current_transaction_id()
ngx.ctx.transaction_id = transaction_id
ngx.shared.kong:set("test:current_transaction_id", transaction_id)
end
local transaction_id = kong_global.get_current_transaction_id()
ngx.ctx.transaction_id = transaction_id
ngx.shared.kong:set("test:current_transaction_id", transaction_id)
end


function DAO:post_crud_event(operation, entity, old_entity, options)
invalidate(self.schema.name)
invalidate(operation, options.workspace, self.schema.name, entity, old_entity)

if options and options.no_broadcast_crud_event then
return
Expand Down
52 changes: 31 additions & 21 deletions kong/pdk/vault.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,36 @@ local function new(self)
end


---
-- Invalidate shared cache entries for the given vault entities
--
-- @local
-- @function handle_vault_invalidation
-- @tparam table entity entity that was created/updated
-- @tparam table old_entity entity that was updated/deleted
function invalidate_vault_entity(entity, old_entity)
local invalidated = false
local vaults = self.db.vaults
local old_prefix
if old_entity then
old_prefix = old_entity.prefix
if old_prefix and old_prefix ~= ngx.null then
cache:invalidate(vaults:cache_key(old_prefix))
invalidated = true
end
end

if entity then
local prefix = entity.prefix
if prefix and prefix ~= ngx.null and prefix ~= old_prefix then
cache:invalidate(vaults:cache_key(prefix))
invalidated = true
end
end

return invalidated
end

---
-- Flushes LRU caches and forcibly rotates the secrets.
--
Expand All @@ -1289,27 +1319,6 @@ local function new(self)
-- @function handle_vault_crud_event
-- @tparam table data event data
local function handle_vault_crud_event(data)
local cache = self.core_cache
if cache then
local vaults = self.db.vaults
local old_entity = data.old_entity
local old_prefix
if old_entity then
old_prefix = old_entity.prefix
if old_prefix and old_prefix ~= ngx.null then
cache:invalidate(vaults:cache_key(old_prefix))
end
end

local entity = data.entity
if entity then
local prefix = entity.prefix
if prefix and prefix ~= ngx.null and prefix ~= old_prefix then
cache:invalidate(vaults:cache_key(prefix))
end
end
end

LRU:flush_all()

-- refresh all the secrets
Expand Down Expand Up @@ -1607,4 +1616,5 @@ return {
new = new,
is_reference = is_reference,
parse_reference = parse_reference,
invalidate_vault_entity = invalidate_vault_entity,
}
Loading

0 comments on commit 2cee947

Please sign in to comment.