Skip to content

Commit

Permalink
feat(testing): add reconfiguration completion detection mechanism
Browse files Browse the repository at this point in the history
This change adds a new response header Kong-Transaction-Id to the
Admin API.  It contains the (ever incrementing) PostgreSQL transaction
ID of the change that was made.  The value can then be put into the
If-Kong-Transaction-Id variable in a request to the proxy path.  The
request will be rejected with a 503 error if the proxy path has not
been reconfigured yet with this or a later transaction id.

The mechanism is useful in testing, when changes are made through the
Admin API and the effects on the proxy path are then to be verified.
Rather than waiting for a static period or retrying the proxy path
request until the expected result is received, the proxy path client
specifies the last transaction ID received from the Admin API in the
If-Kong-Transaction-Id header and retries the request if a 503 error
is received.

Both the generation of the Kong-Transaction-Id header and the check
for If-Kong-Transaction-Id are enabled only when Kong is running in
debug mode.
  • Loading branch information
hanshuebner committed Nov 10, 2023
1 parent 13d3d57 commit 416e6e6
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 77 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/reconfiguration-completion-detection.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: Provide mechanism to detect completion of reconfiguration on the proxy path
type: feature
scope: Core
13 changes: 11 additions & 2 deletions kong/clustering/config_helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,12 @@ local function fill_empty_hashes(hashes)
end
end

function _M.update(declarative_config, config_table, config_hash, hashes)
function _M.update(declarative_config, msg)

local config_table = msg.config_table
local config_hash = msg.config_hash
local hashes = msg.hashes

assert(type(config_table) == "table")

if not config_hash then
Expand Down Expand Up @@ -236,11 +241,15 @@ function _M.update(declarative_config, config_table, config_hash, hashes)
-- executed by worker 0

local res
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes, msg.current_transaction_id)
if not res then
return nil, err
end

if kong.configuration.log_level == "debug" then
ngx_log(ngx.DEBUG, _log_prefix, "loaded configuration with transaction ID " .. msg.current_transaction_id)
end

return true
end

Expand Down
11 changes: 11 additions & 0 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ local compat = require("kong.clustering.compat")
local constants = require("kong.constants")
local events = require("kong.clustering.events")
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local global = require("kong.global")


local string = string
Expand Down Expand Up @@ -123,6 +124,12 @@ function _M:export_deflated_reconfigure_payload()
hashes = hashes,
}

local current_transaction_id
if kong.configuration.log_level == "debug" then
current_transaction_id = global.get_current_transaction_id()
payload.current_transaction_id = current_transaction_id
end

self.reconfigure_payload = payload

payload, err = cjson_encode(payload)
Expand All @@ -143,6 +150,10 @@ function _M:export_deflated_reconfigure_payload()
self.current_config_hash = config_hash
self.deflated_reconfigure_payload = payload

if kong.configuration.log_level == "debug" then
ngx_log(ngx_DEBUG, _log_prefix, "exported configuration with transaction id " .. current_transaction_id)
end

return payload, nil, config_hash
end

Expand Down
5 changes: 1 addition & 4 deletions kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,7 @@ function _M:communicate(premature)
msg.timestamp and " with timestamp: " .. msg.timestamp or "",
log_suffix)

local config_table = assert(msg.config_table)

local pok, res, err = pcall(config_helper.update, self.declarative_config,
config_table, msg.config_hash, msg.hashes)
local pok, res, err = pcall(config_helper.update, self.declarative_config, msg)
if pok then
ping_immediately = true
end
Expand Down
7 changes: 6 additions & 1 deletion kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ do
local DECLARATIVE_LOCK_KEY = "declarative:lock"

-- make sure no matter which path it exits, we released the lock.
load_into_cache_with_events = function(entities, meta, hash, hashes)
load_into_cache_with_events = function(entities, meta, hash, hashes, transaction_id)
local kong_shm = ngx.shared.kong

local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
Expand All @@ -522,6 +522,11 @@ do
end

ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)

if ok and transaction_id then
ok, err = kong_shm:set("declarative:current_transaction_id", transaction_id)
end

kong_shm:delete(DECLARATIVE_LOCK_KEY)

return ok, err
Expand Down
13 changes: 12 additions & 1 deletion kong/global.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ end


local _GLOBAL = {
phases = phase_checker.phases,
phases = phase_checker.phases,
CURRENT_TRANSACTION_ID = 0,
}


Expand Down Expand Up @@ -294,4 +295,14 @@ function _GLOBAL.init_timing()
end


function _GLOBAL.get_current_transaction_id()
local rows, err = kong.db.connector:query("select txid_current() as _pg_transaction_id")
if not rows then
return nil, "could not query postgres for current transaction id: " .. err
else
return tonumber(rows[1]._pg_transaction_id)
end
end


return _GLOBAL
4 changes: 4 additions & 0 deletions kong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1831,6 +1831,10 @@ local function serve_content(module)

ngx.header["Access-Control-Allow-Origin"] = ngx.req.get_headers()["Origin"] or "*"

if kong.configuration.log_level == "debug" then
ngx.header["Kong-Transaction-Id"] = kong_global.get_current_transaction_id()
end

lapis.serve(module)

ctx.KONG_ADMIN_CONTENT_ENDED_AT = get_updated_now_ms()
Expand Down
131 changes: 62 additions & 69 deletions kong/runloop/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ local concurrency = require "kong.concurrency"
local lrucache = require "resty.lrucache"
local ktls = require "resty.kong.tls"
local request_id = require "kong.tracing.request_id"


local global = require "kong.global"


