Skip to content

Commit

Permalink
feat(queue): added a new configuration concurrency_limit(integer, d…
Browse files Browse the repository at this point in the history
…efault to 1) for Queue to specify the number of delivery timers (#13332)

Added a new configuration `concurrency_limit` (integer, defaults to 1) for Queue to specify the number of delivery timers.

Note that setting concurrency_limit to -1 means no limit, and each HTTP log entry will create an individual timer for sending.

FTI-6022
  • Loading branch information
vm-001 authored Aug 19, 2024
1 parent 88cc074 commit c5566bb
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 46 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/kong/feat-queue-concurrency-limit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
message: |
Added a new configuration `concurrency_limit` (integer, defaults to 1) for Queue to specify the number of delivery timers.
Note that setting `concurrency_limit` to `-1` means no limit at all, and each HTTP log entry would create an individual timer for sending.
type: feature
scope: Core
13 changes: 13 additions & 0 deletions kong/clustering/compat/removed_fields.lua
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ return {
opentelemetry = {
"traces_endpoint",
"logs_endpoint",
"queue.concurrency_limit",
},
ai_proxy = {
"max_request_body_size",
Expand Down Expand Up @@ -212,5 +213,17 @@ return {
acl = {
"always_use_authenticated_groups",
},
http_log = {
"queue.concurrency_limit",
},
statsd = {
"queue.concurrency_limit",
},
datadog = {
"queue.concurrency_limit",
},
zipkin = {
"queue.concurrency_limit",
},
},
}
115 changes: 71 additions & 44 deletions kong/tools/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -240,16 +240,18 @@ local function get_or_create_queue(queue_conf, handler, handler_conf)

queue = setmetatable(queue, Queue_mt)

kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:log_debug("processing queue")
q:process_once()
end
q:log_debug("done processing queue")
queues[key] = nil
end, queue)
if queue.concurrency_limit == 1 then
kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:log_debug("processing queue")
q:process_once()
end
q:log_debug("done processing queue")
queues[key] = nil
end, queue)
queues[key] = queue
end

queues[key] = queue

queue:log_debug("queue created")

Expand Down Expand Up @@ -314,6 +316,45 @@ function Queue.can_enqueue(queue_conf, entry)
return _can_enqueue(queue, entry)
end

-- Deliver a batch of entries to the configured handler, retrying with
-- exponential backoff (capped at max_retry_delay) until the handler reports
-- success or max_retry_time has elapsed.  Entries that are still unsent when
-- the retry budget runs out are lost and an error is logged.
local function handle(self, entries)
  local batch_size = #entries
  local deadline = now() + self.max_retry_time
  local attempt = 0

  repeat
    self:log_debug("passing %d entries to handler", batch_size)
    local call_ok, handler_ok, handler_err = pcall(self.handler, self.handler_conf, entries)
    if call_ok and handler_ok == true then
      self:log_debug("handler processed %d entries successfully", batch_size)
      return
    end

    local err = handler_err
    if not call_ok then
      -- the protected call itself raised; its second return value is the error
      err = handler_ok
    end

    self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler"))

    if not err then
      self:log_err("handler returned falsy value but no error information")
    end

    if now() > deadline then
      self:log_err(
        "could not send entries due to max_retry_time exceeded. %d queue entries were lost",
        batch_size)
      return
    end

    -- Exponential backoff between attempts: initial_retry_delay is doubled on
    -- every retry, but the wait never exceeds max_retry_delay.
    sleep(math_min(self.max_retry_delay, 2 ^ attempt * self.initial_retry_delay))
    attempt = attempt + 1
  until false
end


-- Delete the frontmost entry from the queue and adjust the current utilization variables.
function Queue:delete_frontmost_entry()
Expand Down Expand Up @@ -387,41 +428,7 @@ function Queue:process_once()
self.already_dropped_entries = false
end

