Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
vm-001 committed Aug 12, 2024
1 parent 744c5e6 commit 81cf796
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 17 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/kong/feat-queue-concurrency-limit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
message: |
Added a new configuration `concurrency_limit`(integer, default to 1) for Queue to specify the number of delivery timers.
Note that setting concurrency_limit to -1 means no limit, and each HTTP Log will create a individual timer to send.
type: feature
scope: Core
3 changes: 0 additions & 3 deletions changelog/unreleased/kong/http-log-concurrent-optimize.yml

This file was deleted.

10 changes: 5 additions & 5 deletions kong/clustering/compat/removed_fields.lua
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ return {
opentelemetry = {
"traces_endpoint",
"logs_endpoint",
"queue.concurrency",
"queue.concurrency_limit",
},
ai_proxy = {
"max_request_body_size",
Expand Down Expand Up @@ -212,16 +212,16 @@ return {
"always_use_authenticated_groups",
},
http_log = {
"queue.concurrency",
"queue.concurrency_limit",
},
statsd = {
"queue.concurrency",
"queue.concurrency_limit",
},
datadog = {
"queue.concurrency",
"queue.concurrency_limit",
},
zipkin = {
"queue.concurrency",
"queue.concurrency_limit",
},
},
}
10 changes: 5 additions & 5 deletions kong/tools/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf)

queue = setmetatable(queue, Queue_mt)

if queue.concurrency == 1 then
if queue.concurrency_limit == 1 then
kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:log_debug("processing queue")
Expand Down Expand Up @@ -514,8 +514,8 @@ local function enqueue(self, entry)
return nil, "entry must be a non-nil Lua value"
end


if self.concurrency == 0 then
if self.concurrency_limit == -1 then -- unlimited concurrency
-- do not enqueue when concurrency_limit is unlimited
local ok, err = timer_at(0, function(premature)
if premature then
return
Expand Down Expand Up @@ -639,8 +639,8 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value)
)

assert(
type(queue_conf.concurrency) == "number",
"arg #1 (queue_conf) concurrency must be a number"
type(queue_conf.concurrency_limit) == "number",
"arg #1 (queue_conf) concurrency_limit must be a number"
)

local queue = get_or_create_queue(queue_conf, handler, handler_conf)
Expand Down
7 changes: 3 additions & 4 deletions kong/tools/queue_schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@ return Schema.define {
between = { 0.001, 1000000 }, -- effectively unlimited maximum
description = "Maximum time in seconds between retries, caps exponential backoff."
} },
{ concurrency = {
{ concurrency_limit = {
type = "integer",
default = 1,
between = { 0, 1 },
required = true,
description = "TBD11111111"
one_of = { -1, 1 },
description = "The number of of queue delivery timers. -1 indicates unlimited."
} },

}
Expand Down
3 changes: 3 additions & 0 deletions spec/01-unit/27-queue_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

local function enqueue(queue_conf, entry)
Expand Down Expand Up @@ -836,6 +837,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

-- should be true if the queue does not exist
Expand All @@ -861,6 +863,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

-- should be true if the queue does not exist
Expand Down
4 changes: 4 additions & 0 deletions spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_otel_prior_35.config.traces_endpoint = nil
expected_otel_prior_35.config.logs_endpoint = nil
expected_otel_prior_35.config.endpoint = "http://1.1.1.1:12345/v1/trace"
expected_otel_prior_35.config.queue.concurrency_limit = nil

do_assert(uuid(), "3.4.0", expected_otel_prior_35)

Expand All @@ -281,6 +282,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_otel_prior_34.config.traces_endpoint = nil
expected_otel_prior_34.config.logs_endpoint = nil
expected_otel_prior_34.config.endpoint = "http://1.1.1.1:12345/v1/trace"
expected_otel_prior_34.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.3.0", expected_otel_prior_34)

-- cleanup
Expand All @@ -307,6 +309,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_zipkin_prior_35.config.header_type = "preserve"
expected_zipkin_prior_35.config.default_header_type = "b3"
expected_zipkin_prior_35.config.propagation = nil
expected_zipkin_prior_35.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.4.0", expected_zipkin_prior_35)

-- cleanup
Expand All @@ -328,6 +331,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_zipkin_prior_34.config.header_type = "preserve"
expected_zipkin_prior_34.config.default_header_type = "b3"
expected_zipkin_prior_34.config.propagation = nil
expected_zipkin_prior_34.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.3.0", expected_zipkin_prior_34)

-- cleanup
Expand Down

0 comments on commit 81cf796

Please sign in to comment.