Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(plugins/http-log): improve concurrency when max_batch_size is set to 1 #13384

Closed
wants to merge 1 commit into the base branch from the author's branch
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/unreleased/kong/http-log-concurrent-optimize.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
message: |
**HTTP-Log**: When `queue.max_batch_size` is 1, log entries are now sent in separate, parallel HTTP requests. Previously, they were sent sequentially in FIFO order.
type: bugfix
scope: Plugin
18 changes: 17 additions & 1 deletion kong/plugins/http-log/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local tonumber = tonumber
local fmt = string.format
local pairs = pairs
local max = math.max
local timer_at = ngx.timer.at


local sandbox_opts = { env = { kong = kong, ngx = ngx } }
Expand Down Expand Up @@ -189,12 +190,27 @@ function HttpLogHandler:log(conf)

local queue_conf = Queue.get_plugin_params("http-log", conf, make_queue_name(conf))
kong.log.debug("Queue name automatically configured based on configuration parameters to: ", queue_conf.name)
local entry = cjson.encode(kong.log.serialize())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pay attention to @chronolaw's PR: #13421.

We now use `clustering_utils.json_decode` or `clustering_utils.json_encode` instead.


if queue_conf.max_batch_size == 1 then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a hidden feature; we should update the official documentation at https://docs.konghq.com/gateway/latest/kong-plugins/queue to describe it.

local queue = Queue.create(queue_conf, send_entries, conf)
local ok, err = timer_at(0, function(premature)
if premature then
return
end
queue:handle({ entry })
end)
if not ok then
kong.log.err("failed to create timer: ", err)
end
return
end

local ok, err = Queue.enqueue(
queue_conf,
send_entries,
conf,
cjson.encode(kong.log.serialize())
entry
)
if not ok then
kong.log.err("Failed to enqueue log entry to log server: ", err)
Expand Down
206 changes: 113 additions & 93 deletions kong/tools/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ end
-- @param opts table, requires `name`, optionally includes `retry_count`, `max_coalescing_delay` and `max_batch_size`
-- @return table: a Queue object.
local function get_or_create_queue(queue_conf, handler, handler_conf)
assert(type(queue_conf) == "table",
"arg #1 (queue_conf) must be a table")
assert(type(handler) == "function",
"arg #2 (handler) must be a function")
assert(handler_conf == nil or type(handler_conf) == "table",
"arg #3 (handler_conf) must be a table or nil")
assert(type(queue_conf.name) == "string",
"arg #1 (queue_conf) must include a name")

local name = assert(queue_conf.name)
local key = _make_queue_key(name)
Expand All @@ -215,7 +223,70 @@ local function get_or_create_queue(queue_conf, handler, handler_conf)
return queue
end

queue = {
queue = Queue.create(queue_conf, handler, handler_conf)

kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:log_debug("processing queue")
q:process_once()
end
q:log_debug("done processing queue")
queues[key] = nil
end, queue)

queues[key] = queue

queue:log_debug("queue created")

return queue
end

function Queue.create(queue_conf, handler, handler_conf)
assert(type(queue_conf) == "table",
"arg #1 (queue_conf) must be a table")
assert(type(handler) == "function",
"arg #2 (handler) must be a function")
assert(handler_conf == nil or type(handler_conf) == "table",
"arg #3 (handler_conf) must be a table or nil")
assert(type(queue_conf.name) == "string",
"arg #1 (queue_conf) must include a name")


assert(
type(queue_conf.max_batch_size) == "number",
"arg #1 (queue_conf) max_batch_size must be a number"
)
assert(
type(queue_conf.max_coalescing_delay) == "number",
"arg #1 (queue_conf) max_coalescing_delay must be a number"
)
assert(
type(queue_conf.max_entries) == "number",
"arg #1 (queue_conf) max_entries must be a number"
)
assert(
type(queue_conf.max_retry_time) == "number",
"arg #1 (queue_conf) max_retry_time must be a number"
)
assert(
type(queue_conf.initial_retry_delay) == "number",
"arg #1 (queue_conf) initial_retry_delay must be a number"
)
assert(
type(queue_conf.max_retry_delay) == "number",
"arg #1 (queue_conf) max_retry_delay must be a number"
)

local max_bytes_type = type(queue_conf.max_bytes)
assert(
max_bytes_type == "nil" or max_bytes_type == "number",
"arg #1 (queue_conf) max_bytes must be a number or nil"
)

local name = assert(queue_conf.name)
local key = _make_queue_key(name)

local queue = {
-- Queue parameters from the enqueue call
name = name,
key = key,
Expand All @@ -238,22 +309,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf)
queue[option] = value
end

queue = setmetatable(queue, Queue_mt)

kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:log_debug("processing queue")
q:process_once()
end
q:log_debug("done processing queue")
queues[key] = nil
end, queue)

queues[key] = queue

queue:log_debug("queue created")

return queue
return setmetatable(queue, Queue_mt)
end


Expand Down Expand Up @@ -338,6 +394,45 @@ function Queue:drop_oldest_entry()
self:delete_frontmost_entry()
end

