Skip to content

Commit

Permalink
Revert "feat(testing): add reconfiguration completion detection mecha…
Browse files Browse the repository at this point in the history
…nism"

This reverts commit 3a7bc16.
  • Loading branch information
chronolaw authored Nov 7, 2023
1 parent f8bce46 commit 11ba086
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 244 deletions.
3 changes: 0 additions & 3 deletions changelog/unreleased/reconfiguration-completion-detection.yml

This file was deleted.

11 changes: 2 additions & 9 deletions kong/clustering/config_helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,7 @@ local function fill_empty_hashes(hashes)
end
end

function _M.update(declarative_config, msg)

local config_table = msg.config_table
local config_hash = msg.config_hash
local hashes = msg.hashes

function _M.update(declarative_config, config_table, config_hash, hashes)
assert(type(config_table) == "table")

if not config_hash then
Expand Down Expand Up @@ -241,13 +236,11 @@ function _M.update(declarative_config, msg)
-- executed by worker 0

local res
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes, msg.current_transaction_id)
res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
if not res then
return nil, err
end

ngx_log(ngx.NOTICE, "loaded configuration with transaction ID " .. msg.current_transaction_id)

return true
end

Expand Down
5 changes: 0 additions & 5 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ local compat = require("kong.clustering.compat")
local constants = require("kong.constants")
local events = require("kong.clustering.events")
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local global = require("kong.global")


local string = string
Expand Down Expand Up @@ -116,10 +115,8 @@ function _M:export_deflated_reconfigure_payload()

local config_hash, hashes = calculate_config_hash(config_table)

local current_transaction_id = global.get_current_transaction_id()
local payload = {
type = "reconfigure",
current_transaction_id = current_transaction_id,
timestamp = ngx_now(),
config_table = config_table,
config_hash = config_hash,
Expand All @@ -146,8 +143,6 @@ function _M:export_deflated_reconfigure_payload()
self.current_config_hash = config_hash
self.deflated_reconfigure_payload = payload

ngx_log(ngx_NOTICE, "exported configuration with transaction id " .. current_transaction_id)

return payload, nil, config_hash
end

Expand Down
5 changes: 4 additions & 1 deletion kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ function _M:communicate(premature)
msg.timestamp and " with timestamp: " .. msg.timestamp or "",
log_suffix)

local pok, res, err = pcall(config_helper.update, self.declarative_config, msg)
local config_table = assert(msg.config_table)

local pok, res, err = pcall(config_helper.update, self.declarative_config,
config_table, msg.config_hash, msg.hashes)
if pok then
ping_immediately = true
end
Expand Down
7 changes: 1 addition & 6 deletions kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ do
local DECLARATIVE_LOCK_KEY = "declarative:lock"

-- make sure no matter which path it exits, we released the lock.
load_into_cache_with_events = function(entities, meta, hash, hashes, transaction_id)
load_into_cache_with_events = function(entities, meta, hash, hashes)
local kong_shm = ngx.shared.kong

local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL)
Expand All @@ -522,11 +522,6 @@ do
end

ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes)

if ok and transaction_id then
ok, err = kong_shm:set("declarative:current-transaction-id", transaction_id)
end

kong_shm:delete(DECLARATIVE_LOCK_KEY)

return ok, err
Expand Down
8 changes: 2 additions & 6 deletions kong/db/strategies/postgres/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,10 @@ function _mt:query(sql, operation)
end

local phase = get_phase()
local in_admin_api = phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE

if not operation or
not self.config_ro or
in_admin_api
not self.config_ro or
(phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE)
then
-- admin API requests skips the replica optimization
-- to ensure all its results are always strongly consistent
Expand Down Expand Up @@ -553,9 +552,6 @@ function _mt:query(sql, operation)

res, err, partial, num_queries = conn:query(sql)

if in_admin_api and operation == "write" and res and res[1] and res[1]._pg_transaction_id then
kong.response.set_header('X-Kong-Transaction-ID', res[1]._pg_transaction_id)
end
-- if err is string then either it is a SQL error
-- or it is a socket error, here we abort connections
-- that encounter errors instead of reusing them, for
Expand Down
2 changes: 0 additions & 2 deletions kong/db/strategies/postgres/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -987,8 +987,6 @@ function _M.new(connector, schema, errors)
insert(upsert_expressions, ttl_escaped .. " = " .. "EXCLUDED." .. ttl_escaped)
end

insert(select_expressions, "pg_current_xact_id() as _pg_transaction_id")

local primary_key_escaped = {}
for i, key in ipairs(primary_key) do
local primary_key_field = primary_key_fields[key]
Expand Down
13 changes: 1 addition & 12 deletions kong/global.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ end


local _GLOBAL = {
phases = phase_checker.phases,
CURRENT_TRANSACTION_ID = 0,
phases = phase_checker.phases,
}


Expand Down Expand Up @@ -295,14 +294,4 @@ function _GLOBAL.init_timing()
end


