Skip to content

Commit

Permalink
Merge branch 'master' into expose_cert_expiry_to_api
Browse files Browse the repository at this point in the history
Signed-off-by: tzssangglass <[email protected]>
  • Loading branch information
tzssangglass committed Nov 6, 2023
2 parents b184c79 + 073fcff commit c10b28e
Show file tree
Hide file tree
Showing 18 changed files with 595 additions and 342 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/reconfiguration-completion-detection.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: Provide mechanism to detect completion of reconfiguration on the proxy path
type: feature
scope: Core
1 change: 1 addition & 0 deletions kong-3.6.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ build = {
["kong.tools.sha256"] = "kong/tools/sha256.lua",
["kong.tools.yield"] = "kong/tools/yield.lua",
["kong.tools.uuid"] = "kong/tools/uuid.lua",
["kong.tools.rand"] = "kong/tools/rand.lua",

["kong.runloop.handler"] = "kong/runloop/handler.lua",
["kong.runloop.events"] = "kong/runloop/events.lua",
Expand Down
11 changes: 9 additions & 2 deletions kong/clustering/config_helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,12 @@ local function fill_empty_hashes(hashes)
end
end

function _M.update(declarative_config, config_table, config_hash, hashes)
function _M.update(declarative_config, msg)

local config_table = msg.config_table
local config_hash = msg.config_hash
local hashes = msg.hashes

assert(type(config_table) == "table")

if not config_hash then
Expand Down Expand Up @@ -236,11 +241,13 @@ function _M.update(declarative_config, config_table, config_hash, hashes)
-- executed by worker 0

local res
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes, msg.current_transaction_id)
if not res then
return nil, err
end

ngx_log(ngx.NOTICE, "loaded configuration with transaction ID " .. msg.current_transaction_id)

return true
end

Expand Down
6 changes: 6 additions & 0 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ local compat = require("kong.clustering.compat")
local constants = require("kong.constants")
local events = require("kong.clustering.events")
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local global = require("kong.global")
local extract_dp_cert = require("kong.clustering.tls").extract_dp_cert


local string = string
local setmetatable = setmetatable
local type = type
Expand Down Expand Up @@ -115,8 +117,10 @@ function _M:export_deflated_reconfigure_payload()

local config_hash, hashes = calculate_config_hash(config_table)

local current_transaction_id = global.get_current_transaction_id()
local payload = {
type = "reconfigure",
current_transaction_id = current_transaction_id,
timestamp = ngx_now(),
config_table = config_table,
config_hash = config_hash,
Expand All @@ -143,6 +147,8 @@ function _M:export_deflated_reconfigure_payload()
self.current_config_hash = config_hash
self.deflated_reconfigure_payload = payload

ngx_log(ngx_NOTICE, "exported configuration with transaction id " .. current_transaction_id)

return payload, nil, config_hash
end

Expand Down
5 changes: 1 addition & 4 deletions kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,7 @@ function _M:communicate(premature)
msg.timestamp and " with timestamp: " .. msg.timestamp or "",
log_suffix)

local config_table = assert(msg.config_table)

local pok, res, err = pcall(config_helper.update, self.declarative_config,
config_table, msg.config_hash, msg.hashes)
local pok, res, err = pcall(config_helper.update, self.declarative_config, msg)
if pok then
ping_immediately = true
end
Expand Down
7 changes: 6 additions & 1 deletion kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ do
local DECLARATIVE_LOCK_KEY = "declarative:lock"

-- make sure no matter which path it exits, we released the lock.
load_into_cache_with_events = function(entities, meta, hash, hashes)
load_into_cache_with_events = function(entities, meta, hash, hashes, transaction_id)
local kong_shm = ngx.shared.kong

local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
Expand All @@ -522,6 +522,11 @@ do
end

ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)

if ok and transaction_id then
ok, err = kong_shm:set("declarative:current-transaction-id", transaction_id)
end

kong_shm:delete(DECLARATIVE_LOCK_KEY)

return ok, err
Expand Down
8 changes: 6 additions & 2 deletions kong/db/strategies/postgres/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,11 @@ function _mt:query(sql, operation)
end

local phase = get_phase()
local in_admin_api = phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE

if not operation or
not self.config_ro or
(phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE)
not self.config_ro or
in_admin_api
then
-- admin API requests skips the replica optimization
-- to ensure all its results are always strongly consistent
Expand Down Expand Up @@ -552,6 +553,9 @@ function _mt:query(sql, operation)

res, err, partial, num_queries = conn:query(sql)

if in_admin_api and operation == "write" and res and res[1] and res[1]._pg_transaction_id then
kong.response.set_header('X-Kong-Transaction-ID', res[1]._pg_transaction_id)
end
-- if err is string then either it is a SQL error
-- or it is a socket error, here we abort connections
-- that encounter errors instead of reusing them, for
Expand Down
2 changes: 2 additions & 0 deletions kong/db/strategies/postgres/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,8 @@ function _M.new(connector, schema, errors)
insert(upsert_expressions, ttl_escaped .. " = " .. "EXCLUDED." .. ttl_escaped)
end

insert(select_expressions, "pg_current_xact_id() as _pg_transaction_id")

local primary_key_escaped = {}
for i, key in ipairs(primary_key) do
local primary_key_field = primary_key_fields[key]
Expand Down
13 changes: 12 additions & 1 deletion kong/global.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ end


local _GLOBAL = {
phases = phase_checker.phases,
phases = phase_checker.phases,
CURRENT_TRANSACTION_ID = 0,
}


Expand Down Expand Up @@ -294,4 +295,14 @@ function _GLOBAL.init_timing()
end


function _GLOBAL.get_current_transaction_id()
local rows, err = kong.db.connector:query("select pg_current_xact_id() as _pg_transaction_id")
if not rows then
return nil, "could not query postgres for current transaction id: " .. err
else
return tonumber(rows[1]._pg_transaction_id)
end
end


return _GLOBAL
6 changes: 6 additions & 0 deletions kong/router/atc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ end


local function escape_str(str)
-- raw string
if not str:find([["#]], 1, true) then
return "r#\"" .. str .. "\"#"
end

-- standard string escaping (unlikely case)
if str:find([[\]], 1, true) then
str = str:gsub([[\]], [[\\]])
end
Expand Down
4 changes: 2 additions & 2 deletions kong/router/compat.lua
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ local function get_expression(route)
-- See #6425, if `net.protocol` is not `https`
-- then SNI matching should simply not be considered
if srcs or dsts then
gen = "(net.protocol != \"tls\"" .. LOGICAL_OR .. gen .. ")"
gen = "(net.protocol != r#\"tls\"#" .. LOGICAL_OR .. gen .. ")"
else
gen = "(net.protocol != \"https\"" .. LOGICAL_OR .. gen .. ")"
gen = "(net.protocol != r#\"https\"#" .. LOGICAL_OR .. gen .. ")"
end

expression_append(expr_buf, LOGICAL_AND, gen)
Expand Down
129 changes: 60 additions & 69 deletions kong/runloop/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ local concurrency = require "kong.concurrency"
local lrucache = require "resty.lrucache"
local ktls = require "resty.kong.tls"
local request_id = require "kong.tracing.request_id"


local global = require "kong.global"


local PluginsIterator = require "kong.runloop.plugins_iterator"
Expand Down Expand Up @@ -748,6 +747,8 @@ do
wasm.set_state(wasm_state)
end

global.CURRENT_TRANSACTION_ID = kong_shm:get("declarative:current-transaction-id") or 0

return true
end) -- concurrency.with_coroutine_mutex

Expand All @@ -765,11 +766,6 @@ do
end


local function register_events()
events.register_events(reconfigure_handler)
end


local balancer_prepare
do
local function sleep_once_for_balancer_init()
Expand Down Expand Up @@ -921,7 +917,7 @@ return {
return
end

register_events()
events.register_events(reconfigure_handler)

-- initialize balancers for active healthchecks
timer_at(0, function()
Expand Down Expand Up @@ -967,84 +963,62 @@ return {
if strategy ~= "off" then
local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1

local router_async_opts = {
name = "router",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_router_timer(premature)
local function rebuild_timer(premature)
if premature then
return
end

-- Don't wait for the semaphore (timeout = 0) when updating via the
-- timer.
-- If the semaphore is locked, that means that the rebuild is
-- already ongoing.
local ok, err = rebuild_router(router_async_opts)
if not ok then
log(ERR, "could not rebuild router via timer: ", err)
-- Before rebuiding the internal structures, retrieve the current PostgreSQL transaction ID to make it the
-- current transaction ID after the rebuild has finished.
local rebuild_transaction_id, err = global.get_current_transaction_id()
if not rebuild_transaction_id then
log(ERR, err)
end
end

local _, err = kong.timer:named_every("router-rebuild",
worker_state_update_frequency,
rebuild_router_timer)
if err then
log(ERR, "could not schedule timer to rebuild router: ", err)
end

local plugins_iterator_async_opts = {
name = "plugins_iterator",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_plugins_iterator_timer(premature)
if premature then
return
end

local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts)
if err then
log(ERR, "could not rebuild plugins iterator via timer: ", err)
local router_update_status, err = rebuild_router({
name = "router",
timeout = 0,
on_timeout = "return_true",
})
if not router_update_status then
log(ERR, "could not rebuild router via timer: ", err)
end
end

local _, err = kong.timer:named_every("plugins-iterator-rebuild",
worker_state_update_frequency,
rebuild_plugins_iterator_timer)
if err then
log(ERR, "could not schedule timer to rebuild plugins iterator: ", err)
end


if wasm.enabled() then
local wasm_async_opts = {
name = "wasm",
local plugins_iterator_update_status, err = rebuild_plugins_iterator({
name = "plugins_iterator",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_wasm_filter_chains_timer(premature)
if premature then
return
end
})
if not plugins_iterator_update_status then
log(ERR, "could not rebuild plugins iterator via timer: ", err)
end

local _, err = rebuild_wasm_state(wasm_async_opts)
if err then
if wasm.enabled() then
local wasm_update_status, err = rebuild_wasm_state({
name = "wasm",
timeout = 0,
on_timeout = "return_true",
})
if not wasm_update_status then
log(ERR, "could not rebuild wasm filter chains via timer: ", err)
end
end

local _, err = kong.timer:named_every("wasm-filter-chains-rebuild",
worker_state_update_frequency,
rebuild_wasm_filter_chains_timer)
if err then
log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err)
if rebuild_transaction_id then
-- Yield to process any pending invalidations
utils.yield()

log(NOTICE, "configuration processing completed for transaction ID " .. rebuild_transaction_id)
global.CURRENT_TRANSACTION_ID = rebuild_transaction_id
end
end

local _, err = kong.timer:named_every("rebuild",
worker_state_update_frequency,
rebuild_timer)
if err then
log(ERR, "could not schedule timer to rebuild: ", err)
end
end
end,
},
Expand Down Expand Up @@ -1134,6 +1108,23 @@ return {
},
access = {
before = function(ctx)
-- If this is a version-conditional request, abort it if this dataplane has not processed at least the
-- specified configuration version yet.
local if_kong_transaction_id = kong.request and kong.request.get_header('x-if-kong-transaction-id')
if if_kong_transaction_id then
if_kong_transaction_id = tonumber(if_kong_transaction_id)
if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then
return kong.response.error(
503,
"Service Unavailable",
{
["X-Kong-Reconfiguration-Status"] = "pending",
["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1),
}
)
end
end

-- if there is a gRPC service in the context, don't re-execute the pre-access
-- phase handler - it has been executed before the internal redirect
if ctx.service and (ctx.service.protocol == "grpc" or
Expand Down
Loading

0 comments on commit c10b28e

Please sign in to comment.