diff --git a/changelog/unreleased/reconfiguration-completion-detection.yml b/changelog/unreleased/reconfiguration-completion-detection.yml new file mode 100644 index 000000000000..4389fd362a78 --- /dev/null +++ b/changelog/unreleased/reconfiguration-completion-detection.yml @@ -0,0 +1,3 @@ +message: Provide mechanism to detect completion of reconfiguration on the proxy path +type: feature +scope: Core diff --git a/kong-3.6.0-0.rockspec b/kong-3.6.0-0.rockspec index 6301320fd0a4..e924e0067851 100644 --- a/kong-3.6.0-0.rockspec +++ b/kong-3.6.0-0.rockspec @@ -170,6 +170,7 @@ build = { ["kong.tools.sha256"] = "kong/tools/sha256.lua", ["kong.tools.yield"] = "kong/tools/yield.lua", ["kong.tools.uuid"] = "kong/tools/uuid.lua", + ["kong.tools.rand"] = "kong/tools/rand.lua", ["kong.runloop.handler"] = "kong/runloop/handler.lua", ["kong.runloop.events"] = "kong/runloop/events.lua", diff --git a/kong/clustering/config_helper.lua b/kong/clustering/config_helper.lua index 790f3e72c15d..1c0083b82ec9 100644 --- a/kong/clustering/config_helper.lua +++ b/kong/clustering/config_helper.lua @@ -202,7 +202,12 @@ local function fill_empty_hashes(hashes) end end -function _M.update(declarative_config, config_table, config_hash, hashes) +function _M.update(declarative_config, msg) + + local config_table = msg.config_table + local config_hash = msg.config_hash + local hashes = msg.hashes + assert(type(config_table) == "table") if not config_hash then @@ -236,11 +241,13 @@ function _M.update(declarative_config, config_table, config_hash, hashes) -- executed by worker 0 local res - res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes) + res, err = declarative.load_into_cache_with_events(entities, meta, new_hash, hashes, msg.current_transaction_id) if not res then return nil, err end + ngx_log(ngx.NOTICE, "loaded configuration with transaction ID " .. msg.current_transaction_id) + return true end diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index ca924eb3ed0f..9251371bde45 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -11,8 +11,10 @@ local compat = require("kong.clustering.compat") local constants = require("kong.constants") local events = require("kong.clustering.events") local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash +local global = require("kong.global") local extract_dp_cert = require("kong.clustering.tls").extract_dp_cert + local string = string local setmetatable = setmetatable local type = type @@ -115,8 +117,10 @@ function _M:export_deflated_reconfigure_payload() local config_hash, hashes = calculate_config_hash(config_table) + local current_transaction_id = global.get_current_transaction_id() local payload = { type = "reconfigure", + current_transaction_id = current_transaction_id, timestamp = ngx_now(), config_table = config_table, config_hash = config_hash, @@ -143,6 +147,8 @@ function _M:export_deflated_reconfigure_payload() self.current_config_hash = config_hash self.deflated_reconfigure_payload = payload + ngx_log(ngx_NOTICE, "exported configuration with transaction id " .. current_transaction_id) + return payload, nil, config_hash end diff --git a/kong/clustering/data_plane.lua b/kong/clustering/data_plane.lua index d0f0e1e020a9..4030b3174b05 100644 --- a/kong/clustering/data_plane.lua +++ b/kong/clustering/data_plane.lua @@ -213,10 +213,7 @@ function _M:communicate(premature) msg.timestamp and " with timestamp: " .. msg.timestamp or "", log_suffix) - local config_table = assert(msg.config_table) - - local pok, res, err = pcall(config_helper.update, self.declarative_config, - config_table, msg.config_hash, msg.hashes) + local pok, res, err = pcall(config_helper.update, self.declarative_config, msg) if pok then ping_immediately = true end diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index 4908e3d6a8e3..3c30a31da262 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -507,7 +507,7 @@ do local DECLARATIVE_LOCK_KEY = "declarative:lock" -- make sure no matter which path it exits, we released the lock. - load_into_cache_with_events = function(entities, meta, hash, hashes) + load_into_cache_with_events = function(entities, meta, hash, hashes, transaction_id) local kong_shm = ngx.shared.kong local ok, err = kong_shm:add(DECLARATIVE_LOCK_KEY, 0, DECLARATIVE_LOCK_TTL) @@ -522,6 +522,11 @@ do end ok, err = load_into_cache_with_events_no_lock(entities, meta, hash, hashes) + + if ok and transaction_id then + ok, err = kong_shm:set("declarative:current-transaction-id", transaction_id) + end + kong_shm:delete(DECLARATIVE_LOCK_KEY) return ok, err diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index fd5e9259066a..b5b9c257d8fa 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -519,10 +519,11 @@ function _mt:query(sql, operation) end local phase = get_phase() + local in_admin_api = phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE if not operation or - not self.config_ro or - (phase == "content" and ngx.ctx.KONG_PHASE == ADMIN_API_PHASE) + not self.config_ro or + in_admin_api then -- admin API requests skips the replica optimization -- to ensure all its results are always strongly consistent @@ -552,6 +553,9 @@ function _mt:query(sql, operation) res, err, partial, num_queries = conn:query(sql) + if in_admin_api and operation == "write" and res and res[1] and res[1]._pg_transaction_id then + kong.response.set_header('X-Kong-Transaction-ID', res[1]._pg_transaction_id) + end -- if err is string then either it is a SQL error -- or it is a socket error, here we abort connections -- that encounter errors instead of reusing them, for diff --git a/kong/db/strategies/postgres/init.lua b/kong/db/strategies/postgres/init.lua index 74da93465aa6..804f4fb0b34a 100644 --- a/kong/db/strategies/postgres/init.lua +++ b/kong/db/strategies/postgres/init.lua @@ -987,6 +987,8 @@ function _M.new(connector, schema, errors) insert(upsert_expressions, ttl_escaped .. " = " .. "EXCLUDED." .. ttl_escaped) end + insert(select_expressions, "pg_current_xact_id() as _pg_transaction_id") + local primary_key_escaped = {} for i, key in ipairs(primary_key) do local primary_key_field = primary_key_fields[key] diff --git a/kong/global.lua b/kong/global.lua index cdceaa7f58ef..2c2449b5c64f 100644 --- a/kong/global.lua +++ b/kong/global.lua @@ -68,7 +68,8 @@ end local _GLOBAL = { - phases = phase_checker.phases, + phases = phase_checker.phases, + CURRENT_TRANSACTION_ID = 0, } @@ -294,4 +295,14 @@ function _GLOBAL.init_timing() end +function _GLOBAL.get_current_transaction_id() + local rows, err = kong.db.connector:query("select pg_current_xact_id() as _pg_transaction_id") + if not rows then + return nil, "could not query postgres for current transaction id: " .. err + else + return tonumber(rows[1]._pg_transaction_id) + end +end + + return _GLOBAL diff --git a/kong/router/atc.lua b/kong/router/atc.lua index 7c59cba03b4d..533ae5251207 100644 --- a/kong/router/atc.lua +++ b/kong/router/atc.lua @@ -96,6 +96,12 @@ end local function escape_str(str) + -- raw string + if not str:find([["#]], 1, true) then + return "r#\"" .. str .. "\"#" + end + + -- standard string escaping (unlikely case) if str:find([[\]], 1, true) then str = str:gsub([[\]], [[\\]]) end diff --git a/kong/router/compat.lua b/kong/router/compat.lua index 6da3522f4698..531cd8b1fa80 100644 --- a/kong/router/compat.lua +++ b/kong/router/compat.lua @@ -165,9 +165,9 @@ local function get_expression(route) -- See #6425, if `net.protocol` is not `https` -- then SNI matching should simply not be considered if srcs or dsts then - gen = "(net.protocol != \"tls\"" .. LOGICAL_OR .. gen .. ")" + gen = "(net.protocol != r#\"tls\"#" .. LOGICAL_OR .. gen .. ")" else - gen = "(net.protocol != \"https\"" .. LOGICAL_OR .. gen .. ")" + gen = "(net.protocol != r#\"https\"#" .. LOGICAL_OR .. gen .. ")" end expression_append(expr_buf, LOGICAL_AND, gen) diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 250d712f55b9..e2759287ed4c 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -13,8 +13,7 @@ local concurrency = require "kong.concurrency" local lrucache = require "resty.lrucache" local ktls = require "resty.kong.tls" local request_id = require "kong.tracing.request_id" - - +local global = require "kong.global" local PluginsIterator = require "kong.runloop.plugins_iterator" @@ -748,6 +747,8 @@ do wasm.set_state(wasm_state) end + global.CURRENT_TRANSACTION_ID = kong_shm:get("declarative:current-transaction-id") or 0 + return true end) -- concurrency.with_coroutine_mutex @@ -765,11 +766,6 @@ do end -local function register_events() - events.register_events(reconfigure_handler) -end - - local balancer_prepare do local function sleep_once_for_balancer_init() @@ -921,7 +917,7 @@ return { return end - register_events() + events.register_events(reconfigure_handler) -- initialize balancers for active healthchecks timer_at(0, function() @@ -967,84 +963,62 @@ return { if strategy ~= "off" then local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1 - local router_async_opts = { - name = "router", - timeout = 0, - on_timeout = "return_true", - } - - local function rebuild_router_timer(premature) + local function rebuild_timer(premature) if premature then return end - -- Don't wait for the semaphore (timeout = 0) when updating via the - -- timer. - -- If the semaphore is locked, that means that the rebuild is - -- already ongoing. - local ok, err = rebuild_router(router_async_opts) - if not ok then - log(ERR, "could not rebuild router via timer: ", err) + -- Before rebuiding the internal structures, retrieve the current PostgreSQL transaction ID to make it the + -- current transaction ID after the rebuild has finished. + local rebuild_transaction_id, err = global.get_current_transaction_id() + if not rebuild_transaction_id then + log(ERR, err) end - end - local _, err = kong.timer:named_every("router-rebuild", - worker_state_update_frequency, - rebuild_router_timer) - if err then - log(ERR, "could not schedule timer to rebuild router: ", err) - end - - local plugins_iterator_async_opts = { - name = "plugins_iterator", - timeout = 0, - on_timeout = "return_true", - } - - local function rebuild_plugins_iterator_timer(premature) - if premature then - return - end - - local _, err = rebuild_plugins_iterator(plugins_iterator_async_opts) - if err then - log(ERR, "could not rebuild plugins iterator via timer: ", err) + local router_update_status, err = rebuild_router({ + name = "router", + timeout = 0, + on_timeout = "return_true", + }) + if not router_update_status then + log(ERR, "could not rebuild router via timer: ", err) end - end - - local _, err = kong.timer:named_every("plugins-iterator-rebuild", - worker_state_update_frequency, - rebuild_plugins_iterator_timer) - if err then - log(ERR, "could not schedule timer to rebuild plugins iterator: ", err) - end - - if wasm.enabled() then - local wasm_async_opts = { - name = "wasm", + local plugins_iterator_update_status, err = rebuild_plugins_iterator({ + name = "plugins_iterator", timeout = 0, on_timeout = "return_true", - } - - local function rebuild_wasm_filter_chains_timer(premature) - if premature then - return - end + }) + if not plugins_iterator_update_status then + log(ERR, "could not rebuild plugins iterator via timer: ", err) + end - local _, err = rebuild_wasm_state(wasm_async_opts) - if err then + if wasm.enabled() then + local wasm_update_status, err = rebuild_wasm_state({ + name = "wasm", + timeout = 0, + on_timeout = "return_true", + }) + if not wasm_update_status then log(ERR, "could not rebuild wasm filter chains via timer: ", err) end end - local _, err = kong.timer:named_every("wasm-filter-chains-rebuild", - worker_state_update_frequency, - rebuild_wasm_filter_chains_timer) - if err then - log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err) + if rebuild_transaction_id then + -- Yield to process any pending invalidations + utils.yield() + + log(NOTICE, "configuration processing completed for transaction ID " .. rebuild_transaction_id) + global.CURRENT_TRANSACTION_ID = rebuild_transaction_id end end + + local _, err = kong.timer:named_every("rebuild", + worker_state_update_frequency, + rebuild_timer) + if err then + log(ERR, "could not schedule timer to rebuild: ", err) + end end end, }, @@ -1134,6 +1108,23 @@ return { }, access = { before = function(ctx) + -- If this is a version-conditional request, abort it if this dataplane has not processed at least the + -- specified configuration version yet. + local if_kong_transaction_id = kong.request and kong.request.get_header('x-if-kong-transaction-id') + if if_kong_transaction_id then + if_kong_transaction_id = tonumber(if_kong_transaction_id) + if if_kong_transaction_id and if_kong_transaction_id >= global.CURRENT_TRANSACTION_ID then + return kong.response.error( + 503, + "Service Unavailable", + { + ["X-Kong-Reconfiguration-Status"] = "pending", + ["Retry-After"] = tostring(kong.configuration.worker_state_update_frequency or 1), + } + ) + end + end + -- if there is a gRPC service in the context, don't re-execute the pre-access -- phase handler - it has been executed before the internal redirect if ctx.service and (ctx.service.protocol == "grpc" or diff --git a/kong/runloop/wasm.lua b/kong/runloop/wasm.lua index 664368ff4c3a..3ae3f7e8c029 100644 --- a/kong/runloop/wasm.lua +++ b/kong/runloop/wasm.lua @@ -55,9 +55,7 @@ local tostring = tostring local ipairs = ipairs local type = type local assert = assert -local concat = table.concat local insert = table.insert -local sha256 = utils.sha256_bin local cjson_encode = cjson.encode local cjson_decode = cjson.decode local fmt = string.format @@ -106,10 +104,14 @@ local STATUS = STATUS_DISABLED local hash_chain do + local buffer = require "string.buffer" + + local sha256 = utils.sha256_bin + local HASH_DISABLED = sha256("disabled") local HASH_NONE = sha256("none") - local buf = {} + local buf = buffer.new() ---@param chain kong.db.schema.entities.filter_chain ---@return string @@ -121,16 +123,18 @@ do return HASH_DISABLED end - local n = 0 - for _, filter in ipairs(chain.filters) do - buf[n + 1] = filter.name - buf[n + 2] = tostring(filter.enabled) - buf[n + 3] = tostring(filter.enabled and sha256(filter.config)) - n = n + 3 + local filters = chain.filters + for i = 1, #filters do + local filter = filters[i] + + buf:put(filter.name) + buf:put(tostring(filter.enabled)) + buf:put(tostring(filter.enabled and sha256(filter.config))) end - local s = concat(buf, "", 1, n) - clear_tab(buf) + local s = buf:get() + + buf:reset() return sha256(s) end diff --git a/kong/tools/rand.lua b/kong/tools/rand.lua new file mode 100644 index 000000000000..cfb4bfbf3409 --- /dev/null +++ b/kong/tools/rand.lua @@ -0,0 +1,133 @@ +local ffi = require "ffi" + + +local C = ffi.C +local ffi_new = ffi.new + + +ffi.cdef[[ +typedef unsigned char u_char; + +int RAND_bytes(u_char *buf, int num); + +unsigned long ERR_get_error(void); +void ERR_load_crypto_strings(void); +void ERR_free_strings(void); + +const char *ERR_reason_error_string(unsigned long e); + +int open(const char * filename, int flags, ...); +size_t read(int fd, void *buf, size_t count); +int write(int fd, const void *ptr, int numbytes); +int close(int fd); +char *strerror(int errnum); +]] + + +local _M = {} + + +local get_rand_bytes +do + local ngx_log = ngx.log + local WARN = ngx.WARN + + local system_constants = require "lua_system_constants" + local O_RDONLY = system_constants.O_RDONLY() + local ffi_fill = ffi.fill + local ffi_str = ffi.string + local bytes_buf_t = ffi.typeof "char[?]" + + local function urandom_bytes(buf, size) + local fd = C.open("/dev/urandom", O_RDONLY, 0) -- mode is ignored + if fd < 0 then + ngx_log(WARN, "Error opening random fd: ", + ffi_str(C.strerror(ffi.errno()))) + + return false + end + + local res = C.read(fd, buf, size) + if res <= 0 then + ngx_log(WARN, "Error reading from urandom: ", + ffi_str(C.strerror(ffi.errno()))) + + return false + end + + if C.close(fd) ~= 0 then + ngx_log(WARN, "Error closing urandom: ", + ffi_str(C.strerror(ffi.errno()))) + end + + return true + end + + -- try to get n_bytes of CSPRNG data, first via /dev/urandom, + -- and then falling back to OpenSSL if necessary + get_rand_bytes = function(n_bytes, urandom) + local buf = ffi_new(bytes_buf_t, n_bytes) + ffi_fill(buf, n_bytes, 0x0) + + -- only read from urandom if we were explicitly asked + if urandom then + local rc = urandom_bytes(buf, n_bytes) + + -- if the read of urandom was successful, we returned true + -- and buf is filled with our bytes, so return it as a string + if rc then + return ffi_str(buf, n_bytes) + end + end + + if C.RAND_bytes(buf, n_bytes) == 0 then + -- get error code + local err_code = C.ERR_get_error() + if err_code == 0 then + return nil, "could not get SSL error code from the queue" + end + + -- get human-readable error string + C.ERR_load_crypto_strings() + local err = C.ERR_reason_error_string(err_code) + C.ERR_free_strings() + + return nil, "could not get random bytes (" .. + "reason:" .. ffi_str(err) .. ") " + end + + return ffi_str(buf, n_bytes) + end +end +_M.get_rand_bytes = get_rand_bytes + + +--- Generates a random unique string +-- @return string The random string (a chunk of base64ish-encoded random bytes) +local random_string +do + local char = string.char + local rand = math.random + local encode_base64 = ngx.encode_base64 + + -- generate a random-looking string by retrieving a chunk of bytes and + -- replacing non-alphanumeric characters with random alphanumeric replacements + -- (we dont care about deriving these bytes securely) + -- this serves to attempt to maintain some backward compatibility with the + -- previous implementation (stripping a UUID of its hyphens), while significantly + -- expanding the size of the keyspace. + random_string = function() + -- get 24 bytes, which will return a 32 char string after encoding + -- this is done in attempt to maintain backwards compatibility as + -- much as possible while improving the strength of this function + return encode_base64(get_rand_bytes(24, true)) + :gsub("/", char(rand(48, 57))) -- 0 - 10 + :gsub("+", char(rand(65, 90))) -- A - Z + :gsub("=", char(rand(97, 122))) -- a - z + end + +end +_M.random_string = random_string + + +return _M diff --git a/kong/tools/string.lua b/kong/tools/string.lua index 3ed03a5d293a..45aa2a4ab6b6 100644 --- a/kong/tools/string.lua +++ b/kong/tools/string.lua @@ -1,3 +1,11 @@ +local pl_stringx = require "pl.stringx" + + +local type = type +local ipairs = ipairs +local tostring = tostring +local lower = string.lower +local fmt = string.format local find = string.find local gsub = string.gsub @@ -5,6 +13,131 @@ local gsub = string.gsub local _M = {} +--- splits a string. +-- just a placeholder to the penlight `pl.stringx.split` function +-- @function split +_M.split = pl_stringx.split + + +--- strips whitespace from a string. +-- @function strip +_M.strip = function(str) + if str == nil then + return "" + end + str = tostring(str) + if #str > 200 then + return str:gsub("^%s+", ""):reverse():gsub("^%s+", ""):reverse() + else + return str:match("^%s*(.-)%s*$") + end +end + + +-- Numbers taken from table 3-7 in www.unicode.org/versions/Unicode6.2.0/UnicodeStandard-6.2.pdf +-- find-based solution inspired by http://notebook.kulchenko.com/programming/fixing-malformed-utf8-in-lua +function _M.validate_utf8(val) + local str = tostring(val) + local i, len = 1, #str + while i <= len do + if i == find(str, "[%z\1-\127]", i) then i = i + 1 + elseif i == find(str, "[\194-\223][\123-\191]", i) then i = i + 2 + elseif i == find(str, "\224[\160-\191][\128-\191]", i) + or i == find(str, "[\225-\236][\128-\191][\128-\191]", i) + or i == find(str, "\237[\128-\159][\128-\191]", i) + or i == find(str, "[\238-\239][\128-\191][\128-\191]", i) then i = i + 3 + elseif i == find(str, "\240[\144-\191][\128-\191][\128-\191]", i) + or i == find(str, "[\241-\243][\128-\191][\128-\191][\128-\191]", i) + or i == find(str, "\244[\128-\143][\128-\191][\128-\191]", i) then i = i + 4 + else + return false, i + end + end + + return true +end + + +--- +-- Converts bytes to another unit in a human-readable string. +-- @tparam number bytes A value in bytes. +-- +-- @tparam[opt] string unit The unit to convert the bytes into. Can be either +-- of `b/B`, `k/K`, `m/M`, or `g/G` for bytes (unchanged), kibibytes, +-- mebibytes, or gibibytes, respectively. Defaults to `b` (bytes). +-- @tparam[opt] number scale The number of digits to the right of the decimal +-- point. Defaults to 2. +-- @treturn string A human-readable string. +-- @usage +-- +-- bytes_to_str(5497558) -- "5497558" +-- bytes_to_str(5497558, "m") -- "5.24 MiB" +-- bytes_to_str(5497558, "G", 3) -- "5.120 GiB" +-- +function _M.bytes_to_str(bytes, unit, scale) + if not unit or unit == "" or lower(unit) == "b" then + return fmt("%d", bytes) + end + + scale = scale or 2 + + if type(scale) ~= "number" or scale < 0 then + error("scale must be equal or greater than 0", 2) + end + + local fspec = fmt("%%.%df", scale) + + if lower(unit) == "k" then + return fmt(fspec .. " KiB", bytes / 2^10) + end + + if lower(unit) == "m" then + return fmt(fspec .. " MiB", bytes / 2^20) + end + + if lower(unit) == "g" then + return fmt(fspec .. " GiB", bytes / 2^30) + end + + error("invalid unit '" .. unit .. "' (expected 'k/K', 'm/M', or 'g/G')", 2) +end + + +local try_decode_base64 +do + local decode_base64 = ngx.decode_base64 + local decode_base64url = require "ngx.base64".decode_base64url + + local function decode_base64_str(str) + if type(str) == "string" then + return decode_base64(str) + or decode_base64url(str) + or nil, "base64 decoding failed: invalid input" + + else + return nil, "base64 decoding failed: not a string" + end + end + + function try_decode_base64(value) + if type(value) == "table" then + for i, v in ipairs(value) do + value[i] = decode_base64_str(v) or v + end + + return value + end + + if type(value) == "string" then + return decode_base64_str(value) or value + end + + return value + end +end +_M.try_decode_base64 = try_decode_base64 + + local replace_dashes local replace_dashes_lower do diff --git a/kong/tools/utils.lua b/kong/tools/utils.lua index 37e7a83ebd8e..2bab014e55d5 100644 --- a/kong/tools/utils.lua +++ b/kong/tools/utils.lua @@ -34,8 +34,6 @@ local re_match = ngx.re.match local setmetatable = setmetatable ffi.cdef[[ -typedef unsigned char u_char; - typedef long time_t; typedef int clockid_t; typedef struct timespec { @@ -46,43 +44,10 @@ typedef struct timespec { int clock_gettime(clockid_t clk_id, struct timespec *tp); int gethostname(char *name, size_t len); - -int RAND_bytes(u_char *buf, int num); - -unsigned long ERR_get_error(void); -void ERR_load_crypto_strings(void); -void ERR_free_strings(void); - -const char *ERR_reason_error_string(unsigned long e); - -int open(const char * filename, int flags, ...); -size_t read(int fd, void *buf, size_t count); -int write(int fd, const void *ptr, int numbytes); -int close(int fd); -char *strerror(int errnum); ]] local _M = {} ---- splits a string. --- just a placeholder to the penlight `pl.stringx.split` function --- @function split -_M.split = split - ---- strips whitespace from a string. --- @function strip -_M.strip = function(str) - if str == nil then - return "" - end - str = tostring(str) - if #str > 200 then - return str:gsub("^%s+", ""):reverse():gsub("^%s+", ""):reverse() - else - return str:match("^%s*(.-)%s*$") - end -end - do local _system_infos @@ -134,109 +99,6 @@ do end -local get_rand_bytes - -do - local ngx_log = ngx.log - local WARN = ngx.WARN - - local system_constants = require "lua_system_constants" - local O_RDONLY = system_constants.O_RDONLY() - local ffi_fill = ffi.fill - local ffi_str = ffi.string - local bytes_buf_t = ffi.typeof "char[?]" - - local function urandom_bytes(buf, size) - local fd = C.open("/dev/urandom", O_RDONLY, 0) -- mode is ignored - if fd < 0 then - ngx_log(WARN, "Error opening random fd: ", - ffi_str(C.strerror(ffi.errno()))) - - return false - end - - local res = C.read(fd, buf, size) - if res <= 0 then - ngx_log(WARN, "Error reading from urandom: ", - ffi_str(C.strerror(ffi.errno()))) - - return false - end - - if C.close(fd) ~= 0 then - ngx_log(WARN, "Error closing urandom: ", - ffi_str(C.strerror(ffi.errno()))) - end - - return true - end - - -- try to get n_bytes of CSPRNG data, first via /dev/urandom, - -- and then falling back to OpenSSL if necessary - get_rand_bytes = function(n_bytes, urandom) - local buf = ffi_new(bytes_buf_t, n_bytes) - ffi_fill(buf, n_bytes, 0x0) - - -- only read from urandom if we were explicitly asked - if urandom then - local rc = urandom_bytes(buf, n_bytes) - - -- if the read of urandom was successful, we returned true - -- and buf is filled with our bytes, so return it as a string - if rc then - return ffi_str(buf, n_bytes) - end - end - - if C.RAND_bytes(buf, n_bytes) == 0 then - -- get error code - local err_code = C.ERR_get_error() - if err_code == 0 then - return nil, "could not get SSL error code from the queue" - end - - -- get human-readable error string - C.ERR_load_crypto_strings() - local err = C.ERR_reason_error_string(err_code) - C.ERR_free_strings() - - return nil, "could not get random bytes (" .. - "reason:" .. ffi_str(err) .. ") " - end - - return ffi_str(buf, n_bytes) - end - - _M.get_rand_bytes = get_rand_bytes -end - ---- Generates a random unique string --- @return string The random string (a chunk of base64ish-encoded random bytes) -do - local char = string.char - local rand = math.random - local encode_base64 = ngx.encode_base64 - - -- generate a random-looking string by retrieving a chunk of bytes and - -- replacing non-alphanumeric characters with random alphanumeric replacements - -- (we dont care about deriving these bytes securely) - -- this serves to attempt to maintain some backward compatibility with the - -- previous implementation (stripping a UUID of its hyphens), while significantly - -- expanding the size of the keyspace. - local function random_string() - -- get 24 bytes, which will return a 32 char string after encoding - -- this is done in attempt to maintain backwards compatibility as - -- much as possible while improving the strength of this function - return encode_base64(get_rand_bytes(24, true)) - :gsub("/", char(rand(48, 57))) -- 0 - 10 - :gsub("+", char(rand(65, 90))) -- A - Z - :gsub("=", char(rand(97, 122))) -- a - z - end - - _M.random_string = random_string -end - - do local url = require "socket.url" @@ -457,29 +319,6 @@ function _M.load_module_if_exists(module_name) end end --- Numbers taken from table 3-7 in www.unicode.org/versions/Unicode6.2.0/UnicodeStandard-6.2.pdf --- find-based solution inspired by http://notebook.kulchenko.com/programming/fixing-malformed-utf8-in-lua -function _M.validate_utf8(val) - local str = tostring(val) - local i, len = 1, #str - while i <= len do - if i == find(str, "[%z\1-\127]", i) then i = i + 1 - elseif i == find(str, "[\194-\223][\123-\191]", i) then i = i + 2 - elseif i == find(str, "\224[\160-\191][\128-\191]", i) - or i == find(str, "[\225-\236][\128-\191][\128-\191]", i) - or i == find(str, "\237[\128-\159][\128-\191]", i) - or i == find(str, "[\238-\239][\128-\191][\128-\191]", i) then i = i + 3 - elseif i == find(str, "\240[\144-\191][\128-\191][\128-\191]", i) - or i == find(str, "[\241-\243][\128-\191][\128-\191][\128-\191]", i) - or i == find(str, "\244[\128-\143][\128-\191][\128-\191]", i) then i = i + 4 - else - return false, i - end - end - - return true -end - do local ipmatcher = require "resty.ipmatcher" @@ -934,51 +773,6 @@ do end ---- --- Converts bytes to another unit in a human-readable string. --- @tparam number bytes A value in bytes. --- --- @tparam[opt] string unit The unit to convert the bytes into. Can be either --- of `b/B`, `k/K`, `m/M`, or `g/G` for bytes (unchanged), kibibytes, --- mebibytes, or gibibytes, respectively. Defaults to `b` (bytes). --- @tparam[opt] number scale The number of digits to the right of the decimal --- point. Defaults to 2. --- @treturn string A human-readable string. --- @usage --- --- bytes_to_str(5497558) -- "5497558" --- bytes_to_str(5497558, "m") -- "5.24 MiB" --- bytes_to_str(5497558, "G", 3) -- "5.120 GiB" --- -function _M.bytes_to_str(bytes, unit, scale) - if not unit or unit == "" or lower(unit) == "b" then - return fmt("%d", bytes) - end - - scale = scale or 2 - - if type(scale) ~= "number" or scale < 0 then - error("scale must be equal or greater than 0", 2) - end - - local fspec = fmt("%%.%df", scale) - - if lower(unit) == "k" then - return fmt(fspec .. " KiB", bytes / 2^10) - end - - if lower(unit) == "m" then - return fmt(fspec .. " MiB", bytes / 2^20) - end - - if lower(unit) == "g" then - return fmt(fspec .. " GiB", bytes / 2^30) - end - - error("invalid unit '" .. unit .. "' (expected 'k/K', 'm/M', or 'g/G')", 2) -end - - do local NGX_ERROR = ngx.ERROR @@ -1295,41 +1089,6 @@ end _M.time_ns = time_ns -local try_decode_base64 -do - local decode_base64 = ngx.decode_base64 - local decode_base64url = require "ngx.base64".decode_base64url - - local function decode_base64_str(str) - if type(str) == "string" then - return decode_base64(str) - or decode_base64url(str) - or nil, "base64 decoding failed: invalid input" - - else - return nil, "base64 decoding failed: not a string" - end - end - - function try_decode_base64(value) - if type(value) == "table" then - for i, v in ipairs(value) do - value[i] = decode_base64_str(v) or v - end - - return value - end - - if type(value) == "string" then - return decode_base64_str(value) or value - end - - return value - end -end -_M.try_decode_base64 = try_decode_base64 - - local get_now_ms local get_updated_now_ms local get_start_time_ms @@ -1370,7 +1129,9 @@ do "kong.tools.table", "kong.tools.sha256", "kong.tools.yield", + "kong.tools.string", "kong.tools.uuid", + "kong.tools.rand", } for _, str in ipairs(modules) do diff --git a/spec/01-unit/08-router_spec.lua b/spec/01-unit/08-router_spec.lua index b8b39777f697..114ff31fbe29 100644 --- a/spec/01-unit/08-router_spec.lua +++ b/spec/01-unit/08-router_spec.lua @@ -2150,40 +2150,73 @@ for _, flavor in ipairs({ "traditional", "traditional_compatible", "expressions" it("empty methods", function() use_case[1].route.methods = v - assert.equal(get_expression(use_case[1].route), [[(http.path ^= "/foo")]]) + assert.equal(get_expression(use_case[1].route), [[(http.path ^= r#"/foo"#)]]) assert(new_router(use_case)) end) it("empty hosts", function() use_case[1].route.hosts = v - assert.equal(get_expression(use_case[1].route), [[(http.method == "GET") && (http.path ^= "/foo")]]) + assert.equal(get_expression(use_case[1].route), [[(http.method == r#"GET"#) && (http.path ^= r#"/foo"#)]]) assert(new_router(use_case)) end) it("empty headers", function() use_case[1].route.headers = v - assert.equal(get_expression(use_case[1].route), [[(http.method == "GET") && (http.path ^= "/foo")]]) + assert.equal(get_expression(use_case[1].route), [[(http.method == r#"GET"#) && (http.path ^= r#"/foo"#)]]) assert(new_router(use_case)) end) it("empty paths", function() use_case[1].route.paths = v - assert.equal(get_expression(use_case[1].route), [[(http.method == "GET")]]) + assert.equal(get_expression(use_case[1].route), [[(http.method == r#"GET"#)]]) assert(new_router(use_case)) end) it("empty snis", function() use_case[1].route.snis = v - assert.equal(get_expression(use_case[1].route), [[(http.method == "GET") && (http.path ^= "/foo")]]) + assert.equal(get_expression(use_case[1].route), [[(http.method == r#"GET"#) && (http.path ^= r#"/foo"#)]]) assert(new_router(use_case)) end) end end) + describe("raw string", function() + local use_case + local get_expression = atc_compat.get_expression + + before_each(function() + use_case = { + { + service = service, + route = { + id = "e8fb37f1-102d-461e-9c51-6608a6bb8101", + methods = { "GET" }, + }, + }, + } + end) + + it("path has '\"'", function() + use_case[1].route.paths = { [[~/\"/*$]], } + + assert.equal([[(http.method == r#"GET"#) && (http.path ~ r#"^/\"/*$"#)]], + get_expression(use_case[1].route)) + assert(new_router(use_case)) + end) + + it("path has '\"#'", function() + use_case[1].route.paths = { [[~/\"#/*$]], } + + assert.equal([[(http.method == r#"GET"#) && (http.path ~ "^/\\\"#/*$")]], + get_expression(use_case[1].route)) + assert(new_router(use_case)) + end) + end) + describe("check regex with '\\'", function() local use_case local get_expression = atc_compat.get_expression @@ -2203,7 +2236,7 @@ for _, flavor in ipairs({ "traditional", "traditional_compatible", "expressions" it("regex path has double '\\'", function() use_case[1].route.paths = { [[~/\\/*$]], } - assert.equal([[(http.method == "GET") && (http.path ~ "^/\\\\/*$")]], + assert.equal([[(http.method == r#"GET"#) && (http.path ~ r#"^/\\/*$"#)]], get_expression(use_case[1].route)) assert(new_router(use_case)) end) @@ -2211,7 +2244,7 @@ for _, flavor in ipairs({ "traditional", "traditional_compatible", "expressions" it("regex path has '\\d'", function() use_case[1].route.paths = { [[~/\d+]], } - assert.equal([[(http.method == "GET") && (http.path ~ "^/\\d+")]], + assert.equal([[(http.method == r#"GET"#) && (http.path ~ r#"^/\d+"#)]], get_expression(use_case[1].route)) assert(new_router(use_case)) end) @@ -4659,7 +4692,7 @@ for _, flavor in ipairs({ "traditional", "traditional_compatible", "expressions" use_case[1].route.destinations = {{ ip = "192.168.0.1/16" },} assert.equal(get_expression(use_case[1].route), - [[(net.protocol != "tls" || (tls.sni == "www.example.org")) && (net.dst.ip in 192.168.0.0/16)]]) + [[(net.protocol != r#"tls"# || (tls.sni == r#"www.example.org"#)) && (net.dst.ip in 192.168.0.0/16)]]) assert(new_router(use_case)) end) @@ -4667,7 +4700,7 @@ for _, flavor in ipairs({ "traditional", "traditional_compatible", "expressions" use_case[1].route.destinations = v assert.equal(get_expression(use_case[1].route), - [[(net.protocol != "tls" || (tls.sni == "www.example.org")) && (net.src.ip == 127.0.0.1)]]) + [[(net.protocol != r#"tls"# || (tls.sni == r#"www.example.org"#)) && (net.src.ip == 127.0.0.1)]]) assert(new_router(use_case)) end) end diff --git a/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua new file mode 100644 index 000000000000..9f528c4bb46b --- /dev/null +++ b/spec/02-integration/04-admin_api/24-reconfiguration-completion_spec.lua @@ -0,0 +1,156 @@ +local helpers = require "spec.helpers" +local cjson = require "cjson" + +describe("Admin API - Reconfiguration Completion -", function() + + local WORKER_STATE_UPDATE_FREQ = 1 + + local admin_client + local proxy_client + + local function run_tests() + + local res = admin_client:post("/plugins", { + body = { + name = "request-termination", + config = { + status_code = 200, + body = "kong terminated the request", + } + }, + headers = { ["Content-Type"] = "application/json" }, + }) + assert.res_status(201, res) + + res = admin_client:post("/services", { + body = { + name = "test-service", + url = "http://127.0.0.1", + }, + headers = { ["Content-Type"] = "application/json" }, + }) + local body = assert.res_status(201, res) + local service = cjson.decode(body) + + -- We're running the route setup in `eventually` to cover for the unlikely case that reconfiguration completes + -- between adding the route and requesting the path through the proxy path. + + local next_path do + local path_suffix = 0 + function next_path() + path_suffix = path_suffix + 1 + return "/" .. tostring(path_suffix) + end + end + + local service_path + local kong_transaction_id + + assert.eventually(function() + service_path = next_path() + + res = admin_client:post("/services/" .. service.id .. "/routes", { + body = { + paths = { service_path } + }, + headers = { ["Content-Type"] = "application/json" }, + }) + assert.res_status(201, res) + kong_transaction_id = res.headers['x-kong-transaction-id'] + assert.is_string(kong_transaction_id) + + res = proxy_client:get(service_path, + { + headers = { + ["X-If-Kong-Transaction-Id"] = kong_transaction_id + } + }) + assert.res_status(503, res) + assert.equals("pending", res.headers['x-kong-reconfiguration-status']) + local retry_after = tonumber(res.headers['retry-after']) + ngx.sleep(retry_after) + end) + .has_no_error() + + assert.eventually(function() + res = proxy_client:get(service_path, + { + headers = { + ["X-If-Kong-Transaction-Id"] = kong_transaction_id + } + }) + body = assert.res_status(200, res) + assert.equals("kong terminated the request", body) + end) + .has_no_error() + end + + describe("#traditional mode -", function() + lazy_setup(function() + helpers.get_db_utils() + assert(helpers.start_kong({ + worker_consistency = "eventual", + worker_state_update_frequency = WORKER_STATE_UPDATE_FREQ, + })) + admin_client = helpers.admin_client() + proxy_client = helpers.proxy_client() + end) + + teardown(function() + if admin_client then + admin_client:close() + end + if proxy_client then + proxy_client:close() + end + helpers.stop_kong() + end) + + it("rejects proxy requests if worker state has not been updated yet", run_tests) + end) + + describe("#hybrid mode -", function() + lazy_setup(function() + helpers.get_db_utils() + + assert(helpers.start_kong({ + role = "control_plane", + database = "postgres", + prefix = "cp", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering.crt", + cluster_listen = "127.0.0.1:9005", + cluster_telemetry_listen = "127.0.0.1:9006", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + + assert(helpers.start_kong({ + role = "data_plane", + database = "off", + prefix = "dp", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering.crt", + cluster_control_plane = "127.0.0.1:9005", + cluster_telemetry_endpoint = "127.0.0.1:9006", + proxy_listen = "0.0.0.0:9002", + })) + admin_client = helpers.admin_client() + proxy_client = helpers.proxy_client("127.0.0.1", 9002) + end) + + teardown(function() + if admin_client then + admin_client:close() + end + if proxy_client then + proxy_client:close() + end + helpers.stop_kong("dp") + helpers.stop_kong("cp") + end) + + it("rejects proxy requests if worker state has not been updated yet", run_tests) + end) +end)