From 73158170750bb6655f2efcbeb11b50e9bff89a4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hans=20H=C3=BCbner?= Date: Fri, 11 Aug 2023 16:41:54 +0200 Subject: [PATCH 01/25] fix(dns): fix retry and timeout handling (#11386) - Stop retrying in dns/client.lua, let the resolver handle this. This change also makes it possible to disable retries, which previously was not possible - Be more faithful to the timeouts set by the user. Previously, the timeout configured was used only for the ultimate request sent to the DNS server, but asynchronous requests allowed longer timeouts which was not transparent. - When the DNS server fails, stop trying other query types. Previously, the behavior was such that after an (intermediate) failure to query for one record type (say "SRV"), the client would try the next record type (say "A") and succeed with that. It would then return the contents of the "A" record even if the "SRV" record pointed to a different address. - Change domain names used for testing the DNS client into the kong-gateway-testing.link zone, which is controlled by the Kong Gateway team. Fixes https://github.com/Kong/kong/issues/10182 KAG-2300 Signed-off-by: Xiaochen Wang --- .../fix_dns_retry_and_timeout_handling.yml | 3 + kong/resty/dns/client.lua | 95 +++++++-------- spec/01-unit/21-dns-client/02-client_spec.lua | 110 ++++++++++++++---- .../lua-resty-dns/resty/dns/resolver.lua | 14 ++- 4 files changed, 140 insertions(+), 82 deletions(-) create mode 100644 changelog/unreleased/kong/fix_dns_retry_and_timeout_handling.yml diff --git a/changelog/unreleased/kong/fix_dns_retry_and_timeout_handling.yml b/changelog/unreleased/kong/fix_dns_retry_and_timeout_handling.yml new file mode 100644 index 0000000000..359666786c --- /dev/null +++ b/changelog/unreleased/kong/fix_dns_retry_and_timeout_handling.yml @@ -0,0 +1,3 @@ +message: Update the DNS client to follow configured timeouts in a more predictable manner +type: bugfix +scope: Core diff --git a/kong/resty/dns/client.lua b/kong/resty/dns/client.lua index b361a25c02..c5bfc55821 100644 --- a/kong/resty/dns/client.lua +++ b/kong/resty/dns/client.lua @@ -360,9 +360,9 @@ end -- @param self the try_list to add to -- @param status string with current status, added to the list for the current try -- @return the try_list -local function try_status(self, status) - local status_list = self[#self].msg - status_list[#status_list + 1] = status +local function add_status_to_try_list(self, status) + local try_list = self[#self].msg + try_list[#try_list + 1] = status return self end @@ -383,8 +383,7 @@ end -- @section resolving -local poolMaxWait -local poolMaxRetry +local resolve_max_wait --- Initialize the client. Can be called multiple times. When called again it -- will clear the cache. @@ -638,8 +637,7 @@ _M.init = function(options) config = options -- store it in our module level global - poolMaxRetry = 1 -- do one retry, dns resolver is already doing 'retrans' number of retries on top - poolMaxWait = options.timeout / 1000 * options.retrans -- default is to wait for the dns resolver to hit its timeouts + resolve_max_wait = options.timeout / 1000 * options.retrans -- maximum time to wait for the dns resolver to hit its timeouts return true end @@ -672,7 +670,7 @@ local function parseAnswer(qname, qtype, answers, try_list) if (answer.type ~= qtype) or (answer.name ~= check_qname) then local key = answer.type..":"..answer.name - try_status(try_list, key .. " removed") + add_status_to_try_list(try_list, key .. 
" removed") local lst = others[key] if not lst then lst = {} @@ -710,7 +708,7 @@ local function individualQuery(qname, r_opts, try_list) return r, "failed to create a resolver: " .. err, try_list end - try_status(try_list, "querying") + add_status_to_try_list(try_list, "querying") local result result, err = r:query(qname, r_opts) @@ -737,7 +735,7 @@ local function executeQuery(premature, item) --[[ log(DEBUG, PREFIX, "Query executing: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item)) --]] - try_status(item.try_list, "querying") + add_status_to_try_list(item.try_list, "querying") item.result, item.err = r:query(item.qname, item.r_opts) if item.result then --[[ @@ -779,7 +777,7 @@ local function asyncQuery(qname, r_opts, try_list) --[[ log(DEBUG, PREFIX, "Query async (exists): ", key, " ", fquery(item)) --]] - try_status(try_list, "in progress (async)") + add_status_to_try_list(try_list, "in progress (async)") return item -- already in progress, return existing query end @@ -801,7 +799,7 @@ local function asyncQuery(qname, r_opts, try_list) --[[ log(DEBUG, PREFIX, "Query async (scheduled): ", key, " ", fquery(item)) --]] - try_status(try_list, "scheduled") + add_status_to_try_list(try_list, "scheduled") return item end @@ -809,33 +807,24 @@ end -- schedules a sync query. -- This will be synchronized, so multiple calls (sync or async) might result in 1 query. --- The `poolMaxWait` is how long a thread waits for another to complete the query. --- The `poolMaxRetry` is how often we wait for another query to complete. --- The maximum delay would be `poolMaxWait * poolMaxRetry`. +-- The maximum delay would be `options.timeout * options.retrans`. -- @param qname the name to query for -- @param r_opts a table with the query options -- @param try_list the try_list object to add to -- @return `result + nil + try_list`, or `nil + err + try_list` in case of errors -local function syncQuery(qname, r_opts, try_list, count) +local function syncQuery(qname, r_opts, try_list) local key = qname..":"..r_opts.qtype local item = queue[key] - count = count or 1 -- if nothing is in progress, we start a new async query if not item then local err item, err = asyncQuery(qname, r_opts, try_list) - --[[ - log(DEBUG, PREFIX, "Query sync (new): ", key, " ", fquery(item)," count=", count) - --]] if not item then return item, err, try_list end else - --[[ - log(DEBUG, PREFIX, "Query sync (exists): ", key, " ", fquery(item)," count=", count) - --]] - try_status(try_list, "in progress (sync)") + add_status_to_try_list(try_list, "in progress (sync)") end local supported_semaphore_wait_phases = { @@ -860,7 +849,7 @@ local function syncQuery(qname, r_opts, try_list, count) end -- block and wait for the async query to complete - local ok, err = item.semaphore:wait(poolMaxWait) + local ok, err = item.semaphore:wait(resolve_max_wait) if ok and item.result then -- we were released, and have a query result from the -- other thread, so all is well, return it @@ -871,25 +860,16 @@ local function syncQuery(qname, r_opts, try_list, count) return item.result, item.err, try_list end - -- there was an error, either a semaphore timeout, or a lookup error - -- go retry - try_status(try_list, "try "..count.." error: "..(item.err or err or "unknown")) - if count > poolMaxRetry then - --[[ - log(DEBUG, PREFIX, "Query sync (fail): ", key, " ", fquery(item)," retries exceeded. count=", count) - --]] - return nil, "dns lookup pool exceeded retries (" .. - tostring(poolMaxRetry) .. 
"): "..tostring(item.err or err), - try_list - end + err = err or item.err or "unknown" + add_status_to_try_list(try_list, "error: "..err) -- don't block on the same thread again, so remove it from the queue - if queue[key] == item then queue[key] = nil end + if queue[key] == item then + queue[key] = nil + end - --[[ - log(DEBUG, PREFIX, "Query sync (fail): ", key, " ", fquery(item)," retrying. count=", count) - --]] - return syncQuery(qname, r_opts, try_list, count + 1) + -- there was an error, either a semaphore timeout, or a lookup error + return nil, err end -- will lookup a name in the cache, or alternatively query the nameservers. @@ -928,7 +908,7 @@ local function lookup(qname, r_opts, dnsCacheOnly, try_list) try_list = try_add(try_list, qname, r_opts.qtype, "cache-hit") if entry.expired then -- the cached record is stale but usable, so we do a refresh query in the background - try_status(try_list, "stale") + add_status_to_try_list(try_list, "stale") asyncQuery(qname, r_opts, try_list) end @@ -946,7 +926,7 @@ local function check_ipv6(qname, qtype, try_list) local record = cachelookup(qname, qtype) if record then - try_status(try_list, "cached") + add_status_to_try_list(try_list, "cached") return record, nil, try_list end @@ -966,7 +946,7 @@ local function check_ipv6(qname, qtype, try_list) end if qtype == _M.TYPE_AAAA and check:match("^%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?:%x%x?%x?%x?$") then - try_status(try_list, "validated") + add_status_to_try_list(try_list, "validated") record = {{ address = qname, type = _M.TYPE_AAAA, @@ -978,7 +958,7 @@ local function check_ipv6(qname, qtype, try_list) else -- not a valid IPv6 address, or a bad type (non ipv6) -- return a "server error" - try_status(try_list, "bad IPv6") + add_status_to_try_list(try_list, "bad IPv6") record = { errcode = 3, errstr = "name error", @@ -999,12 +979,12 @@ local function check_ipv4(qname, qtype, try_list) local record = cachelookup(qname, qtype) if record then - try_status(try_list, "cached") + add_status_to_try_list(try_list, "cached") return record, nil, try_list end if qtype == _M.TYPE_A then - try_status(try_list, "validated") + add_status_to_try_list(try_list, "validated") record = {{ address = qname, type = _M.TYPE_A, @@ -1016,7 +996,7 @@ local function check_ipv4(qname, qtype, try_list) else -- bad query type for this ipv4 address -- return a "server error" - try_status(try_list, "bad IPv4") + add_status_to_try_list(try_list, "bad IPv4") record = { errcode = 3, errstr = "name error", @@ -1160,7 +1140,7 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list) records = nil -- luacheck: pop err = "recursion detected" - try_status(try_list, "recursion detected") + add_status_to_try_list(try_list, "recursion detected") return nil, err, try_list end end @@ -1172,14 +1152,14 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list) -- luacheck: push no unused records = nil -- luacheck: pop - try_list = try_status(try_list, "stale") + try_list = add_status_to_try_list(try_list, "stale") else -- a valid non-stale record -- check for CNAME records, and dereferencing the CNAME if (records[1] or EMPTY).type == _M.TYPE_CNAME and qtype ~= _M.TYPE_CNAME then opts.qtype = nil - try_status(try_list, "dereferencing") + add_status_to_try_list(try_list, "dereferencing") return resolve(records[1].cname, opts, dnsCacheOnly, try_list) end @@ -1227,8 +1207,10 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list) end if not records then -- luacheck: ignore - 
-- nothing to do, an error - -- fall through to the next entry in our search sequence + -- An error has occurred, terminate the lookup process. We don't want to try other record types because + -- that would potentially cause us to respond with wrong answers (i.e. the contents of an A record if the + -- query for the SRV record failed due to a network error). + goto failed elseif records.errcode then -- dns error: fall through to the next entry in our search sequence @@ -1287,7 +1269,7 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list) if records[1].type == _M.TYPE_CNAME and qtype ~= _M.TYPE_CNAME then -- dereference CNAME opts.qtype = nil - try_status(try_list, "dereferencing") + add_status_to_try_list(try_list, "dereferencing") return resolve(records[1].cname, opts, dnsCacheOnly, try_list) end @@ -1296,8 +1278,9 @@ local function resolve(qname, r_opts, dnsCacheOnly, try_list) end -- we had some error, record it in the status list - try_status(try_list, err) + add_status_to_try_list(try_list, err) end + ::failed:: -- we failed, clear cache and return last error if not dnsCacheOnly then @@ -1507,7 +1490,7 @@ local function toip(qname, port, dnsCacheOnly, try_list) local entry = rec[roundRobinW(rec)] -- our SRV entry might still contain a hostname, so recurse, with found port number local srvport = (entry.port ~= 0 and entry.port) or port -- discard port if it is 0 - try_status(try_list, "dereferencing SRV") + add_status_to_try_list(try_list, "dereferencing SRV") return toip(entry.target, srvport, dnsCacheOnly, try_list) else -- must be A or AAAA diff --git a/spec/01-unit/21-dns-client/02-client_spec.lua b/spec/01-unit/21-dns-client/02-client_spec.lua index 586ab0edeb..106e47fde1 100644 --- a/spec/01-unit/21-dns-client/02-client_spec.lua +++ b/spec/01-unit/21-dns-client/02-client_spec.lua @@ -3,6 +3,13 @@ local tempfilename = require("pl.path").tmpname local pretty = require("pl.pretty").write +-- Several DNS tests use the actual DNS to verify the client behavior against real name servers. It seems that +-- even though we have a DNS mocking system, it is good to have some coverage against actual servers to ensure that +-- we're not relying on mocked behavior. We use the domain name kong-gateway-testing.link, which is hosted in Route53 +-- in the AWS sandbox, allowing Gateway developers to make additions if required. +local TEST_DOMAIN="kong-gateway-testing.link" + + -- empty records and not found errors should be identical, hence we -- define a constant for that error message local NOT_FOUND_ERROR = "dns server error: 3 name error" @@ -79,7 +86,7 @@ describe("[DNS client]", function() resolvConf = {"nameserver [fe80::1%enp0s20f0u1u1]"}, }) end) - local ip, port = client.toip("thijsschreijer.nl") + local ip, port = client.toip(TEST_DOMAIN) assert.is_nil(ip) assert.not_matches([[failed to parse host name "[fe80::1%enp0s20f0u1u1]": invalid IPv6 address]], port, nil, true) assert.matches([[failed to create a resolver: no nameservers specified]], port, nil, true) @@ -557,13 +564,68 @@ describe("[DNS client]", function() }, list) end) + for retrans in ipairs({1, 2}) do + for _, timeout in ipairs({1, 2}) do + it("correctly observes #timeout of " .. tostring(timeout) .. " seconds with " .. tostring(retrans) .. 
" retries", function() + -- KAG-2300 - https://github.com/Kong/kong/issues/10182 + -- If we encounter a timeout while talking to the DNS server, expect the total timeout to be close to the + -- configured timeout * retrans parameters + assert(client.init({ + resolvConf = { + "nameserver 198.51.100.0", + "domain one.com", + }, + timeout = timeout * 1000, + retrans = retrans, + hosts = { + "127.0.0.1 host" + } + })) + query_func = function(self, original_query_func, name, options) + ngx.sleep(5) + return nil + end + local start_time = ngx.now() + client.resolve("host1.one.com.") + local duration = ngx.now() - start_time + assert.truthy(duration < (timeout * retrans + 1)) + end) + end + end + + -- The domain name below needs to have both a SRV and an A record + local SRV_A_TEST_NAME = "timeouttest."..TEST_DOMAIN + + it("verify correctly set up test DNS entry", function() + assert(client.init({ timeout = 1000, retrans = 2 })) + local answers = client.resolve(SRV_A_TEST_NAME, { qtype = client.TYPE_SRV}) + assert.same(client.TYPE_SRV, answers[1].type) + answers = client.resolve(SRV_A_TEST_NAME, { qtype = client.TYPE_A}) + assert.same(client.TYPE_A, answers[1].type) + end) + + it("does not respond with incorrect answer on transient failure", function() + -- KAG-2300 - https://github.com/Kong/kong/issues/10182 + -- If we encounter a timeout while talking to the DNS server, don't keep trying with other record types + assert(client.init({ timeout = 1000, retrans = 2 })) + query_func = function(self, original_query_func, name, options) + if options.qtype == client.TYPE_SRV then + ngx.sleep(10) + else + return original_query_func(self, name, options) + end + end + local answers = client.resolve(SRV_A_TEST_NAME) + assert.is_nil(answers) + end) + end) it("fetching a record without nameservers errors", function() assert(client.init({ resolvConf = {} })) - local host = "thijsschreijer.nl" + local host = TEST_DOMAIN local typ = client.TYPE_A local answers, err, _ = client.resolve(host, { qtype = typ }) @@ -574,7 +636,7 @@ describe("[DNS client]", function() it("fetching a TXT record", function() assert(client.init()) - local host = "txttest.thijsschreijer.nl" + local host = "txttest."..TEST_DOMAIN local typ = client.TYPE_TXT local answers, err, try_list = client.resolve(host, { qtype = typ }) @@ -587,7 +649,7 @@ describe("[DNS client]", function() it("fetching a CNAME record", function() assert(client.init()) - local host = "smtp.thijsschreijer.nl" + local host = "smtp."..TEST_DOMAIN local typ = client.TYPE_CNAME local answers = assert(client.resolve(host, { qtype = typ })) @@ -599,7 +661,7 @@ describe("[DNS client]", function() it("fetching a CNAME record FQDN", function() assert(client.init()) - local host = "smtp.thijsschreijer.nl" + local host = "smtp."..TEST_DOMAIN local typ = client.TYPE_CNAME local answers = assert(client.resolve(host .. 
".", { qtype = typ })) @@ -611,7 +673,7 @@ describe("[DNS client]", function() it("expire and touch times", function() assert(client.init()) - local host = "txttest.thijsschreijer.nl" + local host = "txttest."..TEST_DOMAIN local typ = client.TYPE_TXT local answers, _, _ = assert(client.resolve(host, { qtype = typ })) @@ -667,7 +729,7 @@ describe("[DNS client]", function() it("fetching multiple A records", function() assert(client.init()) - local host = "atest.thijsschreijer.nl" + local host = "atest."..TEST_DOMAIN local typ = client.TYPE_A local answers = assert(client.resolve(host, { qtype = typ })) @@ -681,7 +743,7 @@ describe("[DNS client]", function() it("fetching multiple A records FQDN", function() assert(client.init()) - local host = "atest.thijsschreijer.nl" + local host = "atest."..TEST_DOMAIN local typ = client.TYPE_A local answers = assert(client.resolve(host .. ".", { qtype = typ })) @@ -710,20 +772,20 @@ describe("[DNS client]", function() This does not affect client side code, as the result is always the final A record. --]] - local host = "smtp.thijsschreijer.nl" + local host = "smtp."..TEST_DOMAIN local typ = client.TYPE_A local answers, _, _ = assert(client.resolve(host)) -- check first CNAME local key1 = client.TYPE_CNAME..":"..host local entry1 = lrucache:get(key1) - assert.are.equal(host, entry1[1].name) -- the 1st record is the original 'smtp.thijsschreijer.nl' + assert.are.equal(host, entry1[1].name) -- the 1st record is the original 'smtp.'..TEST_DOMAIN assert.are.equal(client.TYPE_CNAME, entry1[1].type) -- and that is a CNAME -- check second CNAME local key2 = client.TYPE_CNAME..":"..entry1[1].cname local entry2 = lrucache:get(key2) - assert.are.equal(entry1[1].cname, entry2[1].name) -- the 2nd is the middle 'thuis.thijsschreijer.nl' + assert.are.equal(entry1[1].cname, entry2[1].name) -- the 2nd is the middle 'thuis.'..TEST_DOMAIN assert.are.equal(client.TYPE_CNAME, entry2[1].type) -- and that is also a CNAME -- check second target to match final record @@ -745,7 +807,7 @@ describe("[DNS client]", function() it("fetching multiple SRV records (un-typed)", function() assert(client.init()) - local host = "srvtest.thijsschreijer.nl" + local host = "srvtest."..TEST_DOMAIN local typ = client.TYPE_SRV -- un-typed lookup @@ -763,7 +825,7 @@ describe("[DNS client]", function() assert(client.init({ search = {}, })) local lrucache = client.getcache() - local host = "cname2srv.thijsschreijer.nl" + local host = "cname2srv."..TEST_DOMAIN local typ = client.TYPE_SRV -- un-typed lookup @@ -793,7 +855,7 @@ describe("[DNS client]", function() }, })) - local host = "srvtest.thijsschreijer.nl" + local host = "srvtest."..TEST_DOMAIN local typ = client.TYPE_A --> the entry is SRV not A local answers, err, _ = client.resolve(host, {qtype = typ}) @@ -809,7 +871,7 @@ describe("[DNS client]", function() }, })) - local host = "IsNotHere.thijsschreijer.nl" + local host = "IsNotHere."..TEST_DOMAIN local answers, err, _ = client.resolve(host) assert.is_nil(answers) @@ -1099,7 +1161,7 @@ describe("[DNS client]", function() describe("toip() function", function() it("A/AAAA-record, round-robin",function() assert(client.init({ search = {}, })) - local host = "atest.thijsschreijer.nl" + local host = "atest."..TEST_DOMAIN local answers = assert(client.resolve(host)) answers.last_index = nil -- make sure to clean local ips = {} @@ -1303,11 +1365,12 @@ describe("[DNS client]", function() assert.is_number(port) assert.is_not.equal(0, port) end) + it("port passing if SRV port=0",function() 
assert(client.init({ search = {}, })) local ip, port, host - host = "srvport0.thijsschreijer.nl" + host = "srvport0."..TEST_DOMAIN ip, port = client.toip(host, 10) assert.is_string(ip) assert.is_number(port) @@ -1317,6 +1380,7 @@ describe("[DNS client]", function() assert.is_string(ip) assert.is_nil(port) end) + it("recursive SRV pointing to itself",function() assert(client.init({ resolvConf = { @@ -1325,7 +1389,7 @@ describe("[DNS client]", function() }, })) local ip, record, port, host, err, _ - host = "srvrecurse.thijsschreijer.nl" + host = "srvrecurse."..TEST_DOMAIN -- resolve SRV specific should return the record including its -- recursive entry @@ -1480,7 +1544,7 @@ describe("[DNS client]", function() --empty/error responses should be cached for a configurable time local emptyTtl = 0.1 local staleTtl = 0.1 - local qname = "really.really.really.does.not.exist.thijsschreijer.nl" + local qname = "really.really.really.does.not.exist."..TEST_DOMAIN assert(client.init({ emptyTtl = emptyTtl, staleTtl = staleTtl, @@ -1648,7 +1712,7 @@ describe("[DNS client]", function() -- starting resolving coroutine.yield(coroutine.running()) local result, _, _ = client.resolve( - "thijsschreijer.nl", + TEST_DOMAIN, { qtype = client.TYPE_A } ) table.insert(results, result) @@ -1692,7 +1756,7 @@ describe("[DNS client]", function() })) -- insert a stub thats waits and returns a fixed record - local name = "thijsschreijer.nl" + local name = TEST_DOMAIN query_func = function() local ip = "1.4.2.3" local entry = { @@ -1738,7 +1802,7 @@ describe("[DNS client]", function() -- all results are equal, as they all will wait for the first response for i = 1, 10 do - assert.equal("dns lookup pool exceeded retries (1): timeout", results[i]) + assert.equal("timeout", results[i]) end end) end) @@ -1755,7 +1819,7 @@ describe("[DNS client]", function() -- insert a stub thats waits and returns a fixed record local call_count = 0 - local name = "thijsschreijer.nl" + local name = TEST_DOMAIN query_func = function() local ip = "1.4.2.3" local entry = { diff --git a/spec/fixtures/mocks/lua-resty-dns/resty/dns/resolver.lua b/spec/fixtures/mocks/lua-resty-dns/resty/dns/resolver.lua index 75dcea922b..f1b83355ab 100644 --- a/spec/fixtures/mocks/lua-resty-dns/resty/dns/resolver.lua +++ b/spec/fixtures/mocks/lua-resty-dns/resty/dns/resolver.lua @@ -95,10 +95,18 @@ resolver.query = function(self, name, options, tries) end if not mocks_only then - -- no mock, so invoke original resolver - local a, b, c = old_query(self, name, options, tries) - return a, b, c + -- No mock, so invoke original resolver. Note that if the original resolver fails (i.e. because an + -- invalid domain name like example.com was used), we return an empty result set instead of passing + -- the error up to the caller. This is done so that if the mock contains "A" records (which would + -- be the most common case), the initial query for a SRV record does not fail, but appear not to have + -- yielded any results. This will make dns/client.lua try finding an A record next. 
+ local records, err, tries = old_query(self, name, options, tries) + if records then + return records, err, tries + end end + + return {}, nil, tries end do From 30c178fb45845f9775aeb915a99048bbc89864d1 Mon Sep 17 00:00:00 2001 From: Xiaochen Wang Date: Wed, 15 Nov 2023 14:41:52 +0800 Subject: [PATCH 02/25] fix(dns): eliminate asynchronous timer in `syncQuery()` to prevent deadlock risk (#11900) * fix(dns): introduce the synchronous query in syncQuery() to prevent hang risk Originally the first request to `syncQuery()` will trigger an asynchronous timer event, which added the risk of thread pool hanging. With this patch, cold synchronously DNS query will always happen in the current thread if current phase supports yielding. Fix FTI-5348 --------- Co-authored-by: Datong Sun --- .../unreleased/kong/fix_dns_blocking.yml | 3 + kong/resty/dns/client.lua | 143 +++++++++--------- spec/01-unit/21-dns-client/02-client_spec.lua | 22 ++- t/03-dns-client/01-phases.t | 7 +- t/03-dns-client/02-timer-usage.t | 80 +++++----- 5 files changed, 134 insertions(+), 121 deletions(-) create mode 100644 changelog/unreleased/kong/fix_dns_blocking.yml diff --git a/changelog/unreleased/kong/fix_dns_blocking.yml b/changelog/unreleased/kong/fix_dns_blocking.yml new file mode 100644 index 0000000000..a167c5fa16 --- /dev/null +++ b/changelog/unreleased/kong/fix_dns_blocking.yml @@ -0,0 +1,3 @@ +message: Eliminate asynchronous timer in syncQuery() to prevent hang risk +type: bugfix +scope: Core diff --git a/kong/resty/dns/client.lua b/kong/resty/dns/client.lua index c5bfc55821..913dd3efc8 100644 --- a/kong/resty/dns/client.lua +++ b/kong/resty/dns/client.lua @@ -30,10 +30,10 @@ local time = ngx.now local log = ngx.log local ERR = ngx.ERR local WARN = ngx.WARN +local ALERT = ngx.ALERT local DEBUG = ngx.DEBUG local PREFIX = "[dns-client] " local timer_at = ngx.timer.at -local get_phase = ngx.get_phase local math_min = math.min local math_max = math.max @@ -637,7 +637,9 @@ _M.init = function(options) config = options -- store it in our module level global - resolve_max_wait = options.timeout / 1000 * options.retrans -- maximum time to wait for the dns resolver to hit its timeouts + -- maximum time to wait for the dns resolver to hit its timeouts + -- + 1s to ensure some delay in timer execution and semaphore return are accounted for + resolve_max_wait = options.timeout / 1000 * options.retrans + 1 return true end @@ -721,44 +723,62 @@ local function individualQuery(qname, r_opts, try_list) return result, nil, try_list end - local queue = setmetatable({}, {__mode = "v"}) + +local function enqueue_query(key, qname, r_opts, try_list) + local item = { + key = key, + semaphore = semaphore(), + qname = qname, + r_opts = deepcopy(r_opts), + try_list = try_list, + expire_time = time() + resolve_max_wait, + } + queue[key] = item + return item +end + + +local function dequeue_query(item) + if queue[item.key] == item then + -- query done, but by now many others might be waiting for our result. + -- 1) stop new ones from adding to our lock/semaphore + queue[item.key] = nil + -- 2) release all waiting threads + item.semaphore:post(math_max(item.semaphore:count() * -1, 1)) + item.semaphore = nil + end +end + + +local function queue_get_query(key, try_list) + local item = queue[key] + + if not item then + return nil + end + + -- bug checks: release it actively if the waiting query queue is blocked + if item.expire_time < time() then + local err = "stale query, key:" .. 
key + add_status_to_try_list(try_list, err) + log(ALERT, PREFIX, err) + dequeue_query(item) + return nil + end + + return item +end + + -- to be called as a timer-callback, performs a query and returns the results -- in the `item` table. local function executeQuery(premature, item) if premature then return end - local r, err = resolver:new(config) - if not r then - item.result, item.err = r, "failed to create a resolver: " .. err - else - --[[ - log(DEBUG, PREFIX, "Query executing: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item)) - --]] - add_status_to_try_list(item.try_list, "querying") - item.result, item.err = r:query(item.qname, item.r_opts) - if item.result then - --[[ - log(DEBUG, PREFIX, "Query answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item), - " ", frecord(item.result)) - --]] - parseAnswer(item.qname, item.r_opts.qtype, item.result, item.try_list) - --[[ - log(DEBUG, PREFIX, "Query parsed answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item), - " ", frecord(item.result)) - else - log(DEBUG, PREFIX, "Query error: ", item.qname, ":", item.r_opts.qtype, " err=", tostring(err)) - --]] - end - end + item.result, item.err = individualQuery(item.qname, item.r_opts, item.try_list) - -- query done, but by now many others might be waiting for our result. - -- 1) stop new ones from adding to our lock/semaphore - queue[item.key] = nil - -- 2) release all waiting threads - item.semaphore:post(math_max(item.semaphore:count() * -1, 1)) - item.semaphore = nil - ngx.sleep(0) + dequeue_query(item) end @@ -772,7 +792,7 @@ end -- the `semaphore` field will be removed). Upon error it returns `nil+error`. local function asyncQuery(qname, r_opts, try_list) local key = qname..":"..r_opts.qtype - local item = queue[key] + local item = queue_get_query(key, try_list) if item then --[[ log(DEBUG, PREFIX, "Query async (exists): ", key, " ", fquery(item)) @@ -781,14 +801,7 @@ local function asyncQuery(qname, r_opts, try_list) return item -- already in progress, return existing query end - item = { - key = key, - semaphore = semaphore(), - qname = qname, - r_opts = deepcopy(r_opts), - try_list = try_list, - } - queue[key] = item + item = enqueue_query(key, qname, r_opts, try_list) local ok, err = timer_at(0, executeQuery, item) if not ok then @@ -814,40 +827,24 @@ end -- @return `result + nil + try_list`, or `nil + err + try_list` in case of errors local function syncQuery(qname, r_opts, try_list) local key = qname..":"..r_opts.qtype - local item = queue[key] - -- if nothing is in progress, we start a new async query + local item = queue_get_query(key, try_list) + + -- If nothing is in progress, we start a new sync query if not item then - local err - item, err = asyncQuery(qname, r_opts, try_list) - if not item then - return item, err, try_list - end - else - add_status_to_try_list(try_list, "in progress (sync)") - end + item = enqueue_query(key, qname, r_opts, try_list) - local supported_semaphore_wait_phases = { - rewrite = true, - access = true, - content = true, - timer = true, - ssl_cert = true, - ssl_session_fetch = true, - } + item.result, item.err = individualQuery(qname, item.r_opts, try_list) - local ngx_phase = get_phase() + dequeue_query(item) - if not supported_semaphore_wait_phases[ngx_phase] then - -- phase not supported by `semaphore:wait` - -- return existing query (item) - -- - -- this will avoid: - -- "dns lookup pool exceeded retries" (second try and subsequent retries) - -- "API disabled in the context of init_worker_by_lua" (first try) - return item, 
nil, try_list + return item.result, item.err, try_list end + -- If the query is already in progress, we wait for it. + + add_status_to_try_list(try_list, "in progress (sync)") + -- block and wait for the async query to complete local ok, err = item.semaphore:wait(resolve_max_wait) if ok and item.result then @@ -860,6 +857,14 @@ local function syncQuery(qname, r_opts, try_list) return item.result, item.err, try_list end + -- bug checks + if not ok and not item.err then + item.err = err -- only first expired wait() reports error + log(ALERT, PREFIX, "semaphore:wait(", resolve_max_wait, ") failed: ", err, + ", count: ", item.semaphore and item.semaphore:count(), + ", qname: ", qname) + end + err = err or item.err or "unknown" add_status_to_try_list(try_list, "error: "..err) diff --git a/spec/01-unit/21-dns-client/02-client_spec.lua b/spec/01-unit/21-dns-client/02-client_spec.lua index 106e47fde1..6a6715db1c 100644 --- a/spec/01-unit/21-dns-client/02-client_spec.lua +++ b/spec/01-unit/21-dns-client/02-client_spec.lua @@ -582,7 +582,10 @@ describe("[DNS client]", function() } })) query_func = function(self, original_query_func, name, options) - ngx.sleep(5) + -- The first request uses syncQuery not waiting on the + -- aysncQuery timer, so the low-level r:query() could not sleep(5s), + -- it can only sleep(timeout). + ngx.sleep(math.min(timeout, 5)) return nil end local start_time = ngx.now() @@ -1745,9 +1748,12 @@ describe("[DNS client]", function() end) it("timeout while waiting", function() + + local timeout = 500 + local ip = "1.4.2.3" -- basically the local function _synchronized_query assert(client.init({ - timeout = 500, + timeout = timeout, retrans = 1, resolvConf = { -- resolv.conf without `search` and `domain` options @@ -1758,7 +1764,7 @@ describe("[DNS client]", function() -- insert a stub thats waits and returns a fixed record local name = TEST_DOMAIN query_func = function() - local ip = "1.4.2.3" + local ip = ip local entry = { { type = client.TYPE_A, @@ -1770,7 +1776,9 @@ describe("[DNS client]", function() touch = 0, expire = gettime() + 10, } - sleep(0.5) -- wait before we return the results + -- wait before we return the results + -- `+ 2` s ensures that the semaphore:wait() expires + sleep(timeout/1000 + 2) return entry end @@ -1800,10 +1808,12 @@ describe("[DNS client]", function() ngx.thread.wait(coros[i]) -- this wait will resume the scheduled ones end - -- all results are equal, as they all will wait for the first response - for i = 1, 10 do + -- results[1~9] are equal, as they all will wait for the first response + for i = 1, 9 do assert.equal("timeout", results[i]) end + -- results[10] comes from synchronous DNS access of the first request + assert.equal(ip, results[10][1]["address"]) end) end) diff --git a/t/03-dns-client/01-phases.t b/t/03-dns-client/01-phases.t index e12cfab420..7f10aa9f61 100644 --- a/t/03-dns-client/01-phases.t +++ b/t/03-dns-client/01-phases.t @@ -1,6 +1,6 @@ use Test::Nginx::Socket; -plan tests => repeat_each() * (blocks() * 5); +plan tests => repeat_each() * (blocks() * 4 + 1); workers(6); @@ -59,8 +59,7 @@ qq { GET /t --- response_body answers: nil -err: dns client error: 101 empty record received ---- no_error_log +err: nil +--- error_log [error] -dns lookup pool exceeded retries API disabled in the context of init_worker_by_lua diff --git a/t/03-dns-client/02-timer-usage.t b/t/03-dns-client/02-timer-usage.t index 24cc32bddb..73c35ccb1c 100644 --- a/t/03-dns-client/02-timer-usage.t +++ b/t/03-dns-client/02-timer-usage.t @@ -2,76 +2,72 @@ use 
Test::Nginx::Socket; plan tests => repeat_each() * (blocks() * 5); -workers(6); +workers(1); no_shuffle(); run_tests(); __DATA__ - -=== TEST 1: reuse timers for queries of same name, independent on # of workers ---- http_config eval -qq { - init_worker_by_lua_block { - local client = require("kong.resty.dns.client") - assert(client.init({ - nameservers = { "8.8.8.8" }, - hosts = {}, -- empty tables to parse to prevent defaulting to /etc/hosts - resolvConf = {}, -- and resolv.conf files - order = { "A" }, - })) - local host = "httpbin.org" - local typ = client.TYPE_A - for i = 1, 10 do - client.resolve(host, { qtype = typ }) - end - - local host = "mockbin.org" - for i = 1, 10 do - client.resolve(host, { qtype = typ }) - end - - workers = ngx.worker.count() - timers = ngx.timer.pending_count() - } -} +=== TEST 1: stale result triggers async timer --- config location = /t { access_by_lua_block { + -- init local client = require("kong.resty.dns.client") - assert(client.init()) - local host = "httpbin.org" + assert(client.init({ + nameservers = { "127.0.0.53" }, + hosts = {}, -- empty tables to parse to prevent defaulting to /etc/hosts + resolvConf = {}, -- and resolv.conf files + order = { "A" }, + validTtl = 1, + })) + + local host = "konghq.com" local typ = client.TYPE_A - local answers, err = client.resolve(host, { qtype = typ }) + -- first time + + local answers, err, try_list = client.resolve(host, { qtype = typ }) if not answers then ngx.say("failed to resolve: ", err) + return end - ngx.say("first address name: ", answers[1].name) + ngx.say("first try_list: ", tostring(try_list)) + + -- sleep to wait for dns record to become stale + ngx.sleep(1.5) - host = "mockbin.org" - answers, err = client.resolve(host, { qtype = typ }) + -- second time: use stale result and trigger async timer + answers, err, try_list = client.resolve(host, { qtype = typ }) if not answers then ngx.say("failed to resolve: ", err) + return end - ngx.say("second address name: ", answers[1].name) + ngx.say("second try_list: ", tostring(try_list)) - ngx.say("workers: ", workers) + -- third time: use stale result and find triggered async timer - -- should be 2 timers maximum (1 for each hostname) - ngx.say("timers: ", timers) + answers, err, try_list = client.resolve(host, { qtype = typ }) + if not answers then + ngx.say("failed to resolve: ", err) + return + end + ngx.say("third address name: ", answers[1].name) + ngx.say("third try_list: ", tostring(try_list)) } } --- request GET /t --- response_body -first address name: httpbin.org -second address name: mockbin.org -workers: 6 -timers: 2 +first address name: konghq.com +first try_list: ["(short)konghq.com:1 - cache-miss","konghq.com:1 - cache-miss/querying"] +second address name: konghq.com +second try_list: ["(short)konghq.com:1 - cache-hit/stale","konghq.com:1 - cache-hit/stale/scheduled"] +third address name: konghq.com +third try_list: ["(short)konghq.com:1 - cache-hit/stale","konghq.com:1 - cache-hit/stale/in progress (async)"] --- no_error_log [error] dns lookup pool exceeded retries From 66c5775edefd32213694624ed7a520ea87619219 Mon Sep 17 00:00:00 2001 From: Mayo Date: Thu, 27 Oct 2022 12:28:30 +0800 Subject: [PATCH 03/25] chore(ci): fail on do-not-merge label (#9622) --- .github/workflows/label-check.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 .github/workflows/label-check.yml diff --git a/.github/workflows/label-check.yml b/.github/workflows/label-check.yml new file mode 100644 index 0000000000..8af97298d1 --- /dev/null +++ 
b/.github/workflows/label-check.yml @@ -0,0 +1,13 @@ +name: Pull Request Label Checker +on: + pull_request: + types: [synchronize, opened, reopened, labeled, unlabeled] +jobs: + check-labels: + name: prevent merge labels + runs-on: ubuntu-latest + + steps: + - name: do-not-merge label found + run: exit 1 + if: ${{ contains(github.event.*.labels.*.name, 'do not merge') || contains(github.event.*.labels.*.name, 'do-not-merge') }} From 7e23dada955f0f483f173ef7472d530e210747fa Mon Sep 17 00:00:00 2001 From: Wangchong Zhou Date: Tue, 8 Nov 2022 14:12:39 +0800 Subject: [PATCH 04/25] chore(tests): fix label-check test (#9717) --- .github/workflows/label-check.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/label-check.yml b/.github/workflows/label-check.yml index 8af97298d1..00df7b9de5 100644 --- a/.github/workflows/label-check.yml +++ b/.github/workflows/label-check.yml @@ -9,5 +9,5 @@ jobs: steps: - name: do-not-merge label found - run: exit 1 - if: ${{ contains(github.event.*.labels.*.name, 'do not merge') || contains(github.event.*.labels.*.name, 'do-not-merge') }} + run: echo "do-not-merge label found, this PR will not be merged"; exit 1 + if: ${{ contains(github.event.*.labels.*.name, 'pr/do not merge') || contains(github.event.*.labels.*.name, 'DO NOT MERGE') }} From 717dbf147a96ebec90a1c81f6d4161c10a5403c7 Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 1 Mar 2023 11:06:47 -0800 Subject: [PATCH 05/25] chore(ci): notify slack on PRs performing schema changes (#10395) With this patch, CI will notify a Kong Inc internal slack channel on every PR that performs a schema change. --- .github/workflows/label-check.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/label-check.yml b/.github/workflows/label-check.yml index 00df7b9de5..d27c9cf4a3 100644 --- a/.github/workflows/label-check.yml +++ b/.github/workflows/label-check.yml @@ -11,3 +11,11 @@ jobs: - name: do-not-merge label found run: echo "do-not-merge label found, this PR will not be merged"; exit 1 if: ${{ contains(github.event.*.labels.*.name, 'pr/do not merge') || contains(github.event.*.labels.*.name, 'DO NOT MERGE') }} + schema-change-labels: + if: "${{ contains(github.event.*.labels.*.name, 'schema-change-noteworthy') }}" + runs-on: ubuntu-latest + steps: + - name: Schema change label found + uses: rtCamp/action-slack-notify@v2 + env: + SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_SCHEMA_CHANGE }} From b22e1655fb757ac3e0e21e0106c7849f32860c9a Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 1 Mar 2023 11:33:17 -0800 Subject: [PATCH 06/25] chore(ci): split schema change job (#10408) It seems that if the do not merge label job is skipped then the second job doesn't run either: https://github.com/Kong/kong/actions/runs/4307151445/jobs/7511859202 This change splits the job into two and narrows down the events on which these jobs are triggered since the only meaninful input are the labels on the PR. 
--- .github/workflows/label-check.yml | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/.github/workflows/label-check.yml b/.github/workflows/label-check.yml index d27c9cf4a3..87ae8cf9dc 100644 --- a/.github/workflows/label-check.yml +++ b/.github/workflows/label-check.yml @@ -1,7 +1,7 @@ name: Pull Request Label Checker on: pull_request: - types: [synchronize, opened, reopened, labeled, unlabeled] + types: [labeled, unlabeled] jobs: check-labels: name: prevent merge labels @@ -11,11 +11,3 @@ jobs: - name: do-not-merge label found run: echo "do-not-merge label found, this PR will not be merged"; exit 1 if: ${{ contains(github.event.*.labels.*.name, 'pr/do not merge') || contains(github.event.*.labels.*.name, 'DO NOT MERGE') }} - schema-change-labels: - if: "${{ contains(github.event.*.labels.*.name, 'schema-change-noteworthy') }}" - runs-on: ubuntu-latest - steps: - - name: Schema change label found - uses: rtCamp/action-slack-notify@v2 - env: - SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_SCHEMA_CHANGE }} From bf7db6104abbb5ecf90ab32dbe149e3c7a565d91 Mon Sep 17 00:00:00 2001 From: Datong Sun Date: Mon, 6 Mar 2023 18:34:41 +0800 Subject: [PATCH 07/25] chore(actions): do not allow "backport master" labels (#10430) This is a bad practice which could cause merge conflicts and is against our backport policy. --- .github/workflows/label-check.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/label-check.yml b/.github/workflows/label-check.yml index 87ae8cf9dc..2021f6d31d 100644 --- a/.github/workflows/label-check.yml +++ b/.github/workflows/label-check.yml @@ -11,3 +11,6 @@ jobs: - name: do-not-merge label found run: echo "do-not-merge label found, this PR will not be merged"; exit 1 if: ${{ contains(github.event.*.labels.*.name, 'pr/do not merge') || contains(github.event.*.labels.*.name, 'DO NOT MERGE') }} + - name: backport master label found + run: echo "Please do not backport into master, instead, create a PR targeting master and backport from it instead."; exit 1 + if: ${{ contains(github.event.*.labels.*.name, 'backport master') }} From e1778b95a11acc562cf2b3e9bae3bffed8d9f1e2 Mon Sep 17 00:00:00 2001 From: Harry Bagdi Date: Mon, 6 Mar 2023 13:56:32 -0800 Subject: [PATCH 08/25] chore: run label based jobs on other events It seems that Github Actions is not running these jobs even once even though the PRs are labelled at least once. This patch runs these jobs on other related PR activity. --- .github/workflows/label-check.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/label-check.yml b/.github/workflows/label-check.yml index 2021f6d31d..bfa8b67a79 100644 --- a/.github/workflows/label-check.yml +++ b/.github/workflows/label-check.yml @@ -1,7 +1,7 @@ name: Pull Request Label Checker on: pull_request: - types: [labeled, unlabeled] + types: [opened, edited, synchronize, labeled, unlabeled] jobs: check-labels: name: prevent merge labels From 4fee6caa8ebb7dcfcbccea4a3d2923e9a50477e4 Mon Sep 17 00:00:00 2001 From: Xiaoch Date: Tue, 5 Dec 2023 17:51:05 +0800 Subject: [PATCH 09/25] feat(templates): bump `dns_stale_ttl` default to 1 hour (#12087) A longer stale TTL can help reduce the load on less performant/reliable DNS servers, reducing proxy latency and availability impact to Kong's proxy path. 
KAG-3080 Co-authored-by: Datong Sun --------- Co-authored-by: Datong Sun (cherry picked from commit 533d3f76177596dcb9b5911dec52eb2cfff9fdf7) --- changelog/unreleased/kong/bump_dns_stale_ttl.yml | 3 +++ kong.conf.default | 4 +++- kong/templates/kong_defaults.lua | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 changelog/unreleased/kong/bump_dns_stale_ttl.yml diff --git a/changelog/unreleased/kong/bump_dns_stale_ttl.yml b/changelog/unreleased/kong/bump_dns_stale_ttl.yml new file mode 100644 index 0000000000..43ed55cb07 --- /dev/null +++ b/changelog/unreleased/kong/bump_dns_stale_ttl.yml @@ -0,0 +1,3 @@ +message: Bump `dns_stale_ttl` default to 1 hour so stale DNS record can be used for longer time in case of resolver downtime. +type: performance +scope: Configuration diff --git a/kong.conf.default b/kong.conf.default index 73dd51acd3..87e5812afb 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -1312,7 +1312,7 @@ # property receives a value (in seconds), it # will override the TTL for all records. -#dns_stale_ttl = 4 # Defines, in seconds, how long a record will +#dns_stale_ttl = 3600 # Defines, in seconds, how long a record will # remain in cache past its TTL. This value # will be used while the new DNS record is # fetched in the background. @@ -1320,6 +1320,8 @@ # record until either the refresh query # completes, or the `dns_stale_ttl` number of # seconds have passed. + # This configuration enables Kong to be more + # resilient during resolver downtime. #dns_cache_size = 10000 # Defines the maximum allowed number of # DNS records stored in memory cache. diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index 421da77a5a..a440e202b2 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -156,7 +156,7 @@ dns_resolver = NONE dns_hostsfile = /etc/hosts dns_order = LAST,SRV,A,CNAME dns_valid_ttl = NONE -dns_stale_ttl = 4 +dns_stale_ttl = 3600 dns_cache_size = 10000 dns_not_found_ttl = 30 dns_error_ttl = 1 From 5b6d9323518928357ffb91ceefbd11bd6c9d41ea Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:00:14 +0800 Subject: [PATCH 10/25] Revert "Revert "fix(postgres): close socket actively when timeout happens during query (#11480)"" This reverts commit 396774db39259f3c025e49bc9badbdd9b09013a1. --- CHANGELOG/unreleased/kong/11480.yaml | 7 ++++ kong/db/strategies/postgres/connector.lua | 34 ++++++++++++------ spec/02-integration/03-db/01-db_spec.lua | 44 +++++++++++++++++++++++ 3 files changed, 75 insertions(+), 10 deletions(-) create mode 100644 CHANGELOG/unreleased/kong/11480.yaml diff --git a/CHANGELOG/unreleased/kong/11480.yaml b/CHANGELOG/unreleased/kong/11480.yaml new file mode 100644 index 0000000000..96f3963555 --- /dev/null +++ b/CHANGELOG/unreleased/kong/11480.yaml @@ -0,0 +1,7 @@ +message: Fix a problem that abnormal socket connection will be reused when querying Postgres database. +type: bugfix +scope: Core +prs: + - 11480 +jiras: + - "FTI-5322" diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index e0c8d88607..3d932a5184 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -497,6 +497,7 @@ function _mt:query(sql, operation) operation = "write" end + local conn, is_new_conn local res, err, partial, num_queries local ok @@ -505,23 +506,36 @@ function _mt:query(sql, operation) return nil, "error acquiring query semaphore: " .. 
err end - local conn = self:get_stored_connection(operation) - if conn then - res, err, partial, num_queries = conn:query(sql) - - else - local connection + conn = self:get_stored_connection(operation) + if not conn then local config = operation == "write" and self.config or self.config_ro - connection, err = connect(config) - if not connection then + conn, err = connect(config) + if not conn then self:release_query_semaphore_resource(operation) return nil, err end + is_new_conn = true + end + + res, err, partial, num_queries = conn:query(sql) - res, err, partial, num_queries = connection:query(sql) + -- if err is string then either it is a SQL error + -- or it is a socket error, here we abort connections + -- that encounter errors instead of reusing them, for + -- safety reason + if err and type(err) == "string" then + ngx.log(ngx.DEBUG, "SQL query throw error: ", err, ", close connection") + local _, err = conn:disconnect() + if err then + -- We're at the end of the query - just logging if + -- we cannot cleanup the connection + ngx.log(ngx.ERR, "failed to disconnect: ", err) + end + self.store_connection(nil, operation) - setkeepalive(connection) + elseif is_new_conn then + setkeepalive(conn) end self:release_query_semaphore_resource(operation) diff --git a/spec/02-integration/03-db/01-db_spec.lua b/spec/02-integration/03-db/01-db_spec.lua index b7b46d7e8b..cd11e9bfdf 100644 --- a/spec/02-integration/03-db/01-db_spec.lua +++ b/spec/02-integration/03-db/01-db_spec.lua @@ -447,6 +447,50 @@ for _, strategy in helpers.each_strategy() do end) end) + describe("#testme :query() [#" .. strategy .. "]", function() + lazy_setup(function() + helpers.get_db_utils(strategy, {}) + end) + + postgres_only("establish new connection when error occurred", function() + ngx.IS_CLI = false + + local conf = utils.deep_copy(helpers.test_conf) + conf.pg_ro_host = conf.pg_host + conf.pg_ro_user = conf.pg_user + + local db, err = DB.new(conf, strategy) + + assert.is_nil(err) + assert.is_table(db) + assert(db:init_connector()) + assert(db:connect()) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + local old_conn = db.connector:get_stored_connection("write") + assert.not_nil(old_conn) + + local res, err = db.connector:query("SELECT * FROM not_exist_table;") + assert.is_nil(res) + assert.not_nil(err) + + local new_conn = db.connector:get_stored_connection("write") + assert.is_nil(new_conn) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + assert(db:close()) + end) + end) describe(":setkeepalive() [#" .. strategy .. "]", function() lazy_setup(function() From 3bac87de433dc094cb812a8db4d5e2ad78a6cfc0 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:00:14 +0800 Subject: [PATCH 11/25] Revert "Revert "chore(rockspec): bump lua-resty-healthcheck to 1.5.3 (#9756)"" This reverts commit f8c96db1cfa29cfe649f42df587378fc3c62c4fb. 
--- kong-2.8.4-0.rockspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong-2.8.4-0.rockspec b/kong-2.8.4-0.rockspec index 39e4ac1021..b98811b201 100644 --- a/kong-2.8.4-0.rockspec +++ b/kong-2.8.4-0.rockspec @@ -33,7 +33,7 @@ dependencies = { "luaxxhash >= 1.0", "lua-protobuf == 0.3.3", "lua-resty-worker-events == 1.0.0", - "lua-resty-healthcheck == 1.5.1", + "lua-resty-healthcheck == 1.5.3", "lua-resty-mlcache == 2.5.0", "lua-messagepack == 0.5.2", "lua-resty-openssl == 0.8.7", From 2350ba2a05ab5182a28e4e608bb48a38e641cfbb Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:00:14 +0800 Subject: [PATCH 12/25] Revert "Revert "refactor(handler): trying to make reconfigure more atomic (#9993)"" This reverts commit e5d86d1e0a26b62402a928f9b876e98ef44e1f12. --- kong/runloop/handler.lua | 124 +++++++++++++++++++++++++----- kong/runloop/plugins_iterator.lua | 6 +- 2 files changed, 110 insertions(+), 20 deletions(-) diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 41d26d5e13..970534f4c1 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -56,6 +56,7 @@ local ERR = ngx.ERR local CRIT = ngx.CRIT local NOTICE = ngx.NOTICE local WARN = ngx.WARN +local INFO = ngx.INFO local DEBUG = ngx.DEBUG local COMMA = byte(",") local SPACE = byte(" ") @@ -78,10 +79,10 @@ local GLOBAL_QUERY_OPTS = { workspace = ngx.null, show_ws_id = true } local get_plugins_iterator, get_updated_plugins_iterator -local build_plugins_iterator, update_plugins_iterator +local build_plugins_iterator, update_plugins_iterator, replace_plugins_iterator local rebuild_plugins_iterator -local get_updated_router, build_router, update_router +local get_updated_router, build_router, update_router, new_router, replace_router local server_header = meta._SERVER_TOKENS local rebuild_router @@ -365,12 +366,31 @@ local function register_events() local current_plugins_hash local current_balancer_hash + + local now = ngx.now + local update_time = ngx.update_time + local worker_id = ngx.worker.id() + + local exiting = ngx.worker.exiting + local function is_exiting() + if not exiting() then + return false + end + log(NOTICE, "declarative config flip was canceled on worker #", worker_id, + ": process exiting") + return true + end + worker_events.register(function(data) - if ngx.worker.exiting() then - log(NOTICE, "declarative flip config canceled: process exiting") + if is_exiting() then return true end + update_time() + local reconfigure_started_at = now() * 1000 + + log(INFO, "declarative config flip was started on worker #", worker_id) + local default_ws local router_hash local plugins_hash @@ -384,8 +404,11 @@ local function register_events() end local ok, err = concurrency.with_coroutine_mutex(FLIP_CONFIG_OPTS, function() + -- below you are encouraged to yield for cooperative threading + local rebuild_balancer = balancer_hash == nil or balancer_hash ~= current_balancer_hash if rebuild_balancer then + log(DEBUG, "stopping previously started health checkers on worker #", worker_id) balancer.stop_healthcheckers(CLEAR_HEALTH_STATUS_DELAY) end @@ -393,30 +416,75 @@ local function register_events() core_cache:flip() kong.default_workspace = default_ws - ngx.ctx.workspace = kong.default_workspace + ngx.ctx.workspace = default_ws + + local router, err + if router_hash == nil or router_hash ~= current_router_hash then + update_time() + local start = now() * 1000 + + router, err = new_router() + if not router then + return nil, err + end + + update_time() + log(INFO, "building a new 
router took ", now() * 1000 - start, + " ms on worker #", worker_id) + end + local plugins_iterator if plugins_hash == nil or plugins_hash ~= current_plugins_hash then - rebuild_plugins_iterator(PLUGINS_ITERATOR_SYNC_OPTS) - current_plugins_hash = plugins_hash + update_time() + local start = now() * 1000 + + plugins_iterator, err = PluginsIterator.new() + if not plugins_iterator then + return nil, err + end + + update_time() + log(INFO, "building a new plugins iterator took ", now() * 1000 - start, + " ms on worker #", worker_id) end - if router_hash == nil or router_hash ~= current_router_hash then - rebuild_router(ROUTER_SYNC_OPTS) + -- below you are not supposed to yield and this should be fast and atomic + + -- TODO: we should perhaps only purge the configuration related cache. + + log(DEBUG, "flushing caches as part of the config flip on worker #", worker_id) + + if router then + replace_router(router) current_router_hash = router_hash end + if plugins_iterator then + replace_plugins_iterator(plugins_iterator) + current_plugins_hash = plugins_hash + end + if rebuild_balancer then + -- TODO: balancer is a big blob of global state and you cannot easily + -- initialize new balancer and then atomically flip it. + log(DEBUG, "reinitializing balancer with a new configuration on worker #", worker_id) balancer.init() current_balancer_hash = balancer_hash end + update_time() + log(INFO, "declarative config flip took ", now() * 1000 - reconfigure_started_at, + " ms on worker #", worker_id) + declarative.lock() return true end) if not ok then - log(ERR, "config flip failed: ", err) + update_time() + log(ERR, "declarative config flip failed after ", now() * 1000 - reconfigure_started_at, + " ms on worker #", worker_id, ": ", err) end end, "declarative", "flip_config") @@ -584,13 +652,19 @@ do local plugins_iterator + replace_plugins_iterator = function(new_iterator) + plugins_iterator = new_iterator + return true + end + + build_plugins_iterator = function(version) local new_iterator, err = PluginsIterator.new(version) if not new_iterator then return nil, err end - plugins_iterator = new_iterator - return true + + return replace_plugins_iterator(new_iterator) end @@ -759,7 +833,7 @@ do end - build_router = function(version) + new_router = function(version) local db = kong.db local routes, i = {}, 0 @@ -825,12 +899,17 @@ do router_cache_size = cache_size end - local new_router, err = Router.new(routes, router_cache, router_cache_neg) - if not new_router then + local router_new, err = Router.new(routes, router_cache, router_cache_neg) + if not router_new then return nil, "could not create router: " .. 
err end - router = new_router + return router_new + end + + + replace_router = function(router_new, version) + router = router_new if version then router_version = version @@ -847,6 +926,16 @@ do end + build_router = function(version) + local router_new, err = new_router(version) + if not router_new then + return nil, err + end + + return replace_router(router_new, version) + end + + update_router = function() -- we might not need to rebuild the router (if we were not -- the first request in this process to enter this code path) @@ -1131,9 +1220,8 @@ return { name = "flip-config", timeout = rebuild_timeout, } - end - if strategy == "off" or kong.configuration.worker_consistency == "strict" then + elseif kong.configuration.worker_consistency == "strict" then ROUTER_SYNC_OPTS = { name = "router", timeout = rebuild_timeout, diff --git a/kong/runloop/plugins_iterator.lua b/kong/runloop/plugins_iterator.lua index 6e2341c578..ae174bd4ea 100644 --- a/kong/runloop/plugins_iterator.lua +++ b/kong/runloop/plugins_iterator.lua @@ -426,8 +426,10 @@ end function PluginsIterator.new(version) - if not version then - error("version must be given", 2) + if kong.db.strategy ~= "off" then + if not version then + error("version must be given", 2) + end end loaded_plugins = loaded_plugins or get_loaded_plugins() From 0cff93440acb4d8cdbd67bc5c65e2cc28a6a972e Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:00:14 +0800 Subject: [PATCH 13/25] Revert "Revert "fix(*): prevent queues from growing without bounds (#10046) (#10058)"" This reverts commit 2eca27acfe3c3b0097273d0f4dc3d227d1371ce8. --- CHANGELOG.md | 12 ++++ kong/conf_loader/init.lua | 2 + kong/plugins/http-log/handler.lua | 2 +- kong/tools/batch_queue.lua | 89 +++++++++++++++++----------- spec/01-unit/27-batch_queue_spec.lua | 33 +++++++++++ 5 files changed, 103 insertions(+), 35 deletions(-) create mode 100644 spec/01-unit/27-batch_queue_spec.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index d9a65a9cdd..62985f807a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,18 @@ - Fixed a bug where internal redirects (i.e. those produced by the error_page directive) could interfere with worker process handling the request when buffered proxying is being used. +## Unrelease + +### Fixes + +##### Plugins + +- Update the batch queues module so that queues no longer grow without bounds if + their consumers fail to process the entries. Instead, old batches are now dropped + and an error is logged. 
+ [#10046](https://github.com/Kong/kong/pull/10046) + + ## [2.8.3] > Released 2022/11/02 diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 4bb20604c0..7b5670eec4 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -663,6 +663,8 @@ local CONF_INFERENCES = { untrusted_lua = { enum = { "on", "off", "sandbox" } }, untrusted_lua_sandbox_requires = { typ = "array" }, untrusted_lua_sandbox_environment = { typ = "array" }, + + max_queued_batches = { typ = "number" }, } diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index ef82bf5bc1..2c4d130b97 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -170,7 +170,7 @@ function HttpLogHandler:log(conf) } local err - q, err = BatchQueue.new(process, opts) + q, err = BatchQueue.new("http-log", process, opts) if not q then kong.log.err("could not create queue: ", err) return diff --git a/kong/tools/batch_queue.lua b/kong/tools/batch_queue.lua index 8eaf5ae56e..92322905a2 100644 --- a/kong/tools/batch_queue.lua +++ b/kong/tools/batch_queue.lua @@ -24,12 +24,14 @@ -- end -- -- local q = BatchQueue.new( +-- name, -- name of the queue for identification purposes in the log -- process, -- function used to "process/consume" values from the queue -- { -- Opts table with control values. Defaults shown: --- retry_count = 0, -- number of times to retry processing --- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing --- process_delay = 1, -- in seconds, how often the current batch is closed & queued --- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued +-- retry_count = 0, -- number of times to retry processing +-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing +-- process_delay = 1, -- in seconds, how often the current batch is closed & queued +-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued +-- max_queued_batches = 100, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued -- } -- ) -- @@ -68,11 +70,9 @@ local timer_at = ngx.timer.at local remove = table.remove local type = type local huge = math.huge -local fmt = string.format local min = math.min local now = ngx.now local ERR = ngx.ERR -local ngx_log = ngx.log local DEBUG = ngx.DEBUG local WARN = ngx.WARN @@ -100,10 +100,10 @@ local process local function schedule_flush(self) local ok, err = timer_at(self.flush_timeout/1000, flush, self) if not ok then - ngx_log(ERR, "failed to create delayed flush timer: ", err) + self:log(ERR, "failed to create delayed flush timer: %s", err) return end - --ngx_log(DEBUG, "delayed timer created") + --self:log(DEBUG, "delayed timer created") self.flush_scheduled = true end @@ -113,10 +113,10 @@ end -- @param self Queue -- @param batch: table with `entries` and `retries` counter -- @param delay number: timer delay in seconds -local function schedule_process(self, batch, delay) - local ok, err = timer_at(delay, process, self, batch) +local function schedule_process(self, delay) + local ok, err = timer_at(delay, process, self) if not ok then - ngx_log(ERR, "failed to create process timer: ", err) + self:log(ERR, "failed to create process timer: %s", err) return end self.process_scheduled = true @@ -147,13 +147,13 @@ flush = function(premature, self) if get_now() - 
self.last_t < self.flush_timeout then -- flushing reported: we had activity - ngx_log(DEBUG, "[flush] queue had activity, delaying flush") + self:log(DEBUG, "[flush] queue had activity, delaying flush") schedule_flush(self) return end -- no activity and timeout reached - ngx_log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout") + self:log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout") self:flush() self.flush_scheduled = false end @@ -165,27 +165,31 @@ end -- @param self Queue -- @param batch: table with `entries` and `retries` counter -- @return nothing -process = function(premature, self, batch) +process = function(premature, self) if premature then return end + local batch = self.batch_queue[1] + if not batch then + self:log(WARN, "queue process called but no batches to be processed") + return + end + local next_retry_delay local ok, err = self.process(batch.entries) if ok then -- success, reset retry delays self.retry_delay = 1 next_retry_delay = 0 - + remove(self.batch_queue, 1) else batch.retries = batch.retries + 1 if batch.retries < self.retry_count then - ngx_log(WARN, "failed to process entries: ", tostring(err)) - -- queue our data for processing again, at the end of the queue - self.batch_queue[#self.batch_queue + 1] = batch + self:log(WARN, "failed to process entries: %s", tostring(err)) else - ngx_log(ERR, fmt("entry batch was already tried %d times, dropping it", - batch.retries)) + self:log(ERR, "entry batch was already tried %d times, dropping it", batch.retries) + remove(self.batch_queue, 1) end self.retry_delay = self.retry_delay + 1 @@ -193,10 +197,8 @@ process = function(premature, self, batch) end if #self.batch_queue > 0 then -- more to process? - ngx_log(DEBUG, fmt("processing oldest data, %d still queued", - #self.batch_queue - 1)) - local oldest_batch = remove(self.batch_queue, 1) - schedule_process(self, oldest_batch, next_retry_delay) + self:log(DEBUG, "processing oldest data, %d still queued", #self.batch_queue) + schedule_process(self, next_retry_delay) return end @@ -218,13 +220,15 @@ end -- @param opts table, optionally including -- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay` -- @return table: a Queue object. 
-function Queue.new(process, opts) +function Queue.new(name, process, opts) opts = opts or {} + assert(type(name) == "string", + "arg #1 (name) must be a string") assert(type(process) == "function", - "arg #1 (process) must be a function") + "arg #2 (process) must be a function") assert(type(opts) == "table", - "arg #2 (opts) must be a table") + "arg #3 (opts) must be a table") assert(opts.retry_count == nil or type(opts.retry_count) == "number", "retry_count must be a number") assert(opts.flush_timeout == nil or type(opts.flush_timeout) == "number", @@ -233,8 +237,11 @@ function Queue.new(process, opts) "batch_max_size must be a number") assert(opts.process_delay == nil or type(opts.batch_max_size) == "number", "process_delay must be a number") + assert(opts.max_queued_batches == nil or type(opts.max_queued_batches) == "number", + "max_queued_batches must be a number") local self = { + name = name, process = process, -- flush timeout in milliseconds @@ -242,6 +249,7 @@ function Queue.new(process, opts) retry_count = opts.retry_count or 0, batch_max_size = opts.batch_max_size or 1000, process_delay = opts.process_delay or 1, + max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100, retry_delay = 1, @@ -258,6 +266,17 @@ function Queue.new(process, opts) end +------------------------------------------------------------------------------- +-- Log a message that includes the name of the queue for identification purposes +-- @param self Queue +-- @param level: log level +-- @param formatstring: format string, will get the queue name and ": " prepended +-- @param ...: formatter arguments +function Queue:log(level, formatstring, ...) + return ngx.log(level, string.format(self.name .. ": " .. formatstring, unpack({...}))) +end + + ------------------------------------------------------------------------------- -- Add data to the queue -- @param entry the value included in the queue. It can be any Lua value besides nil. @@ -269,8 +288,8 @@ function Queue:add(entry) if self.batch_max_size == 1 then -- no batching - local batch = { entries = { entry }, retries = 0 } - schedule_process(self, batch, 0) + self.batch_queue = { { entries = { entry }, retries = 0 } } + schedule_process(self, 0) return true end @@ -304,8 +323,12 @@ function Queue:flush() -- Queue the current batch, if it has at least 1 entry if current_batch_size > 0 then - ngx_log(DEBUG, "queueing batch for processing (", current_batch_size, " entries)") + self:log(DEBUG, "queueing batch for processing (%d entries)", current_batch_size) + while #self.batch_queue >= self.max_queued_batches do + self:log(ERR, "exceeded max_queued_batches (%d), dropping oldest", self.max_queued_batches) + remove(self.batch_queue, 1) + end self.batch_queue[#self.batch_queue + 1] = self.current_batch self.current_batch = { entries = {}, retries = 0 } end @@ -314,10 +337,8 @@ function Queue:flush() -- in the future. 
This will keep calling itself in the future until -- the queue is empty if #self.batch_queue > 0 and not self.process_scheduled then - ngx_log(DEBUG, fmt("processing oldest entry, %d still queued", - #self.batch_queue - 1)) - local oldest_batch = remove(self.batch_queue, 1) - schedule_process(self, oldest_batch, self.process_delay) + self:log(DEBUG, "processing oldest entry, %d still queued", #self.batch_queue) + schedule_process(self, self.process_delay) end return true diff --git a/spec/01-unit/27-batch_queue_spec.lua b/spec/01-unit/27-batch_queue_spec.lua new file mode 100644 index 0000000000..38b1edcb98 --- /dev/null +++ b/spec/01-unit/27-batch_queue_spec.lua @@ -0,0 +1,33 @@ + +local BatchQueue = require "kong.tools.batch_queue" +local helpers = require "spec.helpers" + +describe("batch queue", function() + + it("observes the limit parameter", function() + local count = 0 + local last + local function process(entries) + count = count + #entries + last = entries[#entries] + return true + end + + local q = BatchQueue.new("batch-queue-unit-test", process, {max_queued_batches=2, batch_max_size=100, process_delay=0}) + + q:add(1) + q:flush() + q:add(2) + q:flush() + q:add(3) + q:flush() + + helpers.wait_until(function() + ngx.sleep(.1) + return #q.batch_queue == 0 + end, 1) + + assert.equal(2, count) + assert.equal(3, last) + end) +end) From 747e4d211cbcb0099bf9d693ab90ce1cbe35a7c7 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:00:14 +0800 Subject: [PATCH 14/25] Revert "Revert "docs(*): document new max_queued_batches parameter (#10071)"" This reverts commit 2935e7f1c90d72af8c2813d62648ee3fbe8c6f0c. --- kong.conf.default | 10 ++++++++++ kong/templates/kong_defaults.lua | 2 ++ 2 files changed, 12 insertions(+) diff --git a/kong.conf.default b/kong.conf.default index 87e5812afb..9831258f6f 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -1562,3 +1562,13 @@ # **Warning**: Certain variables, when made # available, may create opportunities to # escape the sandbox. + +#max_queued_batches = 100 # Maximum number of batches to keep on an internal + # plugin queue before dropping old batches. This is + # meant as a global, last-resort control to prevent + # queues from consuming infinite memory. When batches + # are being dropped, an error message + # "exceeded max_queued_batches (%d), dropping oldest" + # will be logged. The error message will also include + # a string that identifies the plugin causing the + # problem. diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index a440e202b2..459318eaae 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -179,4 +179,6 @@ pluginserver_names = NONE untrusted_lua = sandbox untrusted_lua_sandbox_requires = untrusted_lua_sandbox_environment = + +max_queued_batches = 100 ]] From 59079c62cd3548aae5d6dba3152588de2e78c9c6 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:00:14 +0800 Subject: [PATCH 15/25] Revert "Revert "Revert "docs(*): document new max_queued_batches parameter (#10071)""" This reverts commit e08b93e63e711e3c8ccc53fe48f0218c733c60fd. --- kong.conf.default | 10 ---------- kong/templates/kong_defaults.lua | 2 -- 2 files changed, 12 deletions(-) diff --git a/kong.conf.default b/kong.conf.default index 9831258f6f..87e5812afb 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -1562,13 +1562,3 @@ # **Warning**: Certain variables, when made # available, may create opportunities to # escape the sandbox. 
- -#max_queued_batches = 100 # Maximum number of batches to keep on an internal - # plugin queue before dropping old batches. This is - # meant as a global, last-resort control to prevent - # queues from consuming infinite memory. When batches - # are being dropped, an error message - # "exceeded max_queued_batches (%d), dropping oldest" - # will be logged. The error message will also include - # a string that identifies the plugin causing the - # problem. diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index 459318eaae..a440e202b2 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -179,6 +179,4 @@ pluginserver_names = NONE untrusted_lua = sandbox untrusted_lua_sandbox_requires = untrusted_lua_sandbox_environment = - -max_queued_batches = 100 ]] From e30496e6ac0fbff48697ddbb5561b5cd083e4601 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:00:14 +0800 Subject: [PATCH 16/25] Revert "Revert "Revert "fix(*): prevent queues from growing without bounds (#10046) (#10058)""" This reverts commit bb8b84c101042d97692408e58b655eda53788338. --- CHANGELOG.md | 12 ---- kong/conf_loader/init.lua | 2 - kong/plugins/http-log/handler.lua | 2 +- kong/tools/batch_queue.lua | 89 +++++++++++----------------- spec/01-unit/27-batch_queue_spec.lua | 33 ----------- 5 files changed, 35 insertions(+), 103 deletions(-) delete mode 100644 spec/01-unit/27-batch_queue_spec.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 62985f807a..d9a65a9cdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,18 +73,6 @@ - Fixed a bug where internal redirects (i.e. those produced by the error_page directive) could interfere with worker process handling the request when buffered proxying is being used. -## Unrelease - -### Fixes - -##### Plugins - -- Update the batch queues module so that queues no longer grow without bounds if - their consumers fail to process the entries. Instead, old batches are now dropped - and an error is logged. - [#10046](https://github.com/Kong/kong/pull/10046) - - ## [2.8.3] > Released 2022/11/02 diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 7b5670eec4..4bb20604c0 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -663,8 +663,6 @@ local CONF_INFERENCES = { untrusted_lua = { enum = { "on", "off", "sandbox" } }, untrusted_lua_sandbox_requires = { typ = "array" }, untrusted_lua_sandbox_environment = { typ = "array" }, - - max_queued_batches = { typ = "number" }, } diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index 2c4d130b97..ef82bf5bc1 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -170,7 +170,7 @@ function HttpLogHandler:log(conf) } local err - q, err = BatchQueue.new("http-log", process, opts) + q, err = BatchQueue.new(process, opts) if not q then kong.log.err("could not create queue: ", err) return diff --git a/kong/tools/batch_queue.lua b/kong/tools/batch_queue.lua index 92322905a2..8eaf5ae56e 100644 --- a/kong/tools/batch_queue.lua +++ b/kong/tools/batch_queue.lua @@ -24,14 +24,12 @@ -- end -- -- local q = BatchQueue.new( --- name, -- name of the queue for identification purposes in the log -- process, -- function used to "process/consume" values from the queue -- { -- Opts table with control values. 
Defaults shown: --- retry_count = 0, -- number of times to retry processing --- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing --- process_delay = 1, -- in seconds, how often the current batch is closed & queued --- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued --- max_queued_batches = 100, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued +-- retry_count = 0, -- number of times to retry processing +-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing +-- process_delay = 1, -- in seconds, how often the current batch is closed & queued +-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued -- } -- ) -- @@ -70,9 +68,11 @@ local timer_at = ngx.timer.at local remove = table.remove local type = type local huge = math.huge +local fmt = string.format local min = math.min local now = ngx.now local ERR = ngx.ERR +local ngx_log = ngx.log local DEBUG = ngx.DEBUG local WARN = ngx.WARN @@ -100,10 +100,10 @@ local process local function schedule_flush(self) local ok, err = timer_at(self.flush_timeout/1000, flush, self) if not ok then - self:log(ERR, "failed to create delayed flush timer: %s", err) + ngx_log(ERR, "failed to create delayed flush timer: ", err) return end - --self:log(DEBUG, "delayed timer created") + --ngx_log(DEBUG, "delayed timer created") self.flush_scheduled = true end @@ -113,10 +113,10 @@ end -- @param self Queue -- @param batch: table with `entries` and `retries` counter -- @param delay number: timer delay in seconds -local function schedule_process(self, delay) - local ok, err = timer_at(delay, process, self) +local function schedule_process(self, batch, delay) + local ok, err = timer_at(delay, process, self, batch) if not ok then - self:log(ERR, "failed to create process timer: %s", err) + ngx_log(ERR, "failed to create process timer: ", err) return end self.process_scheduled = true @@ -147,13 +147,13 @@ flush = function(premature, self) if get_now() - self.last_t < self.flush_timeout then -- flushing reported: we had activity - self:log(DEBUG, "[flush] queue had activity, delaying flush") + ngx_log(DEBUG, "[flush] queue had activity, delaying flush") schedule_flush(self) return end -- no activity and timeout reached - self:log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout") + ngx_log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout") self:flush() self.flush_scheduled = false end @@ -165,31 +165,27 @@ end -- @param self Queue -- @param batch: table with `entries` and `retries` counter -- @return nothing -process = function(premature, self) +process = function(premature, self, batch) if premature then return end - local batch = self.batch_queue[1] - if not batch then - self:log(WARN, "queue process called but no batches to be processed") - return - end - local next_retry_delay local ok, err = self.process(batch.entries) if ok then -- success, reset retry delays self.retry_delay = 1 next_retry_delay = 0 - remove(self.batch_queue, 1) + else batch.retries = batch.retries + 1 if batch.retries < self.retry_count then - self:log(WARN, "failed to process entries: %s", tostring(err)) + ngx_log(WARN, "failed to process entries: ", tostring(err)) + -- queue our data for processing again, at the end of the queue + 
self.batch_queue[#self.batch_queue + 1] = batch else - self:log(ERR, "entry batch was already tried %d times, dropping it", batch.retries) - remove(self.batch_queue, 1) + ngx_log(ERR, fmt("entry batch was already tried %d times, dropping it", + batch.retries)) end self.retry_delay = self.retry_delay + 1 @@ -197,8 +193,10 @@ process = function(premature, self) end if #self.batch_queue > 0 then -- more to process? - self:log(DEBUG, "processing oldest data, %d still queued", #self.batch_queue) - schedule_process(self, next_retry_delay) + ngx_log(DEBUG, fmt("processing oldest data, %d still queued", + #self.batch_queue - 1)) + local oldest_batch = remove(self.batch_queue, 1) + schedule_process(self, oldest_batch, next_retry_delay) return end @@ -220,15 +218,13 @@ end -- @param opts table, optionally including -- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay` -- @return table: a Queue object. -function Queue.new(name, process, opts) +function Queue.new(process, opts) opts = opts or {} - assert(type(name) == "string", - "arg #1 (name) must be a string") assert(type(process) == "function", - "arg #2 (process) must be a function") + "arg #1 (process) must be a function") assert(type(opts) == "table", - "arg #3 (opts) must be a table") + "arg #2 (opts) must be a table") assert(opts.retry_count == nil or type(opts.retry_count) == "number", "retry_count must be a number") assert(opts.flush_timeout == nil or type(opts.flush_timeout) == "number", @@ -237,11 +233,8 @@ function Queue.new(name, process, opts) "batch_max_size must be a number") assert(opts.process_delay == nil or type(opts.batch_max_size) == "number", "process_delay must be a number") - assert(opts.max_queued_batches == nil or type(opts.max_queued_batches) == "number", - "max_queued_batches must be a number") local self = { - name = name, process = process, -- flush timeout in milliseconds @@ -249,7 +242,6 @@ function Queue.new(name, process, opts) retry_count = opts.retry_count or 0, batch_max_size = opts.batch_max_size or 1000, process_delay = opts.process_delay or 1, - max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100, retry_delay = 1, @@ -266,17 +258,6 @@ function Queue.new(name, process, opts) end -------------------------------------------------------------------------------- --- Log a message that includes the name of the queue for identification purposes --- @param self Queue --- @param level: log level --- @param formatstring: format string, will get the queue name and ": " prepended --- @param ...: formatter arguments -function Queue:log(level, formatstring, ...) - return ngx.log(level, string.format(self.name .. ": " .. formatstring, unpack({...}))) -end - - ------------------------------------------------------------------------------- -- Add data to the queue -- @param entry the value included in the queue. It can be any Lua value besides nil. 
@@ -288,8 +269,8 @@ function Queue:add(entry) if self.batch_max_size == 1 then -- no batching - self.batch_queue = { { entries = { entry }, retries = 0 } } - schedule_process(self, 0) + local batch = { entries = { entry }, retries = 0 } + schedule_process(self, batch, 0) return true end @@ -323,12 +304,8 @@ function Queue:flush() -- Queue the current batch, if it has at least 1 entry if current_batch_size > 0 then - self:log(DEBUG, "queueing batch for processing (%d entries)", current_batch_size) + ngx_log(DEBUG, "queueing batch for processing (", current_batch_size, " entries)") - while #self.batch_queue >= self.max_queued_batches do - self:log(ERR, "exceeded max_queued_batches (%d), dropping oldest", self.max_queued_batches) - remove(self.batch_queue, 1) - end self.batch_queue[#self.batch_queue + 1] = self.current_batch self.current_batch = { entries = {}, retries = 0 } end @@ -337,8 +314,10 @@ function Queue:flush() -- in the future. This will keep calling itself in the future until -- the queue is empty if #self.batch_queue > 0 and not self.process_scheduled then - self:log(DEBUG, "processing oldest entry, %d still queued", #self.batch_queue) - schedule_process(self, self.process_delay) + ngx_log(DEBUG, fmt("processing oldest entry, %d still queued", + #self.batch_queue - 1)) + local oldest_batch = remove(self.batch_queue, 1) + schedule_process(self, oldest_batch, self.process_delay) end return true diff --git a/spec/01-unit/27-batch_queue_spec.lua b/spec/01-unit/27-batch_queue_spec.lua deleted file mode 100644 index 38b1edcb98..0000000000 --- a/spec/01-unit/27-batch_queue_spec.lua +++ /dev/null @@ -1,33 +0,0 @@ - -local BatchQueue = require "kong.tools.batch_queue" -local helpers = require "spec.helpers" - -describe("batch queue", function() - - it("observes the limit parameter", function() - local count = 0 - local last - local function process(entries) - count = count + #entries - last = entries[#entries] - return true - end - - local q = BatchQueue.new("batch-queue-unit-test", process, {max_queued_batches=2, batch_max_size=100, process_delay=0}) - - q:add(1) - q:flush() - q:add(2) - q:flush() - q:add(3) - q:flush() - - helpers.wait_until(function() - ngx.sleep(.1) - return #q.batch_queue == 0 - end, 1) - - assert.equal(2, count) - assert.equal(3, last) - end) -end) From 369cc34dec4f6c68d6d5e1c971e2a50510e18cda Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:00:14 +0800 Subject: [PATCH 17/25] Revert "Revert "fix(*): prevent queues from growing without bounds (#10046) (#10254)"" This reverts commit 53e2147b59a8df348aa3f4e9deccd1a58e32608b. --- CHANGELOG.md | 4 ++ kong.conf.default | 11 ++++ kong/conf_loader/init.lua | 2 + kong/plugins/http-log/handler.lua | 2 +- kong/templates/kong_defaults.lua | 2 + kong/tools/batch_queue.lua | 89 +++++++++++++++++----------- spec/01-unit/27-batch_queue_spec.lua | 30 ++++++++++ 7 files changed, 105 insertions(+), 35 deletions(-) create mode 100644 spec/01-unit/27-batch_queue_spec.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index d9a65a9cdd..345eae607a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,6 +83,10 @@ - **HTTP Log**: fix internal error during validating the schema if http_endpoint contains userinfo but headers is empty [#9574](https://github.com/Kong/kong/pull/9574) +- Update the batch queues module so that queues no longer grow without bounds if + their consumers fail to process the entries. Instead, old batches are now dropped + and an error is logged. 
+ [#10247](https://github.com/Kong/kong/pull/10247) ##### CLI diff --git a/kong.conf.default b/kong.conf.default index 87e5812afb..e1d79288cf 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -1562,3 +1562,14 @@ # **Warning**: Certain variables, when made # available, may create opportunities to # escape the sandbox. + +#max_queued_batches = 100 # Maximum number of batches to keep on an internal + # plugin queue before dropping old batches. This is + # meant as a global, last-resort control to prevent + # queues from consuming infinite memory. When batches + # are being dropped, an error message + # "exceeded max_queued_batches (%d), dropping oldest" + # will be logged. The error message will also include + # a string that identifies the plugin causing the + # problem. Queues are used by the http-log, statsd, + # opentelemetry and datadog plugins. diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 4bb20604c0..7b5670eec4 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -663,6 +663,8 @@ local CONF_INFERENCES = { untrusted_lua = { enum = { "on", "off", "sandbox" } }, untrusted_lua_sandbox_requires = { typ = "array" }, untrusted_lua_sandbox_environment = { typ = "array" }, + + max_queued_batches = { typ = "number" }, } diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index ef82bf5bc1..2c4d130b97 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -170,7 +170,7 @@ function HttpLogHandler:log(conf) } local err - q, err = BatchQueue.new(process, opts) + q, err = BatchQueue.new("http-log", process, opts) if not q then kong.log.err("could not create queue: ", err) return diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index a440e202b2..459318eaae 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -179,4 +179,6 @@ pluginserver_names = NONE untrusted_lua = sandbox untrusted_lua_sandbox_requires = untrusted_lua_sandbox_environment = + +max_queued_batches = 100 ]] diff --git a/kong/tools/batch_queue.lua b/kong/tools/batch_queue.lua index 8eaf5ae56e..92322905a2 100644 --- a/kong/tools/batch_queue.lua +++ b/kong/tools/batch_queue.lua @@ -24,12 +24,14 @@ -- end -- -- local q = BatchQueue.new( +-- name, -- name of the queue for identification purposes in the log -- process, -- function used to "process/consume" values from the queue -- { -- Opts table with control values. 
Defaults shown: --- retry_count = 0, -- number of times to retry processing --- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing --- process_delay = 1, -- in seconds, how often the current batch is closed & queued --- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued +-- retry_count = 0, -- number of times to retry processing +-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing +-- process_delay = 1, -- in seconds, how often the current batch is closed & queued +-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued +-- max_queued_batches = 100, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued -- } -- ) -- @@ -68,11 +70,9 @@ local timer_at = ngx.timer.at local remove = table.remove local type = type local huge = math.huge -local fmt = string.format local min = math.min local now = ngx.now local ERR = ngx.ERR -local ngx_log = ngx.log local DEBUG = ngx.DEBUG local WARN = ngx.WARN @@ -100,10 +100,10 @@ local process local function schedule_flush(self) local ok, err = timer_at(self.flush_timeout/1000, flush, self) if not ok then - ngx_log(ERR, "failed to create delayed flush timer: ", err) + self:log(ERR, "failed to create delayed flush timer: %s", err) return end - --ngx_log(DEBUG, "delayed timer created") + --self:log(DEBUG, "delayed timer created") self.flush_scheduled = true end @@ -113,10 +113,10 @@ end -- @param self Queue -- @param batch: table with `entries` and `retries` counter -- @param delay number: timer delay in seconds -local function schedule_process(self, batch, delay) - local ok, err = timer_at(delay, process, self, batch) +local function schedule_process(self, delay) + local ok, err = timer_at(delay, process, self) if not ok then - ngx_log(ERR, "failed to create process timer: ", err) + self:log(ERR, "failed to create process timer: %s", err) return end self.process_scheduled = true @@ -147,13 +147,13 @@ flush = function(premature, self) if get_now() - self.last_t < self.flush_timeout then -- flushing reported: we had activity - ngx_log(DEBUG, "[flush] queue had activity, delaying flush") + self:log(DEBUG, "[flush] queue had activity, delaying flush") schedule_flush(self) return end -- no activity and timeout reached - ngx_log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout") + self:log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout") self:flush() self.flush_scheduled = false end @@ -165,27 +165,31 @@ end -- @param self Queue -- @param batch: table with `entries` and `retries` counter -- @return nothing -process = function(premature, self, batch) +process = function(premature, self) if premature then return end + local batch = self.batch_queue[1] + if not batch then + self:log(WARN, "queue process called but no batches to be processed") + return + end + local next_retry_delay local ok, err = self.process(batch.entries) if ok then -- success, reset retry delays self.retry_delay = 1 next_retry_delay = 0 - + remove(self.batch_queue, 1) else batch.retries = batch.retries + 1 if batch.retries < self.retry_count then - ngx_log(WARN, "failed to process entries: ", tostring(err)) - -- queue our data for processing again, at the end of the queue - self.batch_queue[#self.batch_queue + 1] = batch + self:log(WARN, "failed to process 
entries: %s", tostring(err)) else - ngx_log(ERR, fmt("entry batch was already tried %d times, dropping it", - batch.retries)) + self:log(ERR, "entry batch was already tried %d times, dropping it", batch.retries) + remove(self.batch_queue, 1) end self.retry_delay = self.retry_delay + 1 @@ -193,10 +197,8 @@ process = function(premature, self, batch) end if #self.batch_queue > 0 then -- more to process? - ngx_log(DEBUG, fmt("processing oldest data, %d still queued", - #self.batch_queue - 1)) - local oldest_batch = remove(self.batch_queue, 1) - schedule_process(self, oldest_batch, next_retry_delay) + self:log(DEBUG, "processing oldest data, %d still queued", #self.batch_queue) + schedule_process(self, next_retry_delay) return end @@ -218,13 +220,15 @@ end -- @param opts table, optionally including -- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay` -- @return table: a Queue object. -function Queue.new(process, opts) +function Queue.new(name, process, opts) opts = opts or {} + assert(type(name) == "string", + "arg #1 (name) must be a string") assert(type(process) == "function", - "arg #1 (process) must be a function") + "arg #2 (process) must be a function") assert(type(opts) == "table", - "arg #2 (opts) must be a table") + "arg #3 (opts) must be a table") assert(opts.retry_count == nil or type(opts.retry_count) == "number", "retry_count must be a number") assert(opts.flush_timeout == nil or type(opts.flush_timeout) == "number", @@ -233,8 +237,11 @@ function Queue.new(process, opts) "batch_max_size must be a number") assert(opts.process_delay == nil or type(opts.batch_max_size) == "number", "process_delay must be a number") + assert(opts.max_queued_batches == nil or type(opts.max_queued_batches) == "number", + "max_queued_batches must be a number") local self = { + name = name, process = process, -- flush timeout in milliseconds @@ -242,6 +249,7 @@ function Queue.new(process, opts) retry_count = opts.retry_count or 0, batch_max_size = opts.batch_max_size or 1000, process_delay = opts.process_delay or 1, + max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100, retry_delay = 1, @@ -258,6 +266,17 @@ function Queue.new(process, opts) end +------------------------------------------------------------------------------- +-- Log a message that includes the name of the queue for identification purposes +-- @param self Queue +-- @param level: log level +-- @param formatstring: format string, will get the queue name and ": " prepended +-- @param ...: formatter arguments +function Queue:log(level, formatstring, ...) + return ngx.log(level, string.format(self.name .. ": " .. formatstring, unpack({...}))) +end + + ------------------------------------------------------------------------------- -- Add data to the queue -- @param entry the value included in the queue. It can be any Lua value besides nil. 
@@ -269,8 +288,8 @@ function Queue:add(entry) if self.batch_max_size == 1 then -- no batching - local batch = { entries = { entry }, retries = 0 } - schedule_process(self, batch, 0) + self.batch_queue = { { entries = { entry }, retries = 0 } } + schedule_process(self, 0) return true end @@ -304,8 +323,12 @@ function Queue:flush() -- Queue the current batch, if it has at least 1 entry if current_batch_size > 0 then - ngx_log(DEBUG, "queueing batch for processing (", current_batch_size, " entries)") + self:log(DEBUG, "queueing batch for processing (%d entries)", current_batch_size) + while #self.batch_queue >= self.max_queued_batches do + self:log(ERR, "exceeded max_queued_batches (%d), dropping oldest", self.max_queued_batches) + remove(self.batch_queue, 1) + end self.batch_queue[#self.batch_queue + 1] = self.current_batch self.current_batch = { entries = {}, retries = 0 } end @@ -314,10 +337,8 @@ function Queue:flush() -- in the future. This will keep calling itself in the future until -- the queue is empty if #self.batch_queue > 0 and not self.process_scheduled then - ngx_log(DEBUG, fmt("processing oldest entry, %d still queued", - #self.batch_queue - 1)) - local oldest_batch = remove(self.batch_queue, 1) - schedule_process(self, oldest_batch, self.process_delay) + self:log(DEBUG, "processing oldest entry, %d still queued", #self.batch_queue) + schedule_process(self, self.process_delay) end return true diff --git a/spec/01-unit/27-batch_queue_spec.lua b/spec/01-unit/27-batch_queue_spec.lua new file mode 100644 index 0000000000..d4b0bef4c3 --- /dev/null +++ b/spec/01-unit/27-batch_queue_spec.lua @@ -0,0 +1,30 @@ + +local BatchQueue = require "kong.tools.batch_queue" + +describe("batch queue", function() + + it("observes the limit parameter", function() + local count = 0 + local last + local function process(entries) + count = count + #entries + last = entries[#entries] + return true + end + + local q = BatchQueue.new("batch-queue-unit-test", process, {max_queued_batches=2, batch_max_size=100, process_delay=0}) + + q:add(1) + q:flush() + q:add(2) + q:flush() + q:add(3) + q:flush() + + -- run scheduled timer tasks + ngx.sleep(1) + + assert.equal(2, count) + assert.equal(3, last) + end) +end) From 4c6a3f7dc3144e2702c582a61c7cf0552f2bed31 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:02:33 +0800 Subject: [PATCH 18/25] Revert "Revert "fix(runloop) do not reset `*:version` to `init` when worker respawns,"" This reverts commit 8a4a2d9b30611106fb872765d4bcd285d0e83d89. --- kong/init.lua | 12 ++++---- kong/runloop/handler.lua | 24 ++++++++++----- .../05-proxy/02-router_spec.lua | 29 ++++++++++++++++++- 3 files changed, 50 insertions(+), 15 deletions(-) diff --git a/kong/init.lua b/kong/init.lua index 98f10dfc5a..4f1b477c98 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -564,6 +564,12 @@ function Kong.init() if config.role ~= "control_plane" then assert(runloop.build_router("init")) + + ok, err = runloop.set_init_versions_in_cache() + if not ok then + error("error setting initial versions for router and plugins iterator in cache: " .. 
+ tostring(err)) + end end end @@ -638,12 +644,6 @@ function Kong.init_worker() end kong.core_cache = core_cache - ok, err = runloop.set_init_versions_in_cache() - if not ok then - stash_init_worker_error(err) -- 'err' fully formatted - return - end - -- LEGACY singletons.cache = cache singletons.core_cache = core_cache diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 970534f4c1..6608bd1513 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -13,6 +13,7 @@ local declarative = require "kong.db.declarative" local workspaces = require "kong.workspaces" local lrucache = require "resty.lrucache" local request_id = require "kong.tracing.request_id" +local marshall = require "kong.cache.marshall" local PluginsIterator = require "kong.runloop.plugins_iterator" @@ -1138,17 +1139,24 @@ end local function set_init_versions_in_cache() - if kong.configuration.role ~= "control_pane" then - local ok, err = kong.core_cache:safe_set("router:version", "init") - if not ok then - return nil, "failed to set router version in cache: " .. tostring(err) - end + -- because of worker events, kong.cache can not be initialized in `init` phase + -- therefore, we need to use the shdict API directly to set the initial value + assert(kong.configuration.role ~= "control_plane") + assert(ngx.get_phase() == "init") + local core_cache_shm = ngx.shared["kong_core_db_cache"] + + -- ttl = forever is okay as "*:versions" keys are always manually invalidated + local marshalled_value = marshall("init", 0, 0) + + -- see kong.cache.safe_set function + local ok, err = core_cache_shm:safe_set("kong_core_db_cacherouter:version", marshalled_value) + if not ok then + return nil, "failed to set initial router version in cache: " .. tostring(err) end - local ok, err = kong.core_cache:safe_set("plugins_iterator:version", "init") + ok, err = core_cache_shm:safe_set("kong_core_db_cacheplugins_iterator:version", marshalled_value) if not ok then - return nil, "failed to set plugins iterator version in cache: " .. - tostring(err) + return nil, "failed to set initial plugins iterator version in cache: " .. tostring(err) end return true diff --git a/spec/02-integration/05-proxy/02-router_spec.lua b/spec/02-integration/05-proxy/02-router_spec.lua index 331755c837..58609797cf 100644 --- a/spec/02-integration/05-proxy/02-router_spec.lua +++ b/spec/02-integration/05-proxy/02-router_spec.lua @@ -2196,7 +2196,7 @@ for _, strategy in helpers.each_strategy() do end) end) - describe("Router at startup [#" .. strategy .. "]" , function() + describe("Router [#" .. strategy .. ", flavor = " .. flavor .. 
"] at startup" , function() local proxy_client local route @@ -2260,6 +2260,33 @@ for _, strategy in helpers.each_strategy() do end end) + it("#db worker respawn correctly rebuilds router", function() + local admin_client = helpers.admin_client() + + local res = assert(admin_client:post("/routes", { + headers = { ["Content-Type"] = "application/json" }, + body = { + paths = { "/foo" }, + }, + })) + assert.res_status(201, res) + admin_client:close() + + assert(helpers.signal_workers(nil, "-TERM")) + + proxy_client:close() + proxy_client = helpers.proxy_client() + + local res = assert(proxy_client:send { + method = "GET", + path = "/foo", + headers = { ["kong-debug"] = 1 }, + }) + + local body = assert.response(res).has_status(503) + local json = cjson.decode(body) + assert.equal("no Service found with those values", json.message) + end) end) end end From 5f1fdbcbfc436b3aa0c1d29dbffb0db695c564b6 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:02:33 +0800 Subject: [PATCH 19/25] Revert "Revert "fix(*): do not use stale router data if workers are respawned"" This reverts commit 43116815403c64336a1af6e8e2b2c192a368e99e. --- kong/init.lua | 19 ++++++++++++++----- kong/runloop/handler.lua | 2 +- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/kong/init.lua b/kong/init.lua index 4f1b477c98..978ee3c527 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -661,22 +661,31 @@ function Kong.init_worker() return end - if kong.configuration.role ~= "control_plane" then + local is_not_control_plane = kong.configuration.role ~= "control_plane" + if is_not_control_plane then ok, err = execute_cache_warmup(kong.configuration) if not ok then ngx_log(ngx_ERR, "failed to warm up the DB cache: " .. err) end end - runloop.init_worker.before() - - -- run plugins init_worker context ok, err = runloop.update_plugins_iterator() if not ok then stash_init_worker_error("failed to build the plugins iterator: " .. err) return end + if is_not_control_plane then + ok, err = runloop.update_router() + if not ok then + stash_init_worker_error("failed to build the router: " .. err) + return + end + end + + runloop.init_worker.before() + + -- run plugins init_worker context local plugins_iterator = runloop.get_plugins_iterator() local errors = execute_init_worker_plugins_iterator(plugins_iterator, ctx) if errors then @@ -689,7 +698,7 @@ function Kong.init_worker() runloop.init_worker.after() - if kong.configuration.role ~= "control_plane" then + if is_not_control_plane and ngx.worker.id() == 0 then plugin_servers.start() end diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 6608bd1513..8eb31483ce 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -1167,7 +1167,7 @@ end -- before or after the plugins return { build_router = build_router, - + update_router = update_router, build_plugins_iterator = build_plugins_iterator, update_plugins_iterator = update_plugins_iterator, get_plugins_iterator = get_plugins_iterator, From 06bae64c4040e746ece4b11cc516ff5280927a78 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:02:33 +0800 Subject: [PATCH 20/25] Revert "Revert "tests(router): add worker_consistency=eventual to test case"" This reverts commit 20504197bbfd638250daf5a79d5e90c8472c7089. 
--- .../05-proxy/02-router_spec.lua | 155 +++++++++--------- 1 file changed, 80 insertions(+), 75 deletions(-) diff --git a/spec/02-integration/05-proxy/02-router_spec.lua b/spec/02-integration/05-proxy/02-router_spec.lua index 58609797cf..09a54d4f58 100644 --- a/spec/02-integration/05-proxy/02-router_spec.lua +++ b/spec/02-integration/05-proxy/02-router_spec.lua @@ -2196,97 +2196,102 @@ for _, strategy in helpers.each_strategy() do end) end) - describe("Router [#" .. strategy .. ", flavor = " .. flavor .. "] at startup" , function() - local proxy_client - local route + for _, consistency in ipairs({ "strict", "eventual" }) do + describe("Router [#" .. strategy .. ", consistency = " .. consistency .. "] at startup" , function() + local proxy_client + local route - lazy_setup(function() - local bp = helpers.get_db_utils(strategy, { - "routes", - "services", - "plugins", - }, { - "enable-buffering", - }) + lazy_setup(function() + local bp = helpers.get_db_utils(strategy, { + "routes", + "services", + "plugins", + }, { + "enable-buffering", + }) - route = bp.routes:insert({ - methods = { "GET" }, - protocols = { "http" }, - strip_path = false, - }) + route = bp.routes:insert({ + methods = { "GET" }, + protocols = { "http" }, + strip_path = false, + }) - if enable_buffering then - bp.plugins:insert { - name = "enable-buffering", - protocols = { "http", "https", "grpc", "grpcs" }, - } - end + if enable_buffering then + bp.plugins:insert { + name = "enable-buffering", + protocols = { "http", "https", "grpc", "grpcs" }, + } + end - assert(helpers.start_kong({ - database = strategy, - nginx_worker_processes = 4, - plugins = "bundled,enable-buffering", - nginx_conf = "spec/fixtures/custom_nginx.template", - })) - end) + assert(helpers.start_kong({ + worker_consistency = consistency, + database = strategy, + nginx_worker_processes = 4, + plugins = "bundled,enable-buffering", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + end) - lazy_teardown(function() - helpers.stop_kong() - end) + lazy_teardown(function() + helpers.stop_kong() + end) - before_each(function() - proxy_client = helpers.proxy_client() - end) + before_each(function() + proxy_client = helpers.proxy_client() + end) - after_each(function() - if proxy_client then - proxy_client:close() - end - end) + after_each(function() + if proxy_client then + proxy_client:close() + end + end) - it("uses configuration from datastore or declarative_config", function() - for _ = 1, 1000 do - proxy_client = helpers.proxy_client() - local res = assert(proxy_client:send { - method = "GET", - path = "/get", - headers = { ["kong-debug"] = 1 }, - }) + it("uses configuration from datastore or declarative_config", function() + for _ = 1, 1000 do + proxy_client = helpers.proxy_client() + local res = assert(proxy_client:send { + method = "GET", + path = "/get", + headers = { ["kong-debug"] = 1 }, + }) - assert.response(res).has_status(200) + assert.response(res).has_status(200) - assert.equal(route.service.name, res.headers["kong-service-name"]) - proxy_client:close() - end - end) + assert.equal(route.service.name, res.headers["kong-service-name"]) + proxy_client:close() + end + end) - it("#db worker respawn correctly rebuilds router", function() - local admin_client = helpers.admin_client() + it("#db worker respawn correctly rebuilds router", function() + local admin_client = helpers.admin_client() - local res = assert(admin_client:post("/routes", { - headers = { ["Content-Type"] = "application/json" }, - body = { - paths = { "/foo" }, - }, - })) - 
assert.res_status(201, res) - admin_client:close() + local res = assert(admin_client:post("/routes", { + headers = { ["Content-Type"] = "application/json" }, + body = { + paths = { "/foo" }, + }, + })) + assert.res_status(201, res) + admin_client:close() - assert(helpers.signal_workers(nil, "-TERM")) + local workers_before = helpers.get_kong_workers() + assert(helpers.signal_workers(nil, "-TERM")) + helpers.wait_until_no_common_workers(workers_before, 1) -- respawned - proxy_client:close() - proxy_client = helpers.proxy_client() + proxy_client:close() + proxy_client = helpers.proxy_client() - local res = assert(proxy_client:send { - method = "GET", - path = "/foo", - headers = { ["kong-debug"] = 1 }, - }) + local res = assert(proxy_client:send { + method = "GET", + path = "/foo", + headers = { ["kong-debug"] = 1 }, + }) - local body = assert.response(res).has_status(503) - local json = cjson.decode(body) - assert.equal("no Service found with those values", json.message) + local body = assert.response(res).has_status(503) + local json = cjson.decode(body) + assert.equal("no Service found with those values", json.message) + end) end) - end) + end end end From cf1a91b136ee43245b21fd987f55eeef5a2e550f Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:02:33 +0800 Subject: [PATCH 21/25] Revert "Revert "tests(helpers): pickup reload helper from #8670"" This reverts commit 0c1e2ebf98cd6cff726cbfc91cf9cfeb630d9afc. --- spec/02-integration/02-cmd/03-reload_spec.lua | 99 +++---------------- spec/helpers.lua | 75 ++++++++++++++ 2 files changed, 89 insertions(+), 85 deletions(-) diff --git a/spec/02-integration/02-cmd/03-reload_spec.lua b/spec/02-integration/02-cmd/03-reload_spec.lua index 884ed2fddb..ec802663db 100644 --- a/spec/02-integration/02-cmd/03-reload_spec.lua +++ b/spec/02-integration/02-cmd/03-reload_spec.lua @@ -2,31 +2,6 @@ local helpers = require "spec.helpers" local cjson = require "cjson" -local function get_kong_workers() - local workers - helpers.wait_until(function() - local pok, admin_client = pcall(helpers.admin_client) - if not pok then - return false - end - local res = admin_client:send { - method = "GET", - path = "/", - } - if not res or res.status ~= 200 then - return false - end - local body = assert.res_status(200, res) - local json = cjson.decode(body) - - admin_client:close() - workers = json.pids.workers - return true - end, 10) - return workers -end - - local function assert_wait_call(fn, ...) local res local args = { ... } @@ -38,52 +13,6 @@ local function assert_wait_call(fn, ...) end -local function wait_until_no_common_workers(workers, expected_total, strategy) - if strategy == "cassandra" then - ngx.sleep(0.5) - end - helpers.wait_until(function() - local pok, admin_client = pcall(helpers.admin_client) - if not pok then - return false - end - local res = assert(admin_client:send { - method = "GET", - path = "/", - }) - assert.res_status(200, res) - local json = cjson.decode(assert.res_status(200, res)) - admin_client:close() - - local new_workers = json.pids.workers - local total = 0 - local common = 0 - if new_workers then - for _, v in ipairs(new_workers) do - total = total + 1 - for _, v_old in ipairs(workers) do - if v == v_old then - common = common + 1 - break - end - end - end - end - return common == 0 and total == (expected_total or total) - end) -end - - -local function kong_reload(strategy, ...) - local workers = get_kong_workers() - local ok, err = helpers.kong_exec(...) 
- if ok then - wait_until_no_common_workers(workers, 1, strategy) - end - return ok, err -end - - for _, strategy in helpers.each_strategy() do describe("kong reload #" .. strategy, function() @@ -104,7 +33,7 @@ describe("kong reload #" .. strategy, function() local nginx_pid = assert_wait_call(helpers.file.read, helpers.test_conf.nginx_pid) -- kong_exec uses test conf too, so same prefix - assert(kong_reload(strategy, "reload --prefix " .. helpers.test_conf.prefix)) + assert(helpers.reload_kong(strategy, "reload --prefix " .. helpers.test_conf.prefix)) local nginx_pid_after = assert_wait_call(helpers.file.read, helpers.test_conf.nginx_pid) @@ -121,14 +50,14 @@ describe("kong reload #" .. strategy, function() local client = helpers.http_client("0.0.0.0", 9002, 5000) client:close() - local workers = get_kong_workers() + local workers = helpers.get_kong_workers() local nginx_pid = assert(helpers.file.read(helpers.test_conf.nginx_pid), "no nginx master PID") assert(helpers.kong_exec("reload --conf spec/fixtures/reload.conf")) - wait_until_no_common_workers(workers, 1) + helpers.wait_until_no_common_workers(workers, 1) -- same master PID assert.equal(nginx_pid, helpers.file.read(helpers.test_conf.nginx_pid)) @@ -147,7 +76,7 @@ describe("kong reload #" .. strategy, function() local client = helpers.http_client("0.0.0.0", 9002, 5000) client:close() - local workers = get_kong_workers() + local workers = helpers.get_kong_workers() local nginx_pid = assert(helpers.file.read(helpers.test_conf.nginx_pid), "no nginx master PID") @@ -156,7 +85,7 @@ describe("kong reload #" .. strategy, function() proxy_listen = "0.0.0.0:9000" })) - wait_until_no_common_workers(workers, 1) + helpers.wait_until_no_common_workers(workers, 1) -- same master PID assert.equal(nginx_pid, helpers.file.read(helpers.test_conf.nginx_pid)) @@ -171,7 +100,7 @@ describe("kong reload #" .. strategy, function() proxy_listen = "0.0.0.0:9002" }, nil, true)) - local workers = get_kong_workers() + local workers = helpers.get_kong_workers() -- http_client errors out if cannot connect local client = helpers.http_client("0.0.0.0", 9002, 5000) @@ -181,7 +110,7 @@ describe("kong reload #" .. strategy, function() .. " --nginx-conf spec/fixtures/custom_nginx.template")) - wait_until_no_common_workers(workers, 1) + helpers.wait_until_no_common_workers(workers, 1) -- new server client = helpers.http_client(helpers.mock_upstream_host, @@ -213,7 +142,7 @@ describe("kong reload #" .. strategy, function() local pids_1 = json.pids client:close() - assert(kong_reload(strategy, "reload --prefix " .. helpers.test_conf.prefix)) + assert(helpers.reload_kong(strategy, "reload --prefix " .. helpers.test_conf.prefix)) client = helpers.admin_client() local res = assert(client:get("/")) @@ -250,7 +179,7 @@ describe("kong reload #" .. strategy, function() local node_id_1 = json.node_id client:close() - assert(kong_reload(strategy, "reload --prefix " .. helpers.test_conf.prefix)) + assert(helpers.reload_kong(strategy, "reload --prefix " .. helpers.test_conf.prefix)) client = helpers.admin_client() local res = assert(client:get("/")) @@ -326,7 +255,7 @@ describe("kong reload #" .. strategy, function() - example.test ]], yaml_file) - assert(kong_reload(strategy, "reload --prefix " .. helpers.test_conf.prefix, { + assert(helpers.reload_kong(strategy, "reload --prefix " .. helpers.test_conf.prefix, { declarative_config = yaml_file, })) @@ -396,7 +325,7 @@ describe("kong reload #" .. 
strategy, function() return true end) - assert(kong_reload(strategy, "reload --prefix " .. helpers.test_conf.prefix)) + assert(helpers.reload_kong(strategy, "reload --prefix " .. helpers.test_conf.prefix)) admin_client = assert(helpers.admin_client()) local res = assert(admin_client:send { @@ -493,7 +422,7 @@ describe("kong reload #" .. strategy, function() return true end) - assert(kong_reload(strategy, "reload --prefix " .. helpers.test_conf.prefix)) + assert(helpers.reload_kong(strategy, "reload --prefix " .. helpers.test_conf.prefix)) admin_client = assert(helpers.admin_client()) local res = assert(admin_client:send { @@ -584,7 +513,7 @@ describe("kong reload #" .. strategy, function() weight: 100 ]], yaml_file) - assert(kong_reload(strategy, "reload --prefix " .. helpers.test_conf.prefix, { + assert(helpers.reload_kong(strategy, "reload --prefix " .. helpers.test_conf.prefix, { declarative_config = yaml_file, })) @@ -717,7 +646,7 @@ describe("key-auth plugin invalidation on dbless reload #off", function() keyauth_credentials: - key: my-new-key ]], yaml_file) - assert(kong_reload("off", "reload --prefix " .. helpers.test_conf.prefix, { + assert(helpers.reload_kong("off", "reload --prefix " .. helpers.test_conf.prefix, { declarative_config = yaml_file, })) diff --git a/spec/helpers.lua b/spec/helpers.lua index 644b40df32..ad4c30fda4 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -2712,6 +2712,78 @@ local function restart_kong(env, tables, fixtures) end +local function wait_until_no_common_workers(workers, expected_total, strategy) + if strategy == "cassandra" then + ngx.sleep(0.5) + end + wait_until(function() + local pok, admin_client = pcall(admin_client) + if not pok then + return false + end + local res = assert(admin_client:send { + method = "GET", + path = "/", + }) + luassert.res_status(200, res) + local json = cjson.decode(luassert.res_status(200, res)) + admin_client:close() + + local new_workers = json.pids.workers + local total = 0 + local common = 0 + if new_workers then + for _, v in ipairs(new_workers) do + total = total + 1 + for _, v_old in ipairs(workers) do + if v == v_old then + common = common + 1 + break + end + end + end + end + return common == 0 and total == (expected_total or total) + end, 30) +end + + +local function get_kong_workers() + local workers + wait_until(function() + local pok, admin_client = pcall(admin_client) + if not pok then + return false + end + local res = admin_client:send { + method = "GET", + path = "/", + } + if not res or res.status ~= 200 then + return false + end + local body = luassert.res_status(200, res) + local json = cjson.decode(body) + + admin_client:close() + workers = json.pids.workers + return true + end, 10) + return workers +end + + +--- Reload Kong and wait all workers are restarted. +local function reload_kong(strategy, ...) + local workers = get_kong_workers() + local ok, err = kong_exec(...) + if ok then + wait_until_no_common_workers(workers, 1, strategy) + end + return ok, err +end + + --- Simulate a Hybrid mode DP and connect to the CP specified in `opts`. 
-- @function clustering_client -- @param opts Options to use, the `host`, `port`, `cert` and `cert_key` fields @@ -2888,6 +2960,9 @@ end start_kong = start_kong, stop_kong = stop_kong, restart_kong = restart_kong, + reload_kong = reload_kong, + get_kong_workers = get_kong_workers, + wait_until_no_common_workers = wait_until_no_common_workers, start_grpc_target = start_grpc_target, stop_grpc_target = stop_grpc_target, From e4ed6f3cd5c0c2920144bfedf476b080fcd7f887 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:02:33 +0800 Subject: [PATCH 22/25] Revert "Revert "tests(*): modify tests that need to access mockbin.com from integration test (#10893)"" This reverts commit 67eb5ff195d7dbb8249ed3a2f757dab01337ece0. --- spec/02-integration/05-proxy/06-ssl_spec.lua | 52 +++++++++++++++----- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/spec/02-integration/05-proxy/06-ssl_spec.lua b/spec/02-integration/05-proxy/06-ssl_spec.lua index c6833d8f3a..28385246c9 100644 --- a/spec/02-integration/05-proxy/06-ssl_spec.lua +++ b/spec/02-integration/05-proxy/06-ssl_spec.lua @@ -1,6 +1,7 @@ local ssl_fixtures = require "spec.fixtures.ssl" local helpers = require "spec.helpers" local cjson = require "cjson" +local fmt = string.format local function get_cert(server_name) @@ -12,6 +13,32 @@ local function get_cert(server_name) return stdout end +local mock_tls_server_port = helpers.get_available_port() + +local fixtures = { + dns_mock = helpers.dns_mock.new(), + http_mock = { + test_upstream_tls_server = fmt([[ + server { + server_name example2.com; + listen %s ssl; + + ssl_certificate ../spec/fixtures/mtls_certs/example2.com.crt; + ssl_certificate_key ../spec/fixtures/mtls_certs/example2.com.key; + + location = / { + echo 'it works'; + } + } + ]], mock_tls_server_port) + }, +} + +fixtures.dns_mock:A { + name = "example2.com", + address = "127.0.0.1", +} + for _, strategy in helpers.each_strategy() do describe("SSL [#" .. strategy .. "]", function() local proxy_client @@ -126,16 +153,18 @@ for _, strategy in helpers.each_strategy() do preserve_host = false, } - local service_mockbin = assert(bp.services:insert { - name = "service-mockbin", - url = "https://mockbin.com/request", + local service_example2 = assert(bp.services:insert { + name = "service-example2", + protocol = "https", + host = "example2.com", + port = mock_tls_server_port, }) assert(bp.routes:insert { protocols = { "http" }, - hosts = { "mockbin.com" }, + hosts = { "example2.com" }, paths = { "/" }, - service = service_mockbin, + service = service_example2, }) assert(bp.routes:insert { @@ -204,13 +233,14 @@ for _, strategy in helpers.each_strategy() do -- /wildcard tests - assert(helpers.start_kong { + assert(helpers.start_kong ({ database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", trusted_ips = "127.0.0.1", nginx_http_proxy_ssl_verify = "on", nginx_http_proxy_ssl_trusted_certificate = "../spec/fixtures/kong_spec.crt", - }) + nginx_http_proxy_ssl_verify_depth = "5", + }, nil, nil, fixtures)) proxy_client = helpers.proxy_client() https_client = helpers.proxy_ssl_client() @@ -228,13 +258,13 @@ for _, strategy in helpers.each_strategy() do method = "GET", path = "/", headers = { - Host = "mockbin.com", + Host = "example2.com", }, }) local body = assert.res_status(502, res) assert.matches("An invalid response was received from the upstream server", body) assert.logfile().has.line("upstream SSL certificate verify error: " .. - "(20:unable to get local issuer certificate) " .. 
+ "(21:unable to verify the first certificate) " .. "while SSL handshaking to upstream", true, 2) end) @@ -540,7 +570,7 @@ for _, strategy in helpers.each_strategy() do snis = { "example.com" }, service = service, } - + bp.routes:insert { protocols = { "tls" }, snis = { "foobar.example.com." }, @@ -564,7 +594,7 @@ for _, strategy in helpers.each_strategy() do stream_listen = "127.0.0.1:9020 ssl" }) - + end) lazy_teardown(function() From 4dea806367627f78f7c137b2d9f1255632dacb5d Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:02:33 +0800 Subject: [PATCH 23/25] Revert "Revert "tests(*): fix flaky hybrid mode ocsp tests (#10912)"" This reverts commit 5f4d9649ebef22329086aed8eae95b283d9edc17. --- .../09-hybrid_mode/05-ocsp_spec.lua | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua index 3f6275c150..d2dab2aaef 100644 --- a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua +++ b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua @@ -41,6 +41,8 @@ for _, strategy in helpers.each_strategy() do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("good") + assert(helpers.start_kong({ role = "data_plane", database = "off", @@ -54,8 +56,6 @@ for _, strategy in helpers.each_strategy() do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("good") end) lazy_teardown(function() @@ -110,6 +110,8 @@ for _, strategy in helpers.each_strategy() do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("revoked") + assert(helpers.start_kong({ role = "data_plane", database = "off", @@ -123,8 +125,6 @@ for _, strategy in helpers.each_strategy() do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("revoked") end) lazy_teardown(function() @@ -177,6 +177,8 @@ for _, strategy in helpers.each_strategy() do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("error") + assert(helpers.start_kong({ role = "data_plane", database = "off", @@ -190,8 +192,6 @@ for _, strategy in helpers.each_strategy() do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("error") end) lazy_teardown(function() @@ -247,6 +247,8 @@ for _, strategy in helpers.each_strategy() do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("revoked") + assert(helpers.start_kong({ role = "data_plane", database = "off", @@ -260,8 +262,6 @@ for _, strategy in helpers.each_strategy() do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("revoked") end) lazy_teardown(function() @@ -318,6 +318,8 @@ for _, strategy in helpers.each_strategy() do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("revoked") + assert(helpers.start_kong({ role = "data_plane", database = "off", @@ -331,8 +333,6 @@ for _, strategy in helpers.each_strategy() do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("revoked") end) lazy_teardown(function() @@ -385,6 +385,8 @@ for _, strategy in helpers.each_strategy() do cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) + set_ocsp_status("error") + assert(helpers.start_kong({ role = "data_plane", database = "off", @@ -398,8 +400,6 
@@ for _, strategy in helpers.each_strategy() do cluster_server_name = "kong_clustering", cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt", })) - - set_ocsp_status("error") end) lazy_teardown(function() From 8bb559c98c674f46204ff937ba597ad454e47d28 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:02:33 +0800 Subject: [PATCH 24/25] Revert "Revert "test(cmd): fix flaky `can receive USR1` test (#10903)"" This reverts commit 9a49049d77f8e71719b15404bdfccc110de6f202. --- spec/02-integration/02-cmd/13-signals_spec.lua | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spec/02-integration/02-cmd/13-signals_spec.lua b/spec/02-integration/02-cmd/13-signals_spec.lua index 9f9c9e38c1..2128748902 100644 --- a/spec/02-integration/02-cmd/13-signals_spec.lua +++ b/spec/02-integration/02-cmd/13-signals_spec.lua @@ -15,10 +15,12 @@ describe("signals", function() assert(helpers.start_kong()) helpers.signal(nil, "-USR1") - local conf = helpers.get_running_conf() - local _, code = helpers.execute("grep -F '(SIGUSR1) received from' " .. - conf.nginx_err_logs, true) - assert.equal(0, code) + helpers.wait_until(function() + local conf = helpers.get_running_conf() + local _, code = helpers.execute("grep -F '(SIGUSR1) received from' " .. + conf.nginx_err_logs, true) + return 0 == code, "SIGUSR1 not received" + end) end) it("can receive USR2 #flaky", function() From 0733e36bf6b87bbafbf2b9355d7b4b8a6fce8b87 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Mon, 4 Dec 2023 11:02:33 +0800 Subject: [PATCH 25/25] Revert "Revert "tests: re-enable and fix flaky tests in spec/02-integration/03-db/07-tags_spec.lua (#10715) (#11118)"" This reverts commit 2d5eca6dd246314be400d3f569abec5c4c485171. --- spec/02-integration/03-db/07-tags_spec.lua | 60 ++++++++++++---------- spec/helpers.lua | 21 ++++++++ 2 files changed, 54 insertions(+), 27 deletions(-) diff --git a/spec/02-integration/03-db/07-tags_spec.lua b/spec/02-integration/03-db/07-tags_spec.lua index c120d740dd..767097d21c 100644 --- a/spec/02-integration/03-db/07-tags_spec.lua +++ b/spec/02-integration/03-db/07-tags_spec.lua @@ -219,23 +219,6 @@ for _, strategy in helpers.each_strategy() do describe("page() by tag", function() local single_tag_count = 5 local total_entities_count = 100 - for i = 1, total_entities_count do - local service = { - host = "anotherexample-" .. i .. ".org", - name = "service-paging" .. i, - tags = { "paging", "team_paging_" .. fmod(i, 5), "irrelevant_tag" } - } - local row, err, err_t = bp.services:insert(service) - assert.is_nil(err) - assert.is_nil(err_t) - assert.same(service.tags, row.tags) - end - - if strategy == "off" then - local entities = assert(bp.done()) - local dc = assert(declarative_config.load(helpers.test_conf.loaded_plugins)) - declarative.load_into_cache(dc:flatten(entities)) - end local scenarios = { -- { tags[], expected_result_count } { @@ -262,6 +245,26 @@ for _, strategy in helpers.each_strategy() do local paging_size = { total_entities_count/single_tag_count, } + lazy_setup(function() + for i = 1, total_entities_count do + local service = { + host = "anotherexample-" .. i .. ".org", + name = "service-paging" .. i, + tags = { "paging", "team_paging_" .. 
fmod(i, 5), "irrelevant_tag" } + } + local row, err, err_t = bp.services:insert(service) + assert.is_nil(err) + assert.is_nil(err_t) + assert.same(service.tags, row.tags) + end + + if strategy == "off" then + local entities = assert(bp.done()) + local dc = assert(declarative_config.load(helpers.test_conf.loaded_plugins)) + declarative.load_into_cache(dc:flatten(entities)) + end + end) + for s_idx, scenario in ipairs(scenarios) do local opts, expected_count = unpack(scenario) @@ -342,20 +345,23 @@ for _, strategy in helpers.each_strategy() do assert.stub(ngx.log).was_not_called() end) - it("#flaky and returns as normal if page size is large enough", function() + it("and returns as normal if page size is large enough", function() stub(ngx, "log") - local rows, err, err_t, offset = db.services:page(enough_page_size, nil, - { tags = { "paging", "team_paging_1" }, tags_cond = 'and' }) - assert(is_valid_page(rows, err, err_t)) - assert.equal(enough_page_size, #rows) - if offset then - rows, err, err_t, offset = db.services:page(enough_page_size, offset, + -- cassandra is a bit slow on CI, so we need to wait a bit + helpers.pwait_until(function() + local rows, err, err_t, offset = db.services:page(enough_page_size, nil, { tags = { "paging", "team_paging_1" }, tags_cond = 'and' }) assert(is_valid_page(rows, err, err_t)) - assert.equal(0, #rows) - assert.is_nil(offset) - end + assert.equal(enough_page_size, #rows) + if offset then + rows, err, err_t, offset = db.services:page(enough_page_size, offset, + { tags = { "paging", "team_paging_1" }, tags_cond = 'and' }) + assert(is_valid_page(rows, err, err_t)) + assert.equal(0, #rows) + assert.is_nil(offset) + end + end) assert.stub(ngx.log).was_not_called() end) diff --git a/spec/helpers.lua b/spec/helpers.lua index ad4c30fda4..50b12314df 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -1342,6 +1342,26 @@ local function wait_until(f, timeout, step) end + +--- Waits until no Lua error occurred +-- The check function will repeatedly be called (with a fixed interval), until +-- there is no Lua error occurred +-- +-- NOTE: this is a regular Lua function, not a Luassert assertion. +-- @function pwait_until +-- @param f check function +-- @param timeout (optional) maximum time to wait after which an error is +-- thrown, defaults to 5. +-- @param step (optional) interval between checks, defaults to 0.05. +-- @return nothing. It returns when the condition is met, or throws an error +-- when it times out. +local function pwait_until(f, timeout, step) + wait_until(function() + return pcall(f) + end, timeout, step) +end + + --- Waits for invalidation of a cached key by polling the mgt-api -- and waiting for a 404 response. Throws an error on timeout. -- @@ -2921,6 +2941,7 @@ end grpc_client = grpc_client, http2_client = http2_client, wait_until = wait_until, + pwait_until = pwait_until, wait_pid = wait_pid, tcp_server = tcp_server, udp_server = udp_server,
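
As an illustration of how the newly exported `pwait_until` helper is meant to be used: it simply wraps `wait_until` and runs the supplied callback under `pcall`, so a block of `assert.*` calls can be retried until it stops raising, instead of being rewritten as a boolean-returning check. A minimal sketch, not part of the patch itself; the admin request and the `pids` field checked here are only assumed for the example, mirroring the reload tests above:

    local helpers = require "spec.helpers"
    local cjson   = require "cjson"

    -- Retry the whole assertion block until it passes, or error out after 10 seconds.
    helpers.pwait_until(function()
      local client = helpers.admin_client()
      local res    = assert(client:get("/"))
      -- assert.res_status raises on a non-200 response; the pcall inside
      -- pwait_until turns that into another retry instead of an immediate failure.
      local body = assert.res_status(200, res)
      client:close()
      assert.is_table(cjson.decode(body).pids)
    end, 10)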