Skip to content

Commit

Permalink
fix(opentelemetry): increase default queue batch size
Browse files Browse the repository at this point in the history
migration to update the wrongly set default queue batch size to 200

adapt test to run only for 3.x

(cherry picked from commit b0940b2)
  • Loading branch information
samugi committed Feb 16, 2024
1 parent 73ff67c commit a74938a
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 1 deletion.
3 changes: 3 additions & 0 deletions kong-3.3.2-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ build = {
["kong.db.migrations.operations.210_to_211"] = "kong/db/migrations/operations/210_to_211.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua",
["kong.db.migrations.operations.331_to_332"] = "kong/db/migrations/operations/331_to_332.lua",
["kong.db.migrations.migrate_path_280_300"] = "kong/db/migrations/migrate_path_280_300.lua",
["kong.db.declarative.migrations"] = "kong/db/declarative/migrations/init.lua",
["kong.db.declarative.migrations.route_path"] = "kong/db/declarative/migrations/route_path.lua",
Expand Down Expand Up @@ -516,6 +517,8 @@ build = {
["kong.plugins.azure-functions.handler"] = "kong/plugins/azure-functions/handler.lua",
["kong.plugins.azure-functions.schema"] = "kong/plugins/azure-functions/schema.lua",

["kong.plugins.opentelemetry.migrations"] = "kong/plugins/opentelemetry/migrations/init.lua",
["kong.plugins.opentelemetry.migrations.001_331_to_332"] = "kong/plugins/opentelemetry/migrations/001_331_to_332.lua",
["kong.plugins.opentelemetry.handler"] = "kong/plugins/opentelemetry/handler.lua",
["kong.plugins.opentelemetry.schema"] = "kong/plugins/opentelemetry/schema.lua",
["kong.plugins.opentelemetry.proto"] = "kong/plugins/opentelemetry/proto.lua",
Expand Down
120 changes: 120 additions & 0 deletions kong/db/migrations/operations/331_to_332.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
-- Helper module for 331_to_332 migration operations.
--
-- Operations are versioned and specific to a migration so they remain
-- fixed in time and are not modified for use in future migrations.
--
-- If you want to reuse these operations in a future migration,
-- copy the functions over to a new versioned module.


local function render(template, keys)
return (template:gsub("$%(([A-Z_]+)%)", keys))
end


--------------------------------------------------------------------------------
-- Postgres operations for Workspace migration
--------------------------------------------------------------------------------


local postgres = {

up = {},

teardown = {

------------------------------------------------------------------------------
-- General function to fixup a plugin configuration
fixup_plugin_config = function(_, connector, plugin_name, fixup_fn)
local pgmoon_json = require("pgmoon.json")
local select_plugin = render(
"SELECT id, name, config FROM plugins WHERE name = '$(NAME)'", {
NAME = plugin_name
})

local plugins, err = connector:query(select_plugin)
if not plugins then
return nil, err
end

for _, plugin in ipairs(plugins) do
local fix = fixup_fn(plugin.config)
if fix then
local sql = render(
"UPDATE plugins SET config = $(NEW_CONFIG)::jsonb WHERE id = '$(ID)'", {
NEW_CONFIG = pgmoon_json.encode_json(plugin.config),
ID = plugin.id,
})

local _, err = connector:query(sql)
if err then
return nil, err
end
end
end

return true
end,
},

}

--------------------------------------------------------------------------------
-- Cassandra operations for Workspace migration
--------------------------------------------------------------------------------


local cassandra = {

up = {},

teardown = {

------------------------------------------------------------------------------
-- General function to fixup a plugin configuration
fixup_plugin_config = function(_, connector, plugin_name, fixup_fn)
local coordinator = assert(connector:get_stored_connection())
local cassandra = require("cassandra")
local cjson = require("cjson")

for rows, err in coordinator:iterate("SELECT id, name, config FROM plugins") do
if err then
return nil, err
end

for i = 1, #rows do
local plugin = rows[i]
if plugin.name == plugin_name then
if type(plugin.config) ~= "string" then
return nil, "plugin config is not a string"
end
local config = cjson.decode(plugin.config)
local fix = fixup_fn(config)

if fix then
local _, err = coordinator:execute("UPDATE plugins SET config = ? WHERE id = ?", {
cassandra.text(cjson.encode(config)),
cassandra.uuid(plugin.id)
})
if err then
return nil, err
end
end
end
end
end

return true
end,

}

}

--------------------------------------------------------------------------------


return {
postgres = postgres,
cassandra = cassandra,
}
28 changes: 28 additions & 0 deletions kong/plugins/opentelemetry/migrations/001_331_to_332.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
local operations = require "kong.db.migrations.operations.331_to_332"


local function ws_migration_teardown(ops)
return function(connector)
return ops:fixup_plugin_config(connector, "opentelemetry", function(config)
if config.queue.max_batch_size == 1 then
config.queue.max_batch_size = 200
return true
end

return false
end)
end
end


return {
postgres = {
up = "",
teardown = ws_migration_teardown(operations.postgres.teardown),
},

cassandra = {
up = "",
teardown = ws_migration_teardown(operations.cassandra.teardown),
},
}
3 changes: 3 additions & 0 deletions kong/plugins/opentelemetry/migrations/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
return {
"001_331_to_332",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

local cjson = require "cjson"
local uh = require "spec.upgrade_helpers"


if uh.database_type() == 'postgres' then
local handler = uh.get_busted_handler("3.3.0", "3.6.0")
handler("opentelemetry plugin migration", function()
lazy_setup(function()
assert(uh.start_kong())
end)

lazy_teardown(function ()
assert(uh.stop_kong(nil, true))
end)

uh.setup(function ()
local admin_client = assert(uh.admin_client())

local res = assert(admin_client:send {
method = "POST",
path = "/plugins/",
body = {
name = "opentelemetry",
config = {
endpoint = "http://localhost:8080/v1/traces",
}
},
headers = {
["Content-Type"] = "application/json"
}
})
local body = assert.res_status(201, res)
local json = cjson.decode(body)
-- assert that value of old default is 1
assert.equals(json.config.queue.max_batch_size, 1)
admin_client:close()
end)

uh.new_after_finish("has updated opentelemetry queue max_batch_size configuration", function ()
local admin_client = assert(uh.admin_client())
local res = assert(admin_client:send {
method = "GET",
path = "/plugins/"
})
local body = cjson.decode(assert.res_status(200, res))
assert.equal(1, #body.data)

local plugin = body.data[1]
assert.equal("opentelemetry", plugin.name)
assert.equals(200, plugin.config.queue.max_batch_size)
admin_client:close()
end)
end)
end
39 changes: 38 additions & 1 deletion spec/upgrade_helpers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,42 @@ local function all_phases(phrase, f)
return it_when("all_phases", phrase, f)
end


--- Get a Busted test handler for migration tests.
--
-- This convenience function determines the appropriate Busted handler
-- (`busted.describe` or `busted.pending`) based on the "old Kong version"
-- that migrations are running on and the specified version range.
--
-- @function get_busted_handler
-- @param min_version The minimum Kong version (inclusive)
-- @param max_version The maximum Kong version (inclusive)
-- @return `busted.describe` if Kong's version is within the specified range,
-- `busted.pending` otherwise.
-- @usage
-- local handler = get_busted_handler("3.3.0", "3.6.0")
-- handler("some migration test", function() ... end)
local get_busted_handler
do
local function get_version_num(v1, v2)
if v2 then
assert(#v2 == #v1, string.format("different version format: %s and %s", v1, v2))
end
return assert(tonumber((v1:gsub("%.", ""))), "invalid version: " .. v1)
end

function get_busted_handler(min_version, max_version)
local old_version_var = assert(os.getenv("OLD_KONG_VERSION"), "old version not set")
local old_version = string.match(old_version_var, "[^/]+$")

local old_version_num = get_version_num(old_version)
local min_v_num = min_version and get_version_num(min_version, old_version) or 0
local max_v_num = max_version and get_version_num(max_version, old_version) or math.huge

return old_version_num >= min_v_num and old_version_num <= max_v_num and busted.describe or busted.pending
end
end

return {
database_type = database_type,
get_database = get_database,
Expand All @@ -180,5 +216,6 @@ return {
old_after_up = old_after_up,
new_after_up = new_after_up,
new_after_finish = new_after_finish,
all_phases = all_phases
all_phases = all_phases,
get_busted_handler = get_busted_handler,
}

0 comments on commit a74938a

Please sign in to comment.