diff --git a/changelog/unreleased/reconfiguration-completion-detection.yml b/changelog/unreleased/reconfiguration-completion-detection.yml deleted file mode 100644 index 4389fd362a7..00000000000 --- a/changelog/unreleased/reconfiguration-completion-detection.yml +++ /dev/null @@ -1,3 +0,0 @@ -message: Provide mechanism to detect completion of reconfiguration on the proxy path -type: feature -scope: Core diff --git a/kong/clustering/config_helper.lua b/kong/clustering/config_helper.lua index 1c0083b82ec..790f3e72c15 100644 --- a/kong/clustering/config_helper.lua +++ b/kong/clustering/config_helper.lua @@ -202,12 +202,7 @@ local function fill_empty_hashes(hashes) end end -function _M.update(declarative_config, msg) - - local config_table = msg.config_table - local config_hash = msg.config_hash - local hashes = msg.hashes - +function _M.update(declarative_config, config_table, config_hash, hashes) assert(type(config_table) == "table") if not config_hash then @@ -241,13 +236,11 @@ function _M.update(declarative_config, msg) -- executed by worker 0 local res - res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes, msg.current_transaction_id) + res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes) if not res then return nil, err end - ngx_log(ngx.NOTICE, "loaded configuration with transaction ID " .. msg.current_transaction_id) - return true end diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index 6939d7a78a5..a2696f9a3eb 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -11,7 +11,6 @@ local compat = require("kong.clustering.compat") local constants = require("kong.constants") local events = require("kong.clustering.events") local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash -local global = require("kong.global") local string = string @@ -116,10 +115,8 @@ function _M:export_deflated_reconfigure_payload() local config_hash, hashes = calculate_config_hash(config_table) - local current_transaction_id = global.get_current_transaction_id() local payload = { type = "reconfigure", - current_transaction_id = current_transaction_id, timestamp = ngx_now(), config_table = config_table, config_hash = config_hash, @@ -146,8 +143,6 @@ function _M:export_deflated_reconfigure_payload() self.current_config_hash = config_hash self.deflated_reconfigure_payload = payload - ngx_log(ngx_NOTICE, "exported configuration with transaction id " .. current_transaction_id) - return payload, nil, config_hash end diff --git a/kong/clustering/data_plane.lua b/kong/clustering/data_plane.lua index 4030b3174b0..d0f0e1e020a 100644 --- a/kong/clustering/data_plane.lua +++ b/kong/clustering/data_plane.lua @@ -213,7 +213,10 @@ function _M:communicate(premature) msg.timestamp and " with timestamp: " .. msg.timestamp or "", log_suffix) - local pok, res, err = pcall(config_helper.update, self.declarative_config, msg) + local config_table = assert(msg.config_table) + + local pok, res, err = pcall(config_helper.update, self.declarative_config, + config_table, msg.config_hash, msg.hashes) if pok then ping_immediately = true end diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index 3c30a31da26..4908e3d6a8e 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -507,7 +507,7 @@ do local DECLARATIVE_LOCK_KEY = "declarative:lock" -- make sure no matter which path it exits, we released the lock. - load_into_cache_with_events = function(entities, meta, hash, hashes, transaction_id) + load_into_cache_with_events = function(entities, meta, hash, hashes) local kong_shm = ngx.shared.kong local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL) @@ -522,11 +522,6 @@ do end ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes) - - if ok and transaction_id then - ok, err = kong_shm:set("declarative:current-transaction-id", transaction_id) - end - kong_shm:delete(DECLARATIVE_LOCK_KEY) return ok, err diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index b5b9c257d8f..fd5e9259066 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -519,11 +519,10 @@ function _mt:query(sql, operation) end local phase = get_phase() - local in_admin_api = phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE if not operation or - not self.config_ro or - in_admin_api + not self.config_ro or + (phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE) then -- admin API requests skips the replica optimization -- to ensure all its results are always strongly consistent @@ -553,9 +552,6 @@ function _mt:query(sql, operation) res, err, partial, num_queries = conn:query(sql) - if in_admin_api and operation == "write" and res and res[1] and res[1]._pg_transaction_id then - kong.response.set_header('X-Kong-Transaction-ID', res[1]._pg_transaction_id) - end -- if err is string then either it is a SQL error -- or it is a socket error, here we abort connections -- that encounter errors instead of reusing them, for diff --git a/kong/db/strategies/postgres/init.lua b/kong/db/strategies/postgres/init.lua index 804f4fb0b34..74da93465aa 100644 --- a/kong/db/strategies/postgres/init.lua +++ b/kong/db/strategies/postgres/init.lua @@ -987,8 +987,6 @@ function _M.new(connector, schema, errors) insert(upsert_expressions, ttl_escaped .. " = " .. "EXCLUDED." .. ttl_escaped) end - insert(select_expressions, "pg_current_xact_id() as _pg_transaction_id") - local primary_key_escaped = {} for i, key in ipairs(primary_key) do local primary_key_field = primary_key_fields[key] diff --git a/kong/global.lua b/kong/global.lua index 2c2449b5c64..cdceaa7f58e 100644 --- a/kong/global.lua +++ b/kong/global.lua @@ -68,8 +68,7 @@ end local _GLOBAL = { - phases = phase_checker.phases, - CURRENT_TRANSACTION_ID = 0, + phases = phase_checker.phases, } @@ -295,14 +294,4 @@ function _GLOBAL.init_timing() end -function _GLOBAL.get_current_transaction_id() - local rows, err = kong.db.connector:query("select pg_current_xact_id() as _pg_transaction_id") - if not rows then - return nil, "could not query postgres for current transaction id: " .. err - else - return tonumber(rows[1]._pg_transaction_id) - end -end - - return _GLOBAL diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index e2759287ed4..250d712f55b 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -13,7 +13,8 @@ local concurrency = require "kong.concurrency" local lrucache = require "resty.lrucache" local ktls = require "resty.kong.tls" local request_id = require "kong.tracing.request_id" -local global = require "kong.global" + + local PluginsIterator = require "kong.runloop.plugins_iterator" @@ -747,8 +748,6 @@ do wasm.set_state(wasm_state) end - global.CURRENT_TRANSACTION_ID = kong_shm:get("declarative:current-transaction-id") or 0 - return true end) -- concurrency.with_coroutine_mutex @@ -766,6 +765,11 @@ do end +local function register_events() + events.register_events(reconfigure_handler) +end + + local balancer_prepare do local function sleep_once_for_balancer_init() @@ -917,7 +921,7 @@ return { return end - events.register_events(reconfigure_handler) + register_events() -- initialize balancers for active healthchecks timer_at(0, function() @@ -963,62 +967,84 @@ return { if strategy ~= "off" then local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1 - local function rebuild_timer(premature) + local router_async_opts = { + name = "router", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_router_timer(premature) if premature then return end - -- Before rebuiding the internal structures, retrieve the current PostgreSQL transaction ID to make it the - -- current transaction ID after the rebuild has finished. - local rebuild_transaction_id, err = global.get_current_transaction_id() - if not rebuild_transaction_id then - log(ERR, err) + -- Don't wait for the semaphore (timeout = 0) when updating via the + -- timer. + -- If the semaphore is locked, that means that the rebuild is + -- already ongoing. + local ok, err = rebuild_router(router_async_opts) + if not ok then + log(ERR, "could not rebuild router via timer: ", err) end + end - local router_update_status, err = rebuild_router({ - name = "router", - timeout = 0, - on_timeout = "return_true", - }) - if not router_update_status then - log(ERR, "could not rebuild router via timer: ", err) + local _, err = kong.timer:named_every("router-rebuild", + worker_state_update_frequency, + rebuild_router_timer) + if err then + log(ERR, "could not schedule timer to rebuild router: ", err) + end + + local plugins_iterator_async_opts = { + name = "plugins_iterator", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_plugins_iterator_timer(premature) + if premature then + return end - local plugins_iterator_update_status, err = rebuild_plugins_iterator({ - name = "plugins_iterator", - timeout = 0, - on_timeout = "return_true", - }) - if not plugins_iterator_update_status then + local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts) + if err then log(ERR, "could not rebuild plugins iterator via timer: ", err) end + end - if wasm.enabled() then - local wasm_update_status, err = rebuild_wasm_state({ - name = "wasm", - timeout = 0, - on_timeout = "return_true", - }) - if not wasm_update_status then + local _, err = kong.timer:named_every("plugins-iterator-rebuild", + worker_state_update_frequency, + rebuild_plugins_iterator_timer) + if err then + log(ERR, "could not schedule timer to rebuild plugins iterator: ", err) + end + + + if wasm.enabled() then + local wasm_async_opts = { + name = "wasm", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_wasm_filter_chains_timer(premature) + if premature then + return + end + + local _, err = rebuild_wasm_state(wasm_async_opts) + if err then log(ERR, "could not rebuild wasm filter chains via timer: ", err) end end - if rebuild_transaction_id then - -- Yield to process any pending invalidations - utils.yield() - - log(NOTICE, "configuration processing completed for transaction ID " .. rebuild_transaction_id) - global.CURRENT_TRANSACTION_ID = rebuild_transaction_id + local _, err = kong.timer:named_every("wasm-filter-chains-rebuild", + worker_state_update_frequency, + rebuild_wasm_filter_chains_timer) + if err then + log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err) end end - - local _, err = kong.timer:named_every("rebuild", - worker_state_update_frequency, - rebuild_timer) - if err then - log(ERR, "could not schedule timer to rebuild: ", err) - end end end, }, @@ -1108,23 +1134,6 @@ return { }, access = { before = function(ctx) - -- If this is a version-conditional request, abort it if this dataplane has not processed at least the - -- specified configuration version yet. - local if_kong_transaction_id = kong.request and kong.request.get_header('x-if-kong-transaction-id') - if if_kong_transaction_id then - if_kong_transaction_id = tonumber(if_kong_transaction_id) - if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then - return kong.response.error( - 503, - "Service Unavailable", - { - ["X-Kong-Reconfiguration-Status"] = "pending", - ["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1), - } - ) - end - end - -- if there is a gRPC service in the context, don't re-execute the pre-access -- phase handler - it has been executed before the internal redirect if ctx.service and (ctx.service.protocol == "grpc" or diff --git a/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua deleted file mode 100644 index 9f528c4bb46..00000000000 --- a/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua +++ /dev/null @@ -1,156 +0,0 @@ -local helpers = require "spec.helpers" -local cjson = require "cjson" - -describe("Admin API - Reconfiguration Completion -", function() - - local WORKER_STATE_UPDATE_FREQ = 1 - - local admin_client - local proxy_client - - local function run_tests() - - local res = admin_client:post("/plugins", { - body = { - name = "request-termination", - config = { - status_code = 200, - body = "kong terminated the request", - } - }, - headers = { ["Content-Type"] = "application/json" }, - }) - assert.res_status(201, res) - - res = admin_client:post("/services", { - body = { - name = "test-service", - url = "http://127.0.0.1", - }, - headers = { ["Content-Type"] = "application/json" }, - }) - local body = assert.res_status(201, res) - local service = cjson.decode(body) - - -- We're running the route setup in `eventually` to cover for the unlikely case that reconfiguration completes - -- between adding the route and requesting the path through the proxy path. - - local next_path do - local path_suffix = 0 - function next_path() - path_suffix = path_suffix + 1 - return "/" .. tostring(path_suffix) - end - end - - local service_path - local kong_transaction_id - - assert.eventually(function() - service_path = next_path() - - res = admin_client:post("/services/" .. service.id .. "/routes", { - body = { - paths = { service_path } - }, - headers = { ["Content-Type"] = "application/json" }, - }) - assert.res_status(201, res) - kong_transaction_id = res.headers['x-kong-transaction-id'] - assert.is_string(kong_transaction_id) - - res = proxy_client:get(service_path, - { - headers = { - ["X-If-Kong-Transaction-Id"] = kong_transaction_id - } - }) - assert.res_status(503, res) - assert.equals("pending", res.headers['x-kong-reconfiguration-status']) - local retry_after = tonumber(res.headers['retry-after']) - ngx.sleep(retry_after) - end) - .has_no_error() - - assert.eventually(function() - res = proxy_client:get(service_path, - { - headers = { - ["X-If-Kong-Transaction-Id"] = kong_transaction_id - } - }) - body = assert.res_status(200, res) - assert.equals("kong terminated the request", body) - end) - .has_no_error() - end - - describe("#traditional mode -", function() - lazy_setup(function() - helpers.get_db_utils() - assert(helpers.start_kong({ - worker_consistency = "eventual", - worker_state_update_frequency = WORKER_STATE_UPDATE_FREQ, - })) - admin_client = helpers.admin_client() - proxy_client = helpers.proxy_client() - end) - - teardown(function() - if admin_client then - admin_client:close() - end - if proxy_client then - proxy_client:close() - end - helpers.stop_kong() - end) - - it("rejects proxy requests if worker state has not been updated yet", run_tests) - end) - - describe("#hybrid mode -", function() - lazy_setup(function() - helpers.get_db_utils() - - assert(helpers.start_kong({ - role = "control_plane", - database = "postgres", - prefix = "cp", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering.crt", - cluster_listen = "127.0.0.1:9005", - cluster_telemetry_listen = "127.0.0.1:9006", - nginx_conf = "spec/fixtures/custom_nginx.template", - })) - - assert(helpers.start_kong({ - role = "data_plane", - database = "off", - prefix = "dp", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering.crt", - cluster_control_plane = "127.0.0.1:9005", - cluster_telemetry_endpoint = "127.0.0.1:9006", - proxy_listen = "0.0.0.0:9002", - })) - admin_client = helpers.admin_client() - proxy_client = helpers.proxy_client("127.0.0.1", 9002) - end) - - teardown(function() - if admin_client then - admin_client:close() - end - if proxy_client then - proxy_client:close() - end - helpers.stop_kong("dp") - helpers.stop_kong("cp") - end) - - it("rejects proxy requests if worker state has not been updated yet", run_tests) - end) -end)