Skip to content

Commit

Permalink
fix(dns): eliminate asynchronous timer in syncQuery() to prevent de…
Browse files Browse the repository at this point in the history
…adlock risk (#11900)

* fix(dns): introduce the synchronous query in syncQuery() to prevent hang risk

Originally, the first request to `syncQuery()` triggered an asynchronous timer
event, which added the risk of the thread pool hanging.

With this patch, a cold synchronous DNS query always happens in the current
thread if the current phase supports yielding.

Fix FTI-5348

---------

Co-authored-by: Datong Sun <[email protected]>
  • Loading branch information
chobits and dndx committed Nov 30, 2023
1 parent 81fc4aa commit 36ce119
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 120 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG/unreleased/kong/fix_dns_blocking.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: Eliminate asynchronous timer in syncQuery() to prevent hang risk
type: bugfix
scope: Core
144 changes: 74 additions & 70 deletions kong/resty/dns/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ local time = ngx.now
local log = ngx.log
local ERR = ngx.ERR
local WARN = ngx.WARN
local ALERT = ngx.ALERT
local DEBUG = ngx.DEBUG
--[[
DEBUG = ngx.WARN
--]]
local PREFIX = "[dns-client] "
local timer_at = ngx.timer.at
local get_phase = ngx.get_phase

local math_min = math.min
local math_max = math.max
Expand Down Expand Up @@ -642,7 +642,9 @@ _M.init = function(options)

config = options -- store it in our module level global

resolve_max_wait = options.timeout / 1000 * options.retrans -- maximum time to wait for the dns resolver to hit its timeouts
-- maximum time to wait for the dns resolver to hit its timeouts
-- + 1s to ensure some delay in timer execution and semaphore return are accounted for
resolve_max_wait = options.timeout / 1000 * options.retrans + 1

return true
end
Expand Down Expand Up @@ -733,44 +735,61 @@ local function individualQuery(qname, r_opts, try_list)
end

local queue = setmetatable({}, {__mode = "v"})

--- Creates the bookkeeping record for a query and registers it in `queue`.
-- The record carries its own semaphore and a copy of the resolver options
-- (presumably so later changes by the caller do not leak into the stored
-- options -- confirm against `deepcopy`), plus an `expire_time` set to
-- `resolve_max_wait` seconds from now.
-- @param key the queue key the record is stored under
-- @param qname the name being queried
-- @param r_opts resolver options; stored as a deep copy
-- @param try_list the try-list table kept on the record as-is
-- @return the newly registered record
local function enqueue_query(key, qname, r_opts, try_list)
  local entry = {
    key = key,
    qname = qname,
    try_list = try_list,
  }

  entry.semaphore = semaphore()
  entry.r_opts = deepcopy(r_opts)
  entry.expire_time = time() + resolve_max_wait

  queue[key] = entry
  return entry
end


--- Removes `item` from `queue` and wakes up everybody waiting on it.
-- Does nothing when `queue` no longer holds this exact record under
-- `item.key` (e.g. it was already dequeued or replaced).
-- @param item a record previously registered in `queue`
local function dequeue_query(item)
  if queue[item.key] ~= item then
    return
  end

  -- the query is finished, but other threads may be waiting for its result:
  -- first make sure no new waiters can attach to this record's semaphore...
  queue[item.key] = nil

  -- ...then release all waiting threads (always posting at least once)
  local sema = item.semaphore
  sema:post(math_max(sema:count() * -1, 1))
  item.semaphore = nil
end


--- Looks up the in-progress query registered under `key`.
-- A record whose `expire_time` has passed is treated as stale: the condition
-- is recorded on `try_list`, logged at ALERT level, and the record is
-- dequeued (which also releases its waiters) so the queue cannot stay blocked.
-- @param key the queue key to look up
-- @param try_list try-list that receives the "stale query" status
-- @return the live record, or `nil` if none exists or it was stale
local function queue_get_query(key, try_list)
  local pending = queue[key]
  if not pending then
    return nil
  end

  -- common case: the record has not expired yet
  if not (pending.expire_time < time()) then
    return pending
  end

  -- bug check: actively release a record that outlived its deadline
  local err = "stale query, key:" .. key
  add_status_to_try_list(try_list, err)
  log(ALERT, PREFIX, err)
  dequeue_query(pending)

  return nil
end


-- to be called as a timer-callback, performs a query and returns the results
-- in the `item` table.
local function executeQuery(premature, item)
if premature then return end

local r, err = resolver:new(config)
if not r then
item.result, item.err = r, "failed to create a resolver: " .. err
else
--[[
log(DEBUG, PREFIX, "Query executing: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item))
--]]
add_status_to_try_list(item.try_list, "querying")
item.result, item.err = r:query(item.qname, item.r_opts)
if item.result then
--[[
log(DEBUG, PREFIX, "Query answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item),
" ", frecord(item.result))
--]]
parseAnswer(item.qname, item.r_opts.qtype, item.result, item.try_list)
--[[
log(DEBUG, PREFIX, "Query parsed answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item),
" ", frecord(item.result))
else
log(DEBUG, PREFIX, "Query error: ", item.qname, ":", item.r_opts.qtype, " err=", tostring(err))
--]]
end
end
item.result, item.err = individualQuery(item.qname, item.r_opts, item.try_list)

-- query done, but by now many others might be waiting for our result.
-- 1) stop new ones from adding to our lock/semaphore
queue[item.key] = nil
-- 2) release all waiting threads
item.semaphore:post(math_max(item.semaphore:count() * -1, 1))
item.semaphore = nil
ngx.sleep(0)
-- 3) destroy the resolver -- ditto in individualQuery
r:destroy()
dequeue_query(item)
end


