diff --git a/changelog/unreleased/kong/fix_dns_blocking.yml b/changelog/unreleased/kong/fix_dns_blocking.yml new file mode 100644 index 000000000000..a167c5fa1656 --- /dev/null +++ b/changelog/unreleased/kong/fix_dns_blocking.yml @@ -0,0 +1,3 @@ +message: Eliminate asynchronous timer in syncQuery() to prevent hang risk +type: bugfix +scope: Core diff --git a/kong/resty/dns/client.lua b/kong/resty/dns/client.lua index c5bfc55821ff..be6506795ee4 100644 --- a/kong/resty/dns/client.lua +++ b/kong/resty/dns/client.lua @@ -637,7 +637,9 @@ _M.init = function(options) config = options -- store it in our module level global - resolve_max_wait = options.timeout / 1000 * options.retrans -- maximum time to wait for the dns resolver to hit its timeouts + -- maximum time to wait for the dns resolver to hit its timeouts + -- + 1s to ensure some delay in timer execution and semaphore return are accounted for + resolve_max_wait = options.timeout / 1000 * options.retrans + 1 return true end @@ -721,44 +723,62 @@ local function individualQuery(qname, r_opts, try_list) return result, nil, try_list end - local queue = setmetatable({}, {__mode = "v"}) + +local function enqueue_query(key, qname, r_opts, try_list) + local item = { + key = key, + semaphore = semaphore(), + qname = qname, + r_opts = cycle_aware_deep_copy(r_opts), + try_list = try_list, + expire_time = time() + resolve_max_wait, + } + queue[key] = item + return item +end + + +local function dequeue_query(item) + if queue[item.key] == item then + -- query done, but by now many others might be waiting for our result. + -- 1) stop new ones from adding to our lock/semaphore + queue[item.key] = nil + -- 2) release all waiting threads + item.semaphore:post(math_max(item.semaphore:count() * -1, 1)) + item.semaphore = nil + end +end + + +local function queue_get_query(key, try_list) + local item = queue[key] + + if not item then + return nil + end + + -- bug checks: release it actively if the waiting query queue is blocked + if item.expire_time < time() then + local err = "stale query, key:" .. key + add_status_to_try_list(try_list, err) + log(ALERT, PREFIX, err) + dequeue_query(item) + return nil + end + + return item +end + + -- to be called as a timer-callback, performs a query and returns the results -- in the `item` table. local function executeQuery(premature, item) if premature then return end - local r, err = resolver:new(config) - if not r then - item.result, item.err = r, "failed to create a resolver: " .. err - else - --[[ - log(DEBUG, PREFIX, "Query executing: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item)) - --]] - add_status_to_try_list(item.try_list, "querying") - item.result, item.err = r:query(item.qname, item.r_opts) - if item.result then - --[[ - log(DEBUG, PREFIX, "Query answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item), - " ", frecord(item.result)) - --]] - parseAnswer(item.qname, item.r_opts.qtype, item.result, item.try_list) - --[[ - log(DEBUG, PREFIX, "Query parsed answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item), - " ", frecord(item.result)) - else - log(DEBUG, PREFIX, "Query error: ", item.qname, ":", item.r_opts.qtype, " err=", tostring(err)) - --]] - end - end + item.result, item.err = individualQuery(item.qname, item.r_opts, item.try_list) - -- query done, but by now many others might be waiting for our result. - -- 1) stop new ones from adding to our lock/semaphore - queue[item.key] = nil - -- 2) release all waiting threads - item.semaphore:post(math_max(item.semaphore:count() * -1, 1)) - item.semaphore = nil - ngx.sleep(0) + dequeue_query(item) end @@ -772,7 +792,7 @@ end -- the `semaphore` field will be removed). Upon error it returns `nil+error`. local function asyncQuery(qname, r_opts, try_list) local key = qname..":"..r_opts.qtype - local item = queue[key] + local item = queue_get_query(key, try_list) if item then --[[ log(DEBUG, PREFIX, "Query async (exists): ", key, " ", fquery(item)) @@ -781,14 +801,7 @@ local function asyncQuery(qname, r_opts, try_list) return item -- already in progress, return existing query end - item = { - key = key, - semaphore = semaphore(), - qname = qname, - r_opts = deepcopy(r_opts), - try_list = try_list, - } - queue[key] = item + item = enqueue_query(key, qname, r_opts, try_list) local ok, err = timer_at(0, executeQuery, item) if not ok then @@ -814,40 +827,24 @@ end -- @return `result + nil + try_list`, or `nil + err + try_list` in case of errors local function syncQuery(qname, r_opts, try_list) local key = qname..":"..r_opts.qtype - local item = queue[key] - -- if nothing is in progress, we start a new async query + local item = queue_get_query(key, try_list) + + -- If nothing is in progress, we start a new sync query if not item then - local err - item, err = asyncQuery(qname, r_opts, try_list) - if not item then - return item, err, try_list - end - else - add_status_to_try_list(try_list, "in progress (sync)") - end + item = enqueue_query(key, qname, r_opts, try_list) - local supported_semaphore_wait_phases = { - rewrite = true, - access = true, - content = true, - timer = true, - ssl_cert = true, - ssl_session_fetch = true, - } + item.result, item.err = individualQuery(qname, item.r_opts, try_list) - local ngx_phase = get_phase() + dequeue_query(item) - if not supported_semaphore_wait_phases[ngx_phase] then - -- phase not supported by `semaphore:wait` - -- return existing query (item) - -- - -- this will avoid: - -- "dns lookup pool exceeded retries" (second try and subsequent retries) - -- "API disabled in the context of init_worker_by_lua" (first try) - return item, nil, try_list + return item.result, item.err, try_list end + -- If the query is already in progress, we wait for it. + + add_status_to_try_list(try_list, "in progress (sync)") + -- block and wait for the async query to complete local ok, err = item.semaphore:wait(resolve_max_wait) if ok and item.result then @@ -860,6 +857,14 @@ local function syncQuery(qname, r_opts, try_list) return item.result, item.err, try_list end + -- bug checks + if not ok and not item.err then + item.err = err -- only first expired wait() reports error + log(ALERT, PREFIX, "semaphore:wait(", resolve_max_wait, ") failed: ", err, + ", count: ", item.semaphore and item.semaphore:count(), + ", qname: ", qname) + end + err = err or item.err or "unknown" add_status_to_try_list(try_list, "error: "..err) diff --git a/spec/01-unit/21-dns-client/02-client_spec.lua b/spec/01-unit/21-dns-client/02-client_spec.lua index 106e47fde1b3..6a6715db1c7d 100644 --- a/spec/01-unit/21-dns-client/02-client_spec.lua +++ b/spec/01-unit/21-dns-client/02-client_spec.lua @@ -582,7 +582,10 @@ describe("[DNS client]", function() } })) query_func = function(self, original_query_func, name, options) - ngx.sleep(5) + -- The first request uses syncQuery not waiting on the + -- aysncQuery timer, so the low-level r:query() could not sleep(5s), + -- it can only sleep(timeout). + ngx.sleep(math.min(timeout, 5)) return nil end local start_time = ngx.now() @@ -1745,9 +1748,12 @@ describe("[DNS client]", function() end) it("timeout while waiting", function() + + local timeout = 500 + local ip = "1.4.2.3" -- basically the local function _synchronized_query assert(client.init({ - timeout = 500, + timeout = timeout, retrans = 1, resolvConf = { -- resolv.conf without `search` and `domain` options @@ -1758,7 +1764,7 @@ describe("[DNS client]", function() -- insert a stub thats waits and returns a fixed record local name = TEST_DOMAIN query_func = function() - local ip = "1.4.2.3" + local ip = ip local entry = { { type = client.TYPE_A, @@ -1770,7 +1776,9 @@ describe("[DNS client]", function() touch = 0, expire = gettime() + 10, } - sleep(0.5) -- wait before we return the results + -- wait before we return the results + -- `+ 2` s ensures that the semaphore:wait() expires + sleep(timeout/1000 + 2) return entry end @@ -1800,10 +1808,12 @@ describe("[DNS client]", function() ngx.thread.wait(coros[i]) -- this wait will resume the scheduled ones end - -- all results are equal, as they all will wait for the first response - for i = 1, 10 do + -- results[1~9] are equal, as they all will wait for the first response + for i = 1, 9 do assert.equal("timeout", results[i]) end + -- results[10] comes from synchronous DNS access of the first request + assert.equal(ip, results[10][1]["address"]) end) end) diff --git a/t/03-dns-client/01-phases.t b/t/03-dns-client/01-phases.t index e12cfab420cd..7f10aa9f6197 100644 --- a/t/03-dns-client/01-phases.t +++ b/t/03-dns-client/01-phases.t @@ -1,6 +1,6 @@ use Test::Nginx::Socket; -plan tests => repeat_each() * (blocks() * 5); +plan tests => repeat_each() * (blocks() * 4 + 1); workers(6); @@ -59,8 +59,7 @@ qq { GET /t --- response_body answers: nil -err: dns client error: 101 empty record received ---- no_error_log +err: nil +--- error_log [error] -dns lookup pool exceeded retries API disabled in the context of init_worker_by_lua diff --git a/t/03-dns-client/02-timer-usage.t b/t/03-dns-client/02-timer-usage.t index 24cc32bddb60..73c35ccb1c4e 100644 --- a/t/03-dns-client/02-timer-usage.t +++ b/t/03-dns-client/02-timer-usage.t @@ -2,76 +2,72 @@ use Test::Nginx::Socket; plan tests => repeat_each() * (blocks() * 5); -workers(6); +workers(1); no_shuffle(); run_tests(); __DATA__ - -=== TEST 1: reuse timers for queries of same name, independent on # of workers ---- http_config eval -qq { - init_worker_by_lua_block { - local client = require("kong.resty.dns.client") - assert(client.init({ - nameservers = { "8.8.8.8" }, - hosts = {}, -- empty tables to parse to prevent defaulting to /etc/hosts - resolvConf = {}, -- and resolv.conf files - order = { "A" }, - })) - local host = "httpbin.org" - local typ = client.TYPE_A - for i = 1, 10 do - client.resolve(host, { qtype = typ }) - end - - local host = "mockbin.org" - for i = 1, 10 do - client.resolve(host, { qtype = typ }) - end - - workers = ngx.worker.count() - timers = ngx.timer.pending_count() - } -} +=== TEST 1: stale result triggers async timer --- config location = /t { access_by_lua_block { + -- init local client = require("kong.resty.dns.client") - assert(client.init()) - local host = "httpbin.org" + assert(client.init({ + nameservers = { "127.0.0.53" }, + hosts = {}, -- empty tables to parse to prevent defaulting to /etc/hosts + resolvConf = {}, -- and resolv.conf files + order = { "A" }, + validTtl = 1, + })) + + local host = "konghq.com" local typ = client.TYPE_A - local answers, err = client.resolve(host, { qtype = typ }) + -- first time + + local answers, err, try_list = client.resolve(host, { qtype = typ }) if not answers then ngx.say("failed to resolve: ", err) + return end - ngx.say("first address name: ", answers[1].name) + ngx.say("first try_list: ", tostring(try_list)) + + -- sleep to wait for dns record to become stale + ngx.sleep(1.5) - host = "mockbin.org" - answers, err = client.resolve(host, { qtype = typ }) + -- second time: use stale result and trigger async timer + answers, err, try_list = client.resolve(host, { qtype = typ }) if not answers then ngx.say("failed to resolve: ", err) + return end - ngx.say("second address name: ", answers[1].name) + ngx.say("second try_list: ", tostring(try_list)) - ngx.say("workers: ", workers) + -- third time: use stale result and find triggered async timer - -- should be 2 timers maximum (1 for each hostname) - ngx.say("timers: ", timers) + answers, err, try_list = client.resolve(host, { qtype = typ }) + if not answers then + ngx.say("failed to resolve: ", err) + return + end + ngx.say("third address name: ", answers[1].name) + ngx.say("third try_list: ", tostring(try_list)) } } --- request GET /t --- response_body -first address name: httpbin.org -second address name: mockbin.org -workers: 6 -timers: 2 +first address name: konghq.com +first try_list: ["(short)konghq.com:1 - cache-miss","konghq.com:1 - cache-miss/querying"] +second address name: konghq.com +second try_list: ["(short)konghq.com:1 - cache-hit/stale","konghq.com:1 - cache-hit/stale/scheduled"] +third address name: konghq.com +third try_list: ["(short)konghq.com:1 - cache-hit/stale","konghq.com:1 - cache-hit/stale/in progress (async)"] --- no_error_log [error] dns lookup pool exceeded retries