local start_time = now()
local retry_count = 0
while true do
self:log_debug("passing %d entries to handler", entry_count)
local status
status, ok, err = pcall(self.handler, self.handler_conf, batch)
if status and ok == true then
self:log_debug("handler processed %d entries successfully", entry_count)
break
end

if not status then
-- protected call failed, ok is the error message
err = ok
end

self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler"))

if not err then
self:log_err("handler returned falsy value but no error information")
end

if (now() - start_time) > self.max_retry_time then
self:log_err(
"could not send entries, giving up after %d retries. %d queue entries were lost",
retry_count, entry_count)
break
end

-- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with
-- 2 to the power of the number of retries, creating an exponential increase over the course of each retry.
-- The maximum time between retries is capped by the max_retry_delay configuration parameter.
sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay))
retry_count = retry_count + 1
end
handle(self, batch)
end


Expand Down Expand Up @@ -506,6 +513,21 @@ local function enqueue(self, entry)
return nil, "entry must be a non-nil Lua value"
end

if self.concurrency_limit == -1 then -- unlimited concurrency
-- do not enqueue when concurrency_limit is unlimited
local ok, err = kong.timer:at(0, function(premature)
if premature then
return
end
handle(self, { entry })
end)
if not ok then
return nil, "failed to create timer: " .. err
end
return true
end


if self:count() >= self.max_entries * CAPACITY_WARNING_THRESHOLD then
if not self.warned then
self:log_warn('queue at %s%% capacity', CAPACITY_WARNING_THRESHOLD * 100)
Expand Down Expand Up @@ -615,6 +637,11 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value)
"arg #1 (queue_conf) max_bytes must be a number or nil"
)

assert(
type(queue_conf.concurrency_limit) == "number",
"arg #1 (queue_conf) concurrency_limit must be a number"
)

local queue = get_or_create_queue(queue_conf, handler, handler_conf)
return enqueue(queue, value)
end
Expand Down
7 changes: 7 additions & 0 deletions kong/tools/queue_schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,12 @@ return Schema.define {
between = { 0.001, 1000000 }, -- effectively unlimited maximum
description = "Maximum time in seconds between retries, caps exponential backoff."
} },
{ concurrency_limit = {
type = "integer",
default = 1,
one_of = { -1, 1 },
description = "The number of queue delivery timers. -1 indicates unlimited."
} },

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ describe("declarative config: process_auto_fields", function()
max_coalescing_delay = 1,
max_retry_delay = 60,
max_retry_time = 60,
concurrency_limit = 1,
},
}
},
Expand Down Expand Up @@ -236,6 +237,7 @@ describe("declarative config: process_auto_fields", function()
max_coalescing_delay = 1,
max_retry_delay = 60,
max_retry_time = 60,
concurrency_limit = 1,
},
}
},
Expand Down Expand Up @@ -353,6 +355,7 @@ describe("declarative config: process_auto_fields", function()
max_coalescing_delay = 1,
max_retry_delay = 60,
max_retry_time = 60,
concurrency_limit = 1,
},
}
},
Expand Down Expand Up @@ -674,6 +677,7 @@ describe("declarative config: process_auto_fields", function()
max_coalescing_delay = 1,
max_retry_delay = 60,
max_retry_time = 60,
concurrency_limit = 1,
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ describe("declarative config: flatten", function()
max_retry_delay = 60,
max_retry_time = 60,
max_bytes = null,
concurrency_limit = 1,
},
}
},
Expand Down Expand Up @@ -409,6 +410,7 @@ describe("declarative config: flatten", function()
max_retry_delay = 60,
max_retry_time = 60,
max_bytes = null,
concurrency_limit = 1,
},
},
consumer = {
Expand Down Expand Up @@ -611,7 +613,8 @@ describe("declarative config: flatten", function()
max_retry_delay = 60,
max_retry_time = 60,
max_bytes = null,
}
concurrency_limit = 1,
},
},
consumer = null,
created_at = 1234567890,
Expand Down Expand Up @@ -1128,6 +1131,7 @@ describe("declarative config: flatten", function()
max_retry_delay = 60,
max_retry_time = 60,
max_bytes = null,
concurrency_limit = 1,
},
},
consumer = null,
Expand Down
5 changes: 4 additions & 1 deletion spec/01-unit/27-queue_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ describe("plugin queue", function()
assert.equal("One", processed[1])
assert.equal("Three", processed[2])
assert.match_re(log_messages, 'WARN \\[\\] queue continue-processing: handler could not process entries: .*: hard error')
assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries, giving up after \\d retries. 1 queue entries were lost')
assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries due to max_retry_time exceeded. \\d queue entries were lost')
end)