Expand All @@ -784,7 +803,7 @@ end
-- the `semaphore` field will be removed). Upon error it returns `nil+error`.
local function asyncQuery(qname, r_opts, try_list)
local key = qname..":"..r_opts.qtype
local item = queue[key]
local item = queue_get_query(key, try_list)
if item then
--[[
log(DEBUG, PREFIX, "Query async (exists): ", key, " ", fquery(item))
Expand All @@ -793,14 +812,7 @@ local function asyncQuery(qname, r_opts, try_list)
return item -- already in progress, return existing query
end

item = {
key = key,
semaphore = semaphore(),
qname = qname,
r_opts = deepcopy(r_opts),
try_list = try_list,
}
queue[key] = item
item = enqueue_query(key, qname, r_opts, try_list)

local ok, err = timer_at(0, executeQuery, item)
if not ok then
Expand All @@ -826,40 +838,24 @@ end
-- @return `result + nil + try_list`, or `nil + err + try_list` in case of errors
local function syncQuery(qname, r_opts, try_list)
local key = qname..":"..r_opts.qtype
local item = queue[key]

-- if nothing is in progress, we start a new async query
local item = queue_get_query(key, try_list)

-- If nothing is in progress, we start a new sync query
if not item then
local err
item, err = asyncQuery(qname, r_opts, try_list)
if not item then
return item, err, try_list
end
else
add_status_to_try_list(try_list, "in progress (sync)")
end
item = enqueue_query(key, qname, r_opts, try_list)

local supported_semaphore_wait_phases = {
rewrite = true,
access = true,
content = true,
timer = true,
ssl_cert = true,
ssl_session_fetch = true,
}
item.result, item.err = individualQuery(qname, item.r_opts, try_list)

local ngx_phase = get_phase()
dequeue_query(item)

if not supported_semaphore_wait_phases[ngx_phase] then
-- phase not supported by `semaphore:wait`
-- return existing query (item)
--
-- this will avoid:
-- "dns lookup pool exceeded retries" (second try and subsequent retries)
-- "API disabled in the context of init_worker_by_lua" (first try)
return item, nil, try_list
return item.result, item.err, try_list
end

-- If the query is already in progress, we wait for it.

add_status_to_try_list(try_list, "in progress (sync)")

-- block and wait for the async query to complete
local ok, err = item.semaphore:wait(resolve_max_wait)
if ok and item.result then
Expand All @@ -872,6 +868,14 @@ local function syncQuery(qname, r_opts, try_list)
return item.result, item.err, try_list
end

-- bug checks
if not ok and not item.err then
item.err = err -- only first expired wait() reports error
log(ALERT, PREFIX, "semaphore:wait(", resolve_max_wait, ") failed: ", err,
", count: ", item.semaphore and item.semaphore:count(),
", qname: ", qname)
end

err = err or item.err or "unknown"
add_status_to_try_list(try_list, "error: "..err)

Expand Down
22 changes: 16 additions & 6 deletions spec/01-unit/21-dns-client/02-client_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,10 @@ describe("[DNS client]", function()
}
}))
query_func = function(self, original_query_func, name, options)
ngx.sleep(5)
-- The first request uses syncQuery and does not wait on the
-- asyncQuery timer, so the low-level r:query() cannot sleep for 5s;
-- it can only sleep for `timeout`.
ngx.sleep(math.min(timeout, 5))
return nil
end
local start_time = ngx.now()
Expand Down Expand Up @@ -1742,9 +1745,12 @@ describe("[DNS client]", function()
end)

it("timeout while waiting", function()

local timeout = 500
local ip = "1.4.2.3"
-- basically the local function _synchronized_query
assert(client.init({
timeout = 500,
timeout = timeout,
retrans = 1,
resolvConf = {
-- resolv.conf without `search` and `domain` options
Expand All @@ -1755,7 +1761,7 @@ describe("[DNS client]", function()
-- insert a stub that waits and returns a fixed record
local name = TEST_DOMAIN
query_func = function()
local ip = "1.4.2.3"
local ip = ip
local entry = {
{
type = client.TYPE_A,
Expand All @@ -1767,7 +1773,9 @@ describe("[DNS client]", function()
touch = 0,
expire = gettime() + 10,
}
sleep(0.5) -- wait before we return the results
-- wait before we return the results
-- `+ 2` s ensures that the semaphore:wait() expires
sleep(timeout/1000 + 2)
return entry
end

Expand Down Expand Up @@ -1797,10 +1805,12 @@ describe("[DNS client]", function()
ngx.thread.wait(coros[i]) -- this wait will resume the scheduled ones
end

-- all results are equal, as they all will wait for the first response
for i = 1, 10 do
-- results[1~9] are equal, as they all will wait for the first response
for i = 1, 9 do
assert.equal("timeout", results[i])
end
-- results[10] comes from synchronous DNS access of the first request
assert.equal(ip, results[10][1]["address"])
end)
end)

Expand Down
7 changes: 3 additions & 4 deletions t/03-dns-client/01-phases.t
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use Test::Nginx::Socket;

plan tests => repeat_each() * (blocks() * 5);
plan tests => repeat_each() * (blocks() * 4 + 1);

workers(6);

Expand Down Expand Up @@ -59,8 +59,7 @@ qq {
GET /t
--- response_body
answers: nil
err: dns client error: 101 empty record received
--- no_error_log
err: nil
--- error_log
[error]
dns lookup pool exceeded retries
API disabled in the context of init_worker_by_lua
Loading

0 comments on commit 36ce119

Please sign in to comment.