--- Deliver a batch of entries to the handler, retrying on failure.
-- Retries use exponential backoff (initial_retry_delay doubled per attempt,
-- capped at max_retry_delay) until the handler succeeds or max_retry_time
-- has elapsed, at which point the entries are dropped with an error log.
-- @param entries array of queue entries to pass to the handler
function Queue:handle(entries)
  local n_entries = #entries

  local first_attempt_at = now()
  local attempt = 0

  repeat
    self:log_debug("passing %d entries to handler", n_entries)
    -- pcall shields the queue from errors raised inside the handler;
    -- call_ok reports the pcall itself, handler_ok/handler_err the handler's
    -- own (ok, err) return convention.
    local call_ok, handler_ok, handler_err = pcall(self.handler, self.handler_conf, entries)

    if call_ok and handler_ok == true then
      self:log_debug("handler processed %d entries successfully", n_entries)
      return
    end

    if not call_ok then
      -- protected call failed, the second pcall result is the error message
      handler_err = handler_ok
    end

    self:log_warn("handler could not process entries: %s", tostring(handler_err or "no error details returned by handler"))

    if not handler_err then
      self:log_err("handler returned falsy value but no error information")
    end

    if (now() - first_attempt_at) > self.max_retry_time then
      self:log_err(
        "could not send entries due to max_retry_time exceeded. %d queue entries were lost",
        n_entries)
      return
    end

    -- Exponential backoff before the next attempt: initial_retry_delay times
    -- 2^attempt, never exceeding max_retry_delay.
    sleep(math_min(self.max_retry_delay, 2 ^ attempt * self.initial_retry_delay))
    attempt = attempt + 1
  until false
end


-- Process one batch of entries from the queue. Returns truthy if entries were processed, falsy if there was an
-- error or no items were on the queue to be processed.
Expand Down Expand Up @@ -387,41 +482,7 @@ function Queue:process_once()
self.already_dropped_entries = false
end

local start_time = now()
local retry_count = 0
while true do
self:log_debug("passing %d entries to handler", entry_count)
local status
status, ok, err = pcall(self.handler, self.handler_conf, batch)
if status and ok == true then
self:log_debug("handler processed %d entries successfully", entry_count)
break
end

if not status then
-- protected call failed, ok is the error message
err = ok
end

self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler"))

if not err then
self:log_err("handler returned falsy value but no error information")
end

if (now() - start_time) > self.max_retry_time then
self:log_err(
"could not send entries, giving up after %d retries. %d queue entries were lost",
retry_count, entry_count)
break
end

-- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with
-- 2 to the power of the number of retries, creating an exponential increase over the course of each retry.
-- The maximum time between retries is capped by the max_retry_delay configuration parameter.
sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay))
retry_count = retry_count + 1
end
self:handle(batch)
end


Expand Down Expand Up @@ -574,47 +635,6 @@ end


function Queue.enqueue(queue_conf, handler, handler_conf, value)

assert(type(queue_conf) == "table",
"arg #1 (queue_conf) must be a table")
assert(type(handler) == "function",
"arg #2 (handler) must be a function")
assert(handler_conf == nil or type(handler_conf) == "table",
"arg #3 (handler_conf) must be a table or nil")
assert(type(queue_conf.name) == "string",
"arg #1 (queue_conf) must include a name")

assert(
type(queue_conf.max_batch_size) == "number",
"arg #1 (queue_conf) max_batch_size must be a number"
)
assert(
type(queue_conf.max_coalescing_delay) == "number",
"arg #1 (queue_conf) max_coalescing_delay must be a number"
)
assert(
type(queue_conf.max_entries) == "number",
"arg #1 (queue_conf) max_entries must be a number"
)
assert(
type(queue_conf.max_retry_time) == "number",
"arg #1 (queue_conf) max_retry_time must be a number"
)
assert(
type(queue_conf.initial_retry_delay) == "number",
"arg #1 (queue_conf) initial_retry_delay must be a number"
)
assert(
type(queue_conf.max_retry_delay) == "number",
"arg #1 (queue_conf) max_retry_delay must be a number"
)

local max_bytes_type = type(queue_conf.max_bytes)
assert(
max_bytes_type == "nil" or max_bytes_type == "number",
"arg #1 (queue_conf) max_bytes must be a number or nil"
)

local queue = get_or_create_queue(queue_conf, handler, handler_conf)
return enqueue(queue, value)
end
Expand Down
2 changes: 1 addition & 1 deletion spec/01-unit/27-queue_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ describe("plugin queue", function()
assert.equal("One", processed[1])
assert.equal("Three", processed[2])
assert.match_re(log_messages, 'WARN \\[\\] queue continue-processing: handler could not process entries: .*: hard error')
assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries, giving up after \\d retries. 1 queue entries were lost')
assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries due to max_retry_time exceeded. \\d queue entries were lost')
end)

it("sanity check for function Queue.is_full() & Queue.can_enqueue()", function()
Expand Down
19 changes: 18 additions & 1 deletion spec/03-plugins/03-http-log/01-log_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ for _, strategy in helpers.each_strategy() do
http_endpoint = "http://" .. helpers.mock_upstream_host
.. ":"
.. helpers.mock_upstream_port
.. "/post_log/http_tag"
.. "/post_log/http_tag",
queue = {
max_batch_size = 2,
}
}
}

Expand Down Expand Up @@ -638,6 +641,20 @@ for _, strategy in helpers.each_strategy() do

admin_client:close()
end)

-- Verifies the max_batch_size == 1 fast path: the entry must reach the log
-- server without going through the regular queue-processing loop.
it("should not use queue when max_batch_size is 1", function()
  reset_log("http")

  local response = proxy_client:get("/status/200", {
    headers = {
      ["Host"] = "http_logging.test"
    }
  })
  assert.res_status(200, response)

  local logged_entries = get_log("http", 1)
  assert.same("127.0.0.1", logged_entries[1].client_ip)

  -- the queue loop logs "processing queue"; its absence proves the
  -- single-entry path bypassed the queue entirely
  assert.logfile().has.no.line("processing queue", true) -- should not use queue
end)
end)

-- test both with a single worker for a deterministic test,
Expand Down
Loading