it("sanity check for function Queue.is_full() & Queue.can_enqueue()", function()
Expand All @@ -799,6 +799,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

local function enqueue(queue_conf, entry)
Expand Down Expand Up @@ -836,6 +837,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

-- should be true if the queue does not exist
Expand All @@ -861,6 +863,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

-- should be true if the queue does not exist
Expand Down
4 changes: 4 additions & 0 deletions spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_otel_prior_35.config.traces_endpoint = nil
expected_otel_prior_35.config.logs_endpoint = nil
expected_otel_prior_35.config.endpoint = "http://1.1.1.1:12345/v1/trace"
expected_otel_prior_35.config.queue.concurrency_limit = nil

do_assert(uuid(), "3.4.0", expected_otel_prior_35)

Expand All @@ -281,6 +282,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_otel_prior_34.config.traces_endpoint = nil
expected_otel_prior_34.config.logs_endpoint = nil
expected_otel_prior_34.config.endpoint = "http://1.1.1.1:12345/v1/trace"
expected_otel_prior_34.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.3.0", expected_otel_prior_34)

-- cleanup
Expand All @@ -307,6 +309,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_zipkin_prior_35.config.header_type = "preserve"
expected_zipkin_prior_35.config.default_header_type = "b3"
expected_zipkin_prior_35.config.propagation = nil
expected_zipkin_prior_35.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.4.0", expected_zipkin_prior_35)

-- cleanup
Expand All @@ -328,6 +331,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_zipkin_prior_34.config.header_type = "preserve"
expected_zipkin_prior_34.config.default_header_type = "b3"
expected_zipkin_prior_34.config.propagation = nil
expected_zipkin_prior_34.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.3.0", expected_zipkin_prior_34)

-- cleanup
Expand Down
33 changes: 33 additions & 0 deletions spec/03-plugins/03-http-log/01-log_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,25 @@ for _, strategy in helpers.each_strategy() do
}
}

local route1_4 = bp.routes:insert {
hosts = { "no_queue.test" },
service = service1
}

bp.plugins:insert {
route = { id = route1_4.id },
name = "http-log",
config = {
http_endpoint = "http://" .. helpers.mock_upstream_host
.. ":"
.. helpers.mock_upstream_port
.. "/post_log/http",
queue = {
concurrency_limit = -1,
},
}
}

helpers.setenv(vault_env_name, vault_env_value)

assert(helpers.start_kong({
Expand Down Expand Up @@ -638,6 +657,20 @@ for _, strategy in helpers.each_strategy() do

admin_client:close()
end)

it("should not use queue when queue.concurrency_limit is -1", function()
reset_log("http")
local res = proxy_client:get("/status/200", {
headers = {
["Host"] = "no_queue.test"
}
})
assert.res_status(200, res)

local entries = get_log("http", 1)
assert.same("127.0.0.1", entries[1].client_ip)
assert.logfile().has.no.line("processing queue", true) -- should not use queue
end)
end)

-- test both with a single worker for a deterministic test,
Expand Down

1 comment on commit c5566bb

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:c5566bb21c4459654e1d7d145518b63984329c22
Artifacts available https://github.com/Kong/kong/actions/runs/10446631733

Please sign in to comment.