-
Notifications
You must be signed in to change notification settings - Fork 4.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(opentelemetry): increase default queue batch size
migration to update the wrongly set default queue batch size to 200 adapt test to run only for 3.x (cherry picked from commit b0940b2)
- Loading branch information
1 parent
2f9ee04
commit 01479b9
Showing
4 changed files
with
154 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
-- Helper module for 331_to_332 migration operations. | ||
-- | ||
-- Operations are versioned and specific to a migration so they remain | ||
-- fixed in time and are not modified for use in future migrations. | ||
-- | ||
-- If you want to reuse these operations in a future migration, | ||
-- copy the functions over to a new versioned module. | ||
|
||
|
||
local function render(template, keys) | ||
return (template:gsub("$%(([A-Z_]+)%)", keys)) | ||
end | ||
|
||
|
||
-------------------------------------------------------------------------------- | ||
-- Postgres operations for Workspace migration | ||
-------------------------------------------------------------------------------- | ||
|
||
|
||
local postgres = { | ||
|
||
up = {}, | ||
|
||
teardown = { | ||
|
||
------------------------------------------------------------------------------ | ||
-- General function to fixup a plugin configuration | ||
fixup_plugin_config = function(_, connector, plugin_name, fixup_fn) | ||
local pgmoon_json = require("pgmoon.json") | ||
local select_plugin = render( | ||
"SELECT id, name, config FROM plugins WHERE name = '$(NAME)'", { | ||
NAME = plugin_name | ||
}) | ||
|
||
local plugins, err = connector:query(select_plugin) | ||
if not plugins then | ||
return nil, err | ||
end | ||
|
||
for _, plugin in ipairs(plugins) do | ||
local fix = fixup_fn(plugin.config) | ||
if fix then | ||
local sql = render( | ||
"UPDATE plugins SET config = $(NEW_CONFIG)::jsonb WHERE id = '$(ID)'", { | ||
NEW_CONFIG = pgmoon_json.encode_json(plugin.config), | ||
ID = plugin.id, | ||
}) | ||
|
||
local _, err = connector:query(sql) | ||
if err then | ||
return nil, err | ||
end | ||
end | ||
end | ||
|
||
return true | ||
end, | ||
}, | ||
|
||
} | ||
|
||
-------------------------------------------------------------------------------- | ||
-- Cassandra operations for Workspace migration | ||
-------------------------------------------------------------------------------- | ||
|
||
|
||
local cassandra = { | ||
|
||
up = {}, | ||
|
||
teardown = { | ||
|
||
------------------------------------------------------------------------------ | ||
-- General function to fixup a plugin configuration | ||
fixup_plugin_config = function(_, connector, plugin_name, fixup_fn) | ||
local coordinator = assert(connector:get_stored_connection()) | ||
local cassandra = require("cassandra") | ||
local cjson = require("cjson") | ||
|
||
for rows, err in coordinator:iterate("SELECT id, name, config FROM plugins") do | ||
if err then | ||
return nil, err | ||
end | ||
|
||
for i = 1, #rows do | ||
local plugin = rows[i] | ||
if plugin.name == plugin_name then | ||
if type(plugin.config) ~= "string" then | ||
return nil, "plugin config is not a string" | ||
end | ||
local config = cjson.decode(plugin.config) | ||
local fix = fixup_fn(config) | ||
|
||
if fix then | ||
local _, err = coordinator:execute("UPDATE plugins SET config = ? WHERE id = ?", { | ||
cassandra.text(cjson.encode(config)), | ||
cassandra.uuid(plugin.id) | ||
}) | ||
if err then | ||
return nil, err | ||
end | ||
end | ||
end | ||
end | ||
end | ||
|
||
return true | ||
end, | ||
|
||
} | ||
|
||
} | ||
|
||
-------------------------------------------------------------------------------- | ||
|
||
|
||
return { | ||
postgres = postgres, | ||
cassandra = cassandra, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
local operations = require "kong.db.migrations.operations.331_to_332" | ||
|
||
|
||
local function ws_migration_teardown(ops) | ||
return function(connector) | ||
return ops:fixup_plugin_config(connector, "opentelemetry", function(config) | ||
if config.queue.max_batch_size == 1 then | ||
config.queue.max_batch_size = 200 | ||
return true | ||
end | ||
|
||
return false | ||
end) | ||
end | ||
end | ||
|
||
|
||
return { | ||
postgres = { | ||
up = "", | ||
teardown = ws_migration_teardown(operations.postgres.teardown), | ||
}, | ||
|
||
cassandra = { | ||
up = "", | ||
teardown = ws_migration_teardown(operations.cassandra.teardown), | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
return { | ||
"001_331_to_332", | ||
} |