Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(clustering/rpc): simplify the implementation #14026

Merged
merged 14 commits into from
Dec 23, 2024
1 change: 1 addition & 0 deletions kong-3.10.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ build = {
["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
["kong.db.migrations.core.024_380_to_390"] = "kong/db/migrations/core/024_380_to_390.lua",
["kong.db.migrations.core.025_390_to_3100"] = "kong/db/migrations/core/025_390_to_3100.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua",
Expand Down
58 changes: 2 additions & 56 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ local EMPTY = require("kong.tools.table").EMPTY


local ipairs = ipairs
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
Expand Down Expand Up @@ -74,29 +73,8 @@ function _M:notify_all_nodes()
end


local function gen_delta(entity, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(entity)

assert(schema:validate_primary_key(pk))

local delta = {
type = name,
pk = pk,
ws_id = ws_id,
entity = is_delete and ngx_null or entity,
}

return delta
end


function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
local d = gen_delta(entity, name, options, ws_id, is_delete)
local deltas = { d, }

local res, err = self.strategy:insert_delta(deltas)
local res, err = self.strategy:insert_delta()
if not res then
self.strategy:cancel_txn()
return nil, err
Expand Down Expand Up @@ -164,39 +142,7 @@ function _M:register_dao_hooks()

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name)

-- set lmdb value to ngx_null then return entity

local d = gen_delta(entity, name, options, ws_id, true)
local deltas = { d, }

-- delete other related entities
for i, item in ipairs(cascade_entries or EMPTY) do
local e = item.entity
local name = item.dao.schema.name

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to cascade deleting ", name)

d = gen_delta(e, name, options, e.ws_id, true)

-- #1 item is initial entity
deltas[i + 1] = d
end

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

self:notify_all_nodes()

return entity -- for other hooks
return self:entity_delta_writer(entity, name, options, ws_id)
end

local dao_hooks = {
chobits marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
53 changes: 4 additions & 49 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@ local fmt = string.format
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local ngx_DEBUG = ngx.DEBUG


-- number of versions behind before a full sync is forced
local DEFAULT_FULL_SYNC_THRESHOLD = 512


function _M.new(strategy)
local self = {
strategy = strategy,
Expand All @@ -43,8 +38,8 @@ function _M.new(strategy)
end


local function inc_sync_result(res)
return { default = { deltas = res, wipe = false, }, }
local function empty_sync_result()
return { default = { deltas = {}, wipe = false, }, }
end


Expand All @@ -62,10 +57,6 @@ end
function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

-- number of versions behind before a full sync is forced
local FULL_SYNC_THRESHOLD = manager.conf.cluster_full_sync_threshold or
DEFAULT_FULL_SYNC_THRESHOLD

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
Expand Down Expand Up @@ -107,48 +98,12 @@ function _M:init_cp(manager)
return nil, err
end

-- is the node empty? If so, just do a full sync to bring it up to date faster
if default_namespace_version == 0 or
chobits marked this conversation as resolved.
Show resolved Hide resolved
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD
then
-- we need to full sync because holes are found

ngx_log(ngx_INFO,
"[kong.sync.v2] database is empty or too far behind for node_id: ", node_id,
", current_version: ", default_namespace_version,
", forcing a full sync")

default_namespace_version < latest_version then
return full_sync_result()
end

-- do we need an incremental sync?

local res, err = self.strategy:get_delta(default_namespace_version)
if not res then
return nil, err
end

if isempty(res) then
-- node is already up to date
return inc_sync_result(res)
end

-- some deltas are returned, are they contiguous?
if res[1].version == default_namespace_version + 1 then
-- doesn't wipe dp lmdb, incremental sync
return inc_sync_result(res)
end

-- we need to full sync because holes are found
-- in the delta, meaning the oldest version is no longer
-- available

ngx_log(ngx_INFO,
"[kong.sync.v2] delta for node_id no longer available: ", node_id,
", current_version: ", default_namespace_version,
", forcing a full sync")

return full_sync_result()
return empty_sync_result()
end)
end

Expand Down
75 changes: 3 additions & 72 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,7 @@ local _M = {}
local _MT = { __index = _M }


local cjson = require("cjson.safe")
local buffer = require("string.buffer")


local string_format = string.format
local cjson_encode = cjson.encode
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR


local KEEP_VERSION_COUNT = 100
local CLEANUP_TIME_DELAY = 3600 -- 1 hour


function _M.new(db)
Expand All @@ -26,32 +14,8 @@ function _M.new(db)
end


local PURGE_QUERY = [[
DELETE FROM clustering_sync_version
WHERE "version" < (
SELECT MAX("version") - %d
FROM clustering_sync_version
);
]]


-- reserved for future
function _M:init_worker()
local function cleanup_handler(premature)
if premature then
return
end

local res, err = self.connector:query(string_format(PURGE_QUERY, KEEP_VERSION_COUNT))
if not res then
ngx_log(ngx_ERR,
"[incremental] unable to purge old data from incremental delta table, err: ",
err)

return
end
end

assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler))
end


Expand All @@ -61,37 +25,12 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()

local count = #deltas
for i = 1, count do
local d = deltas[i]

buf:putf("(new_version, %s, %s, %s, %s)",
self.connector:escape_literal(d.type),
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.entity)))

-- sql values should be separated by comma
if i < count then
buf:put(",")
end
end

local sql = string_format(NEW_VERSION_QUERY, buf:get())

return self.connector:query(sql)
function _M:insert_delta()
return self.connector:query(NEW_VERSION_QUERY)
end


Expand All @@ -112,14 +51,6 @@ function _M:get_latest_version()
end


function _M:get_delta(version)
local sql = "SELECT * FROM clustering_sync_delta" ..
" WHERE version > " .. self.connector:escape_literal(version) ..
" ORDER BY version ASC"
return self.connector:query(sql)
end


function _M:begin_txn()
return self.connector:query("BEGIN;")
end
Expand Down
1 change: 0 additions & 1 deletion kong/conf_loader/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ local CONF_PARSERS = {
cluster_dp_labels = { typ = "array" },
cluster_rpc = { typ = "boolean" },
cluster_rpc_sync = { typ = "boolean" },
cluster_full_sync_threshold = { typ = "number" },
cluster_cjson = { typ = "boolean" },

kic = { typ = "boolean" },
Expand Down
9 changes: 0 additions & 9 deletions kong/db/migrations/core/024_380_to_390.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@ return {
CREATE TABLE IF NOT EXISTS clustering_sync_version (
"version" SERIAL PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS clustering_sync_delta (
"version" INT NOT NULL,
"type" TEXT NOT NULL,
"pk" JSON NOT NULL,
"ws_id" UUID NOT NULL,
"entity" JSON,
FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version);
END;
$$;
]]
Expand Down
11 changes: 11 additions & 0 deletions kong/db/migrations/core/025_390_to_3100.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
return {
postgres = {
up = [[
DO $$
DROP TABLE IF EXISTS clustering_sync_delta;
DROP INDEX IF EXISTS clustering_sync_delta_version_idx;
END;
$$;
]]
}
}
1 change: 0 additions & 1 deletion kong/templates/kong_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ cluster_use_proxy = off
cluster_dp_labels = NONE
cluster_rpc = off
cluster_rpc_sync = off
cluster_full_sync_threshold = 512
cluster_cjson = off
lmdb_environment_path = dbless.lmdb
Expand Down
4 changes: 4 additions & 0 deletions spec/02-integration/09-hybrid_mode/01-sync_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,10 @@ describe("CP/DP config sync #" .. strategy .. " rpc_sync=" .. rpc_sync, function
end
end, 5)

if rpc_sync == "on" then
ngx.sleep(0.5)
end

chobits marked this conversation as resolved.
Show resolved Hide resolved
for i = 5, 2, -1 do
res = proxy_client:get("/" .. i)
assert.res_status(404, res)
Expand Down
1 change: 0 additions & 1 deletion spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ describe("lazy_export with #".. strategy .. " rpc_sync=" .. rpc_sync, function()
touch_config()
if rpc_sync == "on" then
assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true)
assert.logfile().has.line("[kong.sync.v2] database is empty or too far behind for node_id", true)

else
assert.logfile().has.line("[clustering] exporting config", true)
Expand Down
Loading
Loading