From 24abfa55eb4cdde8f75c450c91b89ba179b79237 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Wed, 17 Jul 2024 14:58:17 +0800 Subject: [PATCH] fix(plugins/http-log): improve concurrency when max_batch_size is set to 1 --- .../kong/http-log-concurrent-optimize.yml | 4 + kong/plugins/http-log/handler.lua | 18 +- kong/tools/queue.lua | 206 ++++++++++-------- spec/01-unit/27-queue_spec.lua | 2 +- spec/03-plugins/03-http-log/01-log_spec.lua | 19 +- 5 files changed, 153 insertions(+), 96 deletions(-) create mode 100644 changelog/unreleased/kong/http-log-concurrent-optimize.yml diff --git a/changelog/unreleased/kong/http-log-concurrent-optimize.yml b/changelog/unreleased/kong/http-log-concurrent-optimize.yml new file mode 100644 index 000000000000..47fa5f52d3a7 --- /dev/null +++ b/changelog/unreleased/kong/http-log-concurrent-optimize.yml @@ -0,0 +1,4 @@ +message: | + **HTTP-Log**: When `queue.max_batch_size` is 1, log entries are now sent in separate, parallel HTTP requests. Previously, they were sent sequentially in FIFO order. +type: bugfix +scope: Plugin diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index fd1d0cd48eeb..382a0654335b 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -14,6 +14,7 @@ local tonumber = tonumber local fmt = string.format local pairs = pairs local max = math.max +local timer_at = ngx.timer.at local sandbox_opts = { env = { kong = kong, ngx = ngx } } @@ -189,12 +190,27 @@ function HttpLogHandler:log(conf) local queue_conf = Queue.get_plugin_params("http-log", conf, make_queue_name(conf)) kong.log.debug("Queue name automatically configured based on configuration parameters to: ", queue_conf.name) + local entry = cjson.encode(kong.log.serialize()) + + if queue_conf.max_batch_size == 1 then + local queue = Queue.create(queue_conf, send_entries, conf) + local ok, err = timer_at(0, function(premature) + if premature then + return + end + queue:handle({ entry }) + end) + if not ok then + kong.log.err("failed to create timer: ", err) + end + return + end local ok, err = Queue.enqueue( queue_conf, send_entries, conf, - cjson.encode(kong.log.serialize()) + entry ) if not ok then kong.log.err("Failed to enqueue log entry to log server: ", err) diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index dc6a8fb6a108..8f45ca24ee6b 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -203,6 +203,14 @@ end -- @param opts table, requires `name`, optionally includes `retry_count`, `max_coalescing_delay` and `max_batch_size` -- @return table: a Queue object. local function get_or_create_queue(queue_conf, handler, handler_conf) + assert(type(queue_conf) == "table", + "arg #1 (queue_conf) must be a table") + assert(type(handler) == "function", + "arg #2 (handler) must be a function") + assert(handler_conf == nil or type(handler_conf) == "table", + "arg #3 (handler_conf) must be a table or nil") + assert(type(queue_conf.name) == "string", + "arg #1 (queue_conf) must include a name") local name = assert(queue_conf.name) local key = _make_queue_key(name) @@ -215,7 +223,70 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) return queue end - queue = { + queue = Queue.create(queue_conf, handler, handler_conf) + + kong.timer:named_at("queue " .. key, 0, function(_, q) + while q:count() > 0 do + q:log_debug("processing queue") + q:process_once() + end + q:log_debug("done processing queue") + queues[key] = nil + end, queue) + + queues[key] = queue + + queue:log_debug("queue created") + + return queue +end + +function Queue.create(queue_conf, handler, handler_conf) + assert(type(queue_conf) == "table", + "arg #1 (queue_conf) must be a table") + assert(type(handler) == "function", + "arg #2 (handler) must be a function") + assert(handler_conf == nil or type(handler_conf) == "table", + "arg #3 (handler_conf) must be a table or nil") + assert(type(queue_conf.name) == "string", + "arg #1 (queue_conf) must include a name") + + + assert( + type(queue_conf.max_batch_size) == "number", + "arg #1 (queue_conf) max_batch_size must be a number" + ) + assert( + type(queue_conf.max_coalescing_delay) == "number", + "arg #1 (queue_conf) max_coalescing_delay must be a number" + ) + assert( + type(queue_conf.max_entries) == "number", + "arg #1 (queue_conf) max_entries must be a number" + ) + assert( + type(queue_conf.max_retry_time) == "number", + "arg #1 (queue_conf) max_retry_time must be a number" + ) + assert( + type(queue_conf.initial_retry_delay) == "number", + "arg #1 (queue_conf) initial_retry_delay must be a number" + ) + assert( + type(queue_conf.max_retry_delay) == "number", + "arg #1 (queue_conf) max_retry_delay must be a number" + ) + + local max_bytes_type = type(queue_conf.max_bytes) + assert( + max_bytes_type == "nil" or max_bytes_type == "number", + "arg #1 (queue_conf) max_bytes must be a number or nil" + ) + + local name = assert(queue_conf.name) + local key = _make_queue_key(name) + + local queue = { -- Queue parameters from the enqueue call name = name, key = key, @@ -238,22 +309,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) queue[option] = value end - queue = setmetatable(queue, Queue_mt) - - kong.timer:named_at("queue " .. key, 0, function(_, q) - while q:count() > 0 do - q:log_debug("processing queue") - q:process_once() - end - q:log_debug("done processing queue") - queues[key] = nil - end, queue) - - queues[key] = queue - - queue:log_debug("queue created") - - return queue + return setmetatable(queue, Queue_mt) end @@ -338,6 +394,45 @@ function Queue:drop_oldest_entry() self:delete_frontmost_entry() end +function Queue:handle(entries) + local entry_count = #entries + + local start_time = now() + local retry_count = 0 + while true do + self:log_debug("passing %d entries to handler", entry_count) + local status, ok, err = pcall(self.handler, self.handler_conf, entries) + if status and ok == true then + self:log_debug("handler processed %d entries successfully", entry_count) + break + end + + if not status then + -- protected call failed, ok is the error message + err = ok + end + + self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler")) + + if not err then + self:log_err("handler returned falsy value but no error information") + end + + if (now() - start_time) > self.max_retry_time then + self:log_err( + "could not send entries due to max_retry_time exceeded. %d queue entries were lost", + entry_count) + break + end + + -- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with + -- 2 to the power of the number of retries, creating an exponential increase over the course of each retry. + -- The maximum time between retries is capped by the max_retry_delay configuration parameter. + sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay)) + retry_count = retry_count + 1 + end +end + -- Process one batch of entries from the queue. Returns truthy if entries were processed, falsy if there was an -- error or no items were on the queue to be processed. @@ -387,41 +482,7 @@ function Queue:process_once() self.already_dropped_entries = false end - local start_time = now() - local retry_count = 0 - while true do - self:log_debug("passing %d entries to handler", entry_count) - local status - status, ok, err = pcall(self.handler, self.handler_conf, batch) - if status and ok == true then - self:log_debug("handler processed %d entries successfully", entry_count) - break - end - - if not status then - -- protected call failed, ok is the error message - err = ok - end - - self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler")) - - if not err then - self:log_err("handler returned falsy value but no error information") - end - - if (now() - start_time) > self.max_retry_time then - self:log_err( - "could not send entries, giving up after %d retries. %d queue entries were lost", - retry_count, entry_count) - break - end - - -- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with - -- 2 to the power of the number of retries, creating an exponential increase over the course of each retry. - -- The maximum time between retries is capped by the max_retry_delay configuration parameter. - sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay)) - retry_count = retry_count + 1 - end + self:handle(batch) end @@ -574,47 +635,6 @@ end function Queue.enqueue(queue_conf, handler, handler_conf, value) - - assert(type(queue_conf) == "table", - "arg #1 (queue_conf) must be a table") - assert(type(handler) == "function", - "arg #2 (handler) must be a function") - assert(handler_conf == nil or type(handler_conf) == "table", - "arg #3 (handler_conf) must be a table or nil") - assert(type(queue_conf.name) == "string", - "arg #1 (queue_conf) must include a name") - - assert( - type(queue_conf.max_batch_size) == "number", - "arg #1 (queue_conf) max_batch_size must be a number" - ) - assert( - type(queue_conf.max_coalescing_delay) == "number", - "arg #1 (queue_conf) max_coalescing_delay must be a number" - ) - assert( - type(queue_conf.max_entries) == "number", - "arg #1 (queue_conf) max_entries must be a number" - ) - assert( - type(queue_conf.max_retry_time) == "number", - "arg #1 (queue_conf) max_retry_time must be a number" - ) - assert( - type(queue_conf.initial_retry_delay) == "number", - "arg #1 (queue_conf) initial_retry_delay must be a number" - ) - assert( - type(queue_conf.max_retry_delay) == "number", - "arg #1 (queue_conf) max_retry_delay must be a number" - ) - - local max_bytes_type = type(queue_conf.max_bytes) - assert( - max_bytes_type == "nil" or max_bytes_type == "number", - "arg #1 (queue_conf) max_bytes must be a number or nil" - ) - local queue = get_or_create_queue(queue_conf, handler, handler_conf) return enqueue(queue, value) end diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua index 548be6b42bbe..5f94c9d73094 100644 --- a/spec/01-unit/27-queue_spec.lua +++ b/spec/01-unit/27-queue_spec.lua @@ -786,7 +786,7 @@ describe("plugin queue", function() assert.equal("One", processed[1]) assert.equal("Three", processed[2]) assert.match_re(log_messages, 'WARN \\[\\] queue continue-processing: handler could not process entries: .*: hard error') - assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries, giving up after \\d retries. 1 queue entries were lost') + assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries due to max_retry_time exceeded. \\d queue entries were lost') end) it("sanity check for function Queue.is_full() & Queue.can_enqueue()", function() diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index fb96cb03d38e..5f1a19ac4a75 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -92,7 +92,10 @@ for _, strategy in helpers.each_strategy() do http_endpoint = "http://" .. helpers.mock_upstream_host .. ":" .. helpers.mock_upstream_port - .. "/post_log/http_tag" + .. "/post_log/http_tag", + queue = { + max_batch_size = 2, + } } } @@ -638,6 +641,20 @@ for _, strategy in helpers.each_strategy() do admin_client:close() end) + + it("should not use queue when max_batch_size is 1", function() + reset_log("http") + local res = proxy_client:get("/status/200", { + headers = { + ["Host"] = "http_logging.test" + } + }) + assert.res_status(200, res) + + local entries = get_log("http", 1) + assert.same("127.0.0.1", entries[1].client_ip) + assert.logfile().has.no.line("processing queue", true) -- should not use queue + end) end) -- test both with a single worker for a deterministic test,