fix(dns): eliminate asynchronous timer in syncQuery() to prevent deadlock risk (#11900)

* Revert "fix(conf): set default value of `dns_no_sync` to `on` (#11869)"

This reverts commit 3be2513.

* fix(dns): introduce a synchronous query in syncQuery() to prevent hang risk

Originally, the first request to `syncQuery()` would trigger an asynchronous timer
event, which introduced the risk of hanging if the timer thread pool was exhausted.

With this patch, a cold (uncached) DNS query is always performed synchronously in the
current thread, as long as the current phase supports yielding.

Fix FTI-5348
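
Concretely, the patch turns the shared query queue into what is sometimes called a
single-flight scheme: the first (cold) caller resolves the name itself in its own
thread instead of scheduling a 0-delay timer and then waiting on it, and only
concurrent callers for the same name/type block on the shared semaphore. The minimal
sketch below illustrates that scheme in isolation (it assumes a yieldable OpenResty
phase; `inflight`, `MAX_WAIT`, `do_lookup` and `sync_query` are illustrative
stand-ins, not Kong DNS client APIs):

local semaphore = require "ngx.semaphore"

local inflight = {}     -- key -> { sema = ..., result = ..., err = ... }
local MAX_WAIT = 11     -- seconds; plays the role of resolve_max_wait

-- stand-in for the real resolver call (individualQuery in the patch)
local function do_lookup(key)
  ngx.sleep(0.1)
  return { address = "127.0.0.1" }, nil
end

local function sync_query(key)
  local item = inflight[key]

  if not item then
    -- cold query: do the work synchronously in the current thread,
    -- no timer is scheduled on behalf of the first caller
    item = { sema = assert(semaphore.new()) }
    inflight[key] = item

    item.result, item.err = do_lookup(key)

    -- done: unblock everyone who queued up behind us in the meantime
    inflight[key] = nil
    item.sema:post(math.max(-item.sema:count(), 1))

    return item.result, item.err
  end

  -- a query for the same key is already in flight: wait for its result
  local ok, err = item.sema:wait(MAX_WAIT)
  if not ok then
    return nil, "timed out waiting for in-flight query: " .. tostring(err)
  end
  return item.result, item.err
end

The real `syncQuery()` additionally deep-copies `r_opts`, records progress on the
`try_list`, and expires stale queue entries via `queue_get_query()`, as shown in the
diff below.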

---------

Co-authored-by: Datong Sun <[email protected]>
chobits and dndx committed Nov 16, 2023
1 parent 4c80884 commit b917ae8
Showing 5 changed files with 133 additions and 120 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/kong/fix_dns_blocking.yml
@@ -0,0 +1,3 @@
message: Eliminate asynchronous timer in syncQuery() to prevent hang risk
type: bugfix
scope: Core
141 changes: 73 additions & 68 deletions kong/resty/dns/client.lua
@@ -637,7 +637,9 @@ _M.init = function(options)

config = options -- store it in our module level global

resolve_max_wait = options.timeout / 1000 * options.retrans -- maximum time to wait for the dns resolver to hit its timeouts
-- maximum time to wait for the dns resolver to hit its timeouts
-- + 1s to ensure some delay in timer execution and semaphore return are accounted for
resolve_max_wait = options.timeout / 1000 * options.retrans + 1

return true
end
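
To make the arithmetic concrete: with, say, `timeout = 2000` ms and `retrans = 5`
(illustrative values, not necessarily the configured defaults), `resolve_max_wait`
becomes 2000 / 1000 * 5 + 1 = 11 seconds, so a caller blocked in `semaphore:wait()`
only gives up one second after the resolver itself would have exhausted its
retransmissions.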
@@ -721,44 +723,62 @@ local function individualQuery(qname, r_opts, try_list)
return result, nil, try_list
end


local queue = setmetatable({}, {__mode = "v"})

local function enqueue_query(key, qname, r_opts, try_list)
local item = {
key = key,
semaphore = semaphore(),
qname = qname,
r_opts = cycle_aware_deep_copy(r_opts),
try_list = try_list,
expire_time = time() + resolve_max_wait,
}
queue[key] = item
return item
end


local function dequeue_query(item)
if queue[item.key] == item then
-- query done, but by now many others might be waiting for our result.
-- 1) stop new ones from adding to our lock/semaphore
queue[item.key] = nil
-- 2) release all waiting threads
item.semaphore:post(math_max(item.semaphore:count() * -1, 1))
item.semaphore = nil
end
end


local function queue_get_query(key, try_list)
local item = queue[key]

if not item then
return nil
end

-- bug checks: release it actively if the waiting query queue is blocked
if item.expire_time < time() then
local err = "stale query, key:" .. key
add_status_to_try_list(try_list, err)
log(ALERT, PREFIX, err)
dequeue_query(item)
return nil
end

return item
end


-- to be called as a timer-callback, performs a query and returns the results
-- in the `item` table.
local function executeQuery(premature, item)
if premature then return end

local r, err = resolver:new(config)
if not r then
item.result, item.err = r, "failed to create a resolver: " .. err
else
--[[
log(DEBUG, PREFIX, "Query executing: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item))
--]]
add_status_to_try_list(item.try_list, "querying")
item.result, item.err = r:query(item.qname, item.r_opts)
if item.result then
--[[
log(DEBUG, PREFIX, "Query answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item),
" ", frecord(item.result))
--]]
parseAnswer(item.qname, item.r_opts.qtype, item.result, item.try_list)
--[[
log(DEBUG, PREFIX, "Query parsed answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item),
" ", frecord(item.result))
else
log(DEBUG, PREFIX, "Query error: ", item.qname, ":", item.r_opts.qtype, " err=", tostring(err))
--]]
end
end
item.result, item.err = individualQuery(item.qname, item.r_opts, item.try_list)

-- query done, but by now many others might be waiting for our result.
-- 1) stop new ones from adding to our lock/semaphore
queue[item.key] = nil
-- 2) release all waiting threads
item.semaphore:post(math_max(item.semaphore:count() * -1, 1))
item.semaphore = nil
ngx.sleep(0)
dequeue_query(item)
end


@@ -772,7 +792,7 @@ end
-- the `semaphore` field will be removed). Upon error it returns `nil+error`.
local function asyncQuery(qname, r_opts, try_list)
local key = qname..":"..r_opts.qtype
local item = queue[key]
local item = queue_get_query(key, try_list)
if item then
--[[
log(DEBUG, PREFIX, "Query async (exists): ", key, " ", fquery(item))
@@ -781,14 +801,7 @@ local function asyncQuery(qname, r_opts, try_list)
return item -- already in progress, return existing query
end

item = {
key = key,
semaphore = semaphore(),
qname = qname,
r_opts = deepcopy(r_opts),
try_list = try_list,
}
queue[key] = item
item = enqueue_query(key, qname, r_opts, try_list)

local ok, err = timer_at(0, executeQuery, item)
if not ok then
@@ -814,40 +827,24 @@ end
-- @return `result + nil + try_list`, or `nil + err + try_list` in case of errors
local function syncQuery(qname, r_opts, try_list)
local key = qname..":"..r_opts.qtype
local item = queue[key]

-- if nothing is in progress, we start a new async query
local item = queue_get_query(key, try_list)

-- If nothing is in progress, we start a new sync query
if not item then
local err
item, err = asyncQuery(qname, r_opts, try_list)
if not item then
return item, err, try_list
end
else
add_status_to_try_list(try_list, "in progress (sync)")
end
item = enqueue_query(key, qname, r_opts, try_list)

local supported_semaphore_wait_phases = {
rewrite = true,
access = true,
content = true,
timer = true,
ssl_cert = true,
ssl_session_fetch = true,
}
item.result, item.err = individualQuery(qname, item.r_opts, try_list)

local ngx_phase = get_phase()
dequeue_query(item)

if not supported_semaphore_wait_phases[ngx_phase] then
-- phase not supported by `semaphore:wait`
-- return existing query (item)
--
-- this will avoid:
-- "dns lookup pool exceeded retries" (second try and subsequent retries)
-- "API disabled in the context of init_worker_by_lua" (first try)
return item, nil, try_list
return item.result, item.err, try_list
end

-- If the query is already in progress, we wait for it.

add_status_to_try_list(try_list, "in progress (sync)")

-- block and wait for the async query to complete
local ok, err = item.semaphore:wait(resolve_max_wait)
if ok and item.result then
Expand All @@ -860,6 +857,14 @@ local function syncQuery(qname, r_opts, try_list)
return item.result, item.err, try_list
end

-- bug checks
if not ok and not item.err then
item.err = err -- only first expired wait() reports error
log(ALERT, PREFIX, "semaphore:wait(", resolve_max_wait, ") failed: ", err,
", count: ", item.semaphore and item.semaphore:count(),
", qname: ", qname)
end

err = err or item.err or "unknown"
add_status_to_try_list(try_list, "error: "..err)

22 changes: 16 additions & 6 deletions spec/01-unit/21-dns-client/02-client_spec.lua
@@ -582,7 +582,10 @@ describe("[DNS client]", function()
}
}))
query_func = function(self, original_query_func, name, options)
ngx.sleep(5)
-- The first request uses syncQuery and does not wait on the
-- asyncQuery timer, so the low-level r:query() cannot sleep(5s);
-- it can only sleep(timeout).
ngx.sleep(math.min(timeout, 5))
return nil
end
local start_time = ngx.now()
Expand Down Expand Up @@ -1745,9 +1748,12 @@ describe("[DNS client]", function()
end)

it("timeout while waiting", function()

local timeout = 500
local ip = "1.4.2.3"
-- basically the local function _synchronized_query
assert(client.init({
timeout = 500,
timeout = timeout,
retrans = 1,
resolvConf = {
-- resolv.conf without `search` and `domain` options
Expand All @@ -1758,7 +1764,7 @@ describe("[DNS client]", function()
-- insert a stub thats waits and returns a fixed record
local name = TEST_DOMAIN
query_func = function()
local ip = "1.4.2.3"
local ip = ip
local entry = {
{
type = client.TYPE_A,
Expand All @@ -1770,7 +1776,9 @@ describe("[DNS client]", function()
touch = 0,
expire = gettime() + 10,
}
sleep(0.5) -- wait before we return the results
-- wait before we return the results
-- `+ 2` s ensures that the semaphore:wait() expires
sleep(timeout/1000 + 2)
return entry
end

Expand Down Expand Up @@ -1800,10 +1808,12 @@ describe("[DNS client]", function()
ngx.thread.wait(coros[i]) -- this wait will resume the scheduled ones
end

-- all results are equal, as they all will wait for the first response
for i = 1, 10 do
-- results[1~9] are equal, as they all will wait for the first response
for i = 1, 9 do
assert.equal("timeout", results[i])
end
-- results[10] comes from synchronous DNS access of the first request
assert.equal(ip, results[10][1]["address"])
end)
end)

7 changes: 3 additions & 4 deletions t/03-dns-client/01-phases.t
@@ -1,6 +1,6 @@
use Test::Nginx::Socket;

plan tests => repeat_each() * (blocks() * 5);
plan tests => repeat_each() * (blocks() * 4 + 1);

workers(6);

@@ -59,8 +59,7 @@ qq {
GET /t
--- response_body
answers: nil
err: dns client error: 101 empty record received
--- no_error_log
err: nil
--- error_log
[error]
dns lookup pool exceeded retries
API disabled in the context of init_worker_by_lua
80 changes: 38 additions & 42 deletions t/03-dns-client/02-timer-usage.t
@@ -2,76 +2,72 @@ use Test::Nginx::Socket;

plan tests => repeat_each() * (blocks() * 5);

workers(6);
workers(1);

no_shuffle();
run_tests();

__DATA__

=== TEST 1: reuse timers for queries of same name, independent on # of workers
--- http_config eval
qq {
init_worker_by_lua_block {
local client = require("kong.resty.dns.client")
assert(client.init({
nameservers = { "8.8.8.8" },
hosts = {}, -- empty tables to parse to prevent defaulting to /etc/hosts
resolvConf = {}, -- and resolv.conf files
order = { "A" },
}))
local host = "httpbin.org"
local typ = client.TYPE_A
for i = 1, 10 do
client.resolve(host, { qtype = typ })
end
local host = "mockbin.org"
for i = 1, 10 do
client.resolve(host, { qtype = typ })
end
workers = ngx.worker.count()
timers = ngx.timer.pending_count()
}
}
=== TEST 1: stale result triggers async timer
--- config
location = /t {
access_by_lua_block {
-- init
local client = require("kong.resty.dns.client")
assert(client.init())
local host = "httpbin.org"
assert(client.init({
nameservers = { "127.0.0.53" },
hosts = {}, -- empty tables to parse to prevent defaulting to /etc/hosts
resolvConf = {}, -- and resolv.conf files
order = { "A" },
validTtl = 1,
}))
local host = "konghq.com"
local typ = client.TYPE_A
local answers, err = client.resolve(host, { qtype = typ })
-- first time
local answers, err, try_list = client.resolve(host, { qtype = typ })
if not answers then
ngx.say("failed to resolve: ", err)
return
end
ngx.say("first address name: ", answers[1].name)
ngx.say("first try_list: ", tostring(try_list))
-- sleep to wait for dns record to become stale
ngx.sleep(1.5)
host = "mockbin.org"
answers, err = client.resolve(host, { qtype = typ })
-- second time: use stale result and trigger async timer
answers, err, try_list = client.resolve(host, { qtype = typ })
if not answers then
ngx.say("failed to resolve: ", err)
return
end
ngx.say("second address name: ", answers[1].name)
ngx.say("second try_list: ", tostring(try_list))
ngx.say("workers: ", workers)
-- third time: use stale result and find triggered async timer
-- should be 2 timers maximum (1 for each hostname)
ngx.say("timers: ", timers)
answers, err, try_list = client.resolve(host, { qtype = typ })
if not answers then
ngx.say("failed to resolve: ", err)
return
end
ngx.say("third address name: ", answers[1].name)
ngx.say("third try_list: ", tostring(try_list))
}
}
--- request
GET /t
--- response_body
first address name: httpbin.org
second address name: mockbin.org
workers: 6
timers: 2
first address name: konghq.com
first try_list: ["(short)konghq.com:1 - cache-miss","konghq.com:1 - cache-miss/querying"]
second address name: konghq.com
second try_list: ["(short)konghq.com:1 - cache-hit/stale","konghq.com:1 - cache-hit/stale/scheduled"]
third address name: konghq.com
third try_list: ["(short)konghq.com:1 - cache-hit/stale","konghq.com:1 - cache-hit/stale/in progress (async)"]
--- no_error_log
[error]
dns lookup pool exceeded retries