local PluginsIterator = require "kong.runloop.plugins_iterator"
Expand Down Expand Up @@ -748,6 +747,8 @@ do
wasm.set_state(wasm_state)
end

global.CURRENT_TRANSACTION_ID = kong_shm:get("declarative:current_transaction_id") or 0

return true
end) -- concurrency.with_coroutine_mutex

Expand All @@ -765,11 +766,6 @@ do
end


local function register_events()
events.register_events(reconfigure_handler)
end


local balancer_prepare
do
local function sleep_once_for_balancer_init()
Expand Down Expand Up @@ -921,7 +917,7 @@ return {
return
end

register_events()
events.register_events(reconfigure_handler)

-- initialize balancers for active healthchecks
timer_at(0, function()
Expand Down Expand Up @@ -967,84 +963,62 @@ return {
if strategy ~= "off" then
local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1

local router_async_opts = {
name = "router",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_router_timer(premature)
local function rebuild_timer(premature)
if premature then
return
end

-- Don't wait for the semaphore (timeout = 0) when updating via the
-- timer.
-- If the semaphore is locked, that means that the rebuild is
-- already ongoing.
local ok, err = rebuild_router(router_async_opts)
if not ok then
log(ERR, "could not rebuild router via timer: ", err)
end
end

local _, err = kong.timer:named_every("router-rebuild",
worker_state_update_frequency,
rebuild_router_timer)
if err then
log(ERR, "could not schedule timer to rebuild router: ", err)
end

local plugins_iterator_async_opts = {
name = "plugins_iterator",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_plugins_iterator_timer(premature)
if premature then
return
-- Before rebuiding the internal structures, retrieve the current PostgreSQL transaction ID to make it the
-- current transaction ID after the rebuild has finished.
local rebuild_transaction_id, err = global.get_current_transaction_id()
if not rebuild_transaction_id then
log(ERR, err)
end

local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts)
if err then
log(ERR, "could not rebuild plugins iterator via timer: ", err)
local router_update_status, err = rebuild_router({
name = "router",
timeout = 0,
on_timeout = "return_true",
})
if not router_update_status then
log(ERR, "could not rebuild router via timer: ", err)
end
end

local _, err = kong.timer:named_every("plugins-iterator-rebuild",
worker_state_update_frequency,
rebuild_plugins_iterator_timer)
if err then
log(ERR, "could not schedule timer to rebuild plugins iterator: ", err)
end


if wasm.enabled() then
local wasm_async_opts = {
name = "wasm",
local plugins_iterator_update_status, err = rebuild_plugins_iterator({
name = "plugins_iterator",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_wasm_filter_chains_timer(premature)
if premature then
return
end
})
if not plugins_iterator_update_status then
log(ERR, "could not rebuild plugins iterator via timer: ", err)
end

local _, err = rebuild_wasm_state(wasm_async_opts)
if err then
if wasm.enabled() then
local wasm_update_status, err = rebuild_wasm_state({
name = "wasm",
timeout = 0,
on_timeout = "return_true",
})
if not wasm_update_status then
log(ERR, "could not rebuild wasm filter chains via timer: ", err)
end
end

local _, err = kong.timer:named_every("wasm-filter-chains-rebuild",
worker_state_update_frequency,
rebuild_wasm_filter_chains_timer)
if err then
log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err)
if rebuild_transaction_id then
-- Yield to process any pending invalidations
utils.yield()

log(DEBUG, "configuration processing completed for transaction ID " .. rebuild_transaction_id)
global.CURRENT_TRANSACTION_ID = rebuild_transaction_id
end
end

local _, err = kong.timer:named_every("rebuild",
worker_state_update_frequency,
rebuild_timer)
if err then
log(ERR, "could not schedule timer to rebuild: ", err)
end
end
end,
},
Expand Down Expand Up @@ -1134,6 +1108,25 @@ return {
},
access = {
before = function(ctx)
if kong.configuration.log_level == "debug" then
-- If this is a version-conditional request, abort it if this dataplane has not processed at least the
-- specified configuration version yet.
local if_kong_transaction_id = kong.request and kong.request.get_header('if-kong-transaction-id')
if if_kong_transaction_id then
if_kong_transaction_id = tonumber(if_kong_transaction_id)
if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then
return kong.response.error(
503,
"Service Unavailable",
{
["X-Kong-Reconfiguration-Status"] = "pending",
["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1),
}
)
end
end
end

-- if there is a gRPC service in the context, don't re-execute the pre-access
-- phase handler - it has been executed before the internal redirect
if ctx.service and (ctx.service.protocol == "grpc" or
Expand Down
1 change: 1 addition & 0 deletions spec/02-integration/03-db/15-connection_pool_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ for pool_size, backlog_size in ipairs({ 0, 3 }) do
nginx_worker_processes = 1,
pg_pool_size = pool_size,
pg_backlog = backlog_size,
log_level = "info",
}))
client = helpers.admin_client()
end)
Expand Down
2 changes: 2 additions & 0 deletions spec/02-integration/04-admin_api/02-kong_routes_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ describe("Admin API - Kong routes with strategy #" .. strategy, function()
res2.headers["Date"] = nil
res1.headers["X-Kong-Admin-Latency"] = nil
res2.headers["X-Kong-Admin-Latency"] = nil
res1.headers["Kong-Transaction-Id"] = nil
res2.headers["Kong-Transaction-Id"] = nil

assert.same(res1.headers, res2.headers)
end)
Expand Down
Loading

0 comments on commit 416e6e6

Please sign in to comment.