From 81cf796f2ab2af13a4efc267a56eef23429e3fed Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Mon, 12 Aug 2024 15:23:49 +0800 Subject: [PATCH] update --- .../unreleased/kong/feat-queue-concurrency-limit.yml | 5 +++++ .../unreleased/kong/http-log-concurrent-optimize.yml | 3 --- kong/clustering/compat/removed_fields.lua | 10 +++++----- kong/tools/queue.lua | 10 +++++----- kong/tools/queue_schema.lua | 7 +++---- spec/01-unit/27-queue_spec.lua | 3 +++ .../09-hybrid_mode/09-config-compat_spec.lua | 4 ++++ 7 files changed, 25 insertions(+), 17 deletions(-) create mode 100644 changelog/unreleased/kong/feat-queue-concurrency-limit.yml delete mode 100644 changelog/unreleased/kong/http-log-concurrent-optimize.yml diff --git a/changelog/unreleased/kong/feat-queue-concurrency-limit.yml b/changelog/unreleased/kong/feat-queue-concurrency-limit.yml new file mode 100644 index 000000000000..7dbfc5e9b9ff --- /dev/null +++ b/changelog/unreleased/kong/feat-queue-concurrency-limit.yml @@ -0,0 +1,5 @@ +message: | + Added a new configuration `concurrency_limit`(integer, default to 1) for Queue to specify the number of delivery timers. + Note that setting concurrency_limit to -1 means no limit, and each HTTP Log will create a individual timer to send. +type: feature +scope: Core diff --git a/changelog/unreleased/kong/http-log-concurrent-optimize.yml b/changelog/unreleased/kong/http-log-concurrent-optimize.yml deleted file mode 100644 index 71e5de84e64f..000000000000 --- a/changelog/unreleased/kong/http-log-concurrent-optimize.yml +++ /dev/null @@ -1,3 +0,0 @@ -message: "**HTTP-Log**: Added a new configuration `queue.concurrency`" -type: feature -scope: Plugin diff --git a/kong/clustering/compat/removed_fields.lua b/kong/clustering/compat/removed_fields.lua index 94bdc42de5e9..94a33aac1526 100644 --- a/kong/clustering/compat/removed_fields.lua +++ b/kong/clustering/compat/removed_fields.lua @@ -166,7 +166,7 @@ return { opentelemetry = { "traces_endpoint", "logs_endpoint", - "queue.concurrency", + "queue.concurrency_limit", }, ai_proxy = { "max_request_body_size", @@ -212,16 +212,16 @@ return { "always_use_authenticated_groups", }, http_log = { - "queue.concurrency", + "queue.concurrency_limit", }, statsd = { - "queue.concurrency", + "queue.concurrency_limit", }, datadog = { - "queue.concurrency", + "queue.concurrency_limit", }, zipkin = { - "queue.concurrency", + "queue.concurrency_limit", }, }, } diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index 5a17e056649c..e96244a522e8 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -241,7 +241,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) queue = setmetatable(queue, Queue_mt) - if queue.concurrency == 1 then + if queue.concurrency_limit == 1 then kong.timer:named_at("queue " .. key, 0, function(_, q) while q:count() > 0 do q:log_debug("processing queue") @@ -514,8 +514,8 @@ local function enqueue(self, entry) return nil, "entry must be a non-nil Lua value" end - - if self.concurrency == 0 then + if self.concurrency_limit == -1 then -- unlimited concurrency + -- do not enqueue when concurrency_limit is unlimited local ok, err = timer_at(0, function(premature) if premature then return @@ -639,8 +639,8 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value) ) assert( - type(queue_conf.concurrency) == "number", - "arg #1 (queue_conf) concurrency must be a number" + type(queue_conf.concurrency_limit) == "number", + "arg #1 (queue_conf) concurrency_limit must be a number" ) local queue = get_or_create_queue(queue_conf, handler, handler_conf) diff --git a/kong/tools/queue_schema.lua b/kong/tools/queue_schema.lua index 01720297f07a..51d73981bd8b 100644 --- a/kong/tools/queue_schema.lua +++ b/kong/tools/queue_schema.lua @@ -49,12 +49,11 @@ return Schema.define { between = { 0.001, 1000000 }, -- effectively unlimited maximum description = "Maximum time in seconds between retries, caps exponential backoff." } }, - { concurrency = { + { concurrency_limit = { type = "integer", default = 1, - between = { 0, 1 }, - required = true, - description = "TBD11111111" + one_of = { -1, 1 }, + description = "The number of of queue delivery timers. -1 indicates unlimited." } }, } diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua index 5f94c9d73094..5d9eeeea7e7f 100644 --- a/spec/01-unit/27-queue_spec.lua +++ b/spec/01-unit/27-queue_spec.lua @@ -799,6 +799,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } local function enqueue(queue_conf, entry) @@ -836,6 +837,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } -- should be true if the queue does not exist @@ -861,6 +863,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } -- should be true if the queue does not exist diff --git a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua index 9eecc8ec7a45..15ae94852017 100644 --- a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua @@ -256,6 +256,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_otel_prior_35.config.traces_endpoint = nil expected_otel_prior_35.config.logs_endpoint = nil expected_otel_prior_35.config.endpoint = "http://1.1.1.1:12345/v1/trace" + expected_otel_prior_35.config.queue.concurrency_limit = nil do_assert(uuid(), "3.4.0", expected_otel_prior_35) @@ -281,6 +282,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_otel_prior_34.config.traces_endpoint = nil expected_otel_prior_34.config.logs_endpoint = nil expected_otel_prior_34.config.endpoint = "http://1.1.1.1:12345/v1/trace" + expected_otel_prior_34.config.queue.concurrency_limit = nil do_assert(uuid(), "3.3.0", expected_otel_prior_34) -- cleanup @@ -307,6 +309,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_zipkin_prior_35.config.header_type = "preserve" expected_zipkin_prior_35.config.default_header_type = "b3" expected_zipkin_prior_35.config.propagation = nil + expected_zipkin_prior_35.config.queue.concurrency_limit = nil do_assert(uuid(), "3.4.0", expected_zipkin_prior_35) -- cleanup @@ -328,6 +331,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_zipkin_prior_34.config.header_type = "preserve" expected_zipkin_prior_34.config.default_header_type = "b3" expected_zipkin_prior_34.config.propagation = nil + expected_zipkin_prior_34.config.queue.concurrency_limit = nil do_assert(uuid(), "3.3.0", expected_zipkin_prior_34) -- cleanup