function _GLOBAL.get_current_transaction_id()
local rows, err = kong.db.connector:query("select pg_current_xact_id() as _pg_transaction_id")
if not rows then
return nil, "could not query postgres for current transaction id: " .. err
else
return tonumber(rows[1]._pg_transaction_id)
end
end


return _GLOBAL
126 changes: 69 additions & 57 deletions kong/runloop/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ local concurrency = require "kong.concurrency"
local lrucache = require "resty.lrucache"
local ktls = require "resty.kong.tls"
local request_id = require "kong.tracing.request_id"
local global = require "kong.global"




local PluginsIterator = require "kong.runloop.plugins_iterator"
Expand Down Expand Up @@ -747,8 +748,6 @@ do
wasm.set_state(wasm_state)
end

global.CURRENT_TRANSACTION_ID = kong_shm:get("declarative:current-transaction-id") or 0

return true
end) -- concurrency.with_coroutine_mutex

Expand All @@ -766,6 +765,11 @@ do
end


local function register_events()
events.register_events(reconfigure_handler)
end


local balancer_prepare
do
local function sleep_once_for_balancer_init()
Expand Down Expand Up @@ -917,7 +921,7 @@ return {
return
end

events.register_events(reconfigure_handler)
register_events()

-- initialize balancers for active healthchecks
timer_at(0, function()
Expand Down Expand Up @@ -963,59 +967,84 @@ return {
if strategy ~= "off" then
local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1

local function rebuild_timer(premature)
local router_async_opts = {
name = "router",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_router_timer(premature)
if premature then
return
end

-- Before rebuiding the internal structures, retrieve the current PostgreSQL transaction ID to make it the
-- current transaction ID after the rebuild has finished.
local rebuild_transaction_id, err = global.get_current_transaction_id()
if not rebuild_transaction_id then
log(ERR, err)
-- Don't wait for the semaphore (timeout = 0) when updating via the
-- timer.
-- If the semaphore is locked, that means that the rebuild is
-- already ongoing.
local ok, err = rebuild_router(router_async_opts)
if not ok then
log(ERR, "could not rebuild router via timer: ", err)
end
end

local router_update_status, err = rebuild_router({
name = "router",
timeout = 0,
on_timeout = "return_true",
})
if not router_update_status then
log(ERR, "could not rebuild router via timer: ", err)
local _, err = kong.timer:named_every("router-rebuild",
worker_state_update_frequency,
rebuild_router_timer)
if err then
log(ERR, "could not schedule timer to rebuild router: ", err)
end

local plugins_iterator_async_opts = {
name = "plugins_iterator",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_plugins_iterator_timer(premature)
if premature then
return
end

local plugins_iterator_update_status, err = rebuild_plugins_iterator({
name = "plugins_iterator",
timeout = 0,
on_timeout = "return_true",
})
if not plugins_iterator_update_status then
local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts)
if err then
log(ERR, "could not rebuild plugins iterator via timer: ", err)
end
end

if wasm.enabled() then
local wasm_update_status, err = rebuild_wasm_state({
name = "wasm",
timeout = 0,
on_timeout = "return_true",
})
if not wasm_update_status then
local _, err = kong.timer:named_every("plugins-iterator-rebuild",
worker_state_update_frequency,
rebuild_plugins_iterator_timer)
if err then
log(ERR, "could not schedule timer to rebuild plugins iterator: ", err)
end


if wasm.enabled() then
local wasm_async_opts = {
name = "wasm",
timeout = 0,
on_timeout = "return_true",
}

local function rebuild_wasm_filter_chains_timer(premature)
if premature then
return
end

local _, err = rebuild_wasm_state(wasm_async_opts)
if err then
log(ERR, "could not rebuild wasm filter chains via timer: ", err)
end
end

if rebuild_transaction_id then
log(NOTICE, "configuration processing completed for transaction ID " .. rebuild_transaction_id)
global.CURRENT_TRANSACTION_ID = rebuild_transaction_id
local _, err = kong.timer:named_every("wasm-filter-chains-rebuild",
worker_state_update_frequency,
rebuild_wasm_filter_chains_timer)
if err then
log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err)
end
end

local _, err = kong.timer:named_every("rebuild",
worker_state_update_frequency,
rebuild_timer)
if err then
log(ERR, "could not schedule timer to rebuild: ", err)
end
end
end,
},
Expand Down Expand Up @@ -1105,23 +1134,6 @@ return {
},
access = {
before = function(ctx)
-- If this is a version-conditional request, abort it if this dataplane has not processed at least the
-- specified configuration version yet.
local if_kong_transaction_id = kong.request and kong.request.get_header('x-if-kong-transaction-id')
if if_kong_transaction_id then
if_kong_transaction_id = tonumber(if_kong_transaction_id)
if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then
return kong.response.error(
503,
"Service Unavailable",
{
["X-Kong-Reconfiguration-Status"] = "pending",
["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1),
}
)
end
end

-- if there is a gRPC service in the context, don't re-execute the pre-access
-- phase handler - it has been executed before the internal redirect
if ctx.service and (ctx.service.protocol == "grpc" or
Expand Down
Loading

0 comments on commit 11ba086

Please sign in to comment.