Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dndx committed Sep 5, 2024
1 parent 9bc1338 commit a49557b
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 46 deletions.
6 changes: 2 additions & 4 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ function _M:register_dao_hooks(is_cp)
end

local latest_version = self.strategy:get_latest_version()

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version",
{ { namespace = "default", new_version = latest_version, }, })
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
Expand All @@ -109,8 +109,6 @@ function _M:register_dao_hooks(is_cp)
return self.strategy:cancel_txn()
end

-- no err, we should commit delete operation

local res, err = self.strategy:commit_txn()
if not res then
return self.strategy:cancel_txn()
Expand Down
16 changes: 10 additions & 6 deletions kong/clustering/services/sync/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,32 @@ local strategy = require("kong.clustering.services.sync.strategies.postgres")
local rpc = require("kong.clustering.services.sync.rpc")


function _M.new(db)
function _M.new(db, is_cp)
local strategy = strategy.new(db)

local self = {
db = db,
strategy = strategy,
hooks = hooks.new(strategy),
rpc = rpc.new(strategy),
is_cp = is_cp,
}

return setmetatable(self, _MT)
end


function _M:init(manager, is_cp)
self.hooks:register_dao_hooks(is_cp)
self.rpc:init(manager, is_cp)
function _M:init(manager)
self.hooks:register_dao_hooks(self.is_cp)
self.rpc:init(manager, self.is_cp)
end


function _M:init_worker_dp()
if ngx.worker.id() == 0 then
function _M:init_worker()
if self.is_cp then
self.strategy:init_worker()

elseif ngx.worker.id() == 0 then
assert(self.rpc:sync_once(5))
end
end
Expand Down
146 changes: 115 additions & 31 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local _MT = { __index = _M, }

local semaphore = require("ngx.semaphore")
local lmdb = require("resty.lmdb")
local txn = require("resty.lmdb.transaction")
local declarative = require("kong.db.declarative")
local constants = require("kong.constants")
local concurrency = require("kong.concurrency")
Expand All @@ -13,6 +14,8 @@ local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local ngx_DEBUG = ngx.DEBUG


function _M.new(strategy)
Expand All @@ -26,7 +29,11 @@ end

function _M:init(manager, is_cp)
if is_cp then
manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, version)
-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
-- { { namespace = "default", current_version = 1000, }, }
manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions)
local rpc_peers
if kong.rpc then
rpc_peers = kong.rpc:get_peers()
Expand All @@ -35,28 +42,78 @@ function _M:init(manager, is_cp)
local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, {
last_seen = ngx.time(),
hostname = node_id,
ip = "127.0.0.1",
version = "3.6.0.0",
ip = "127.0.7.1",
version = "3.7.0.0",
sync_status = "normal",
config_hash = string.format("%032d", version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
})
if not ok then
ngx.log(ngx.ERR, "unable to update clustering data plane status: ", err)
end

return self.strategy:get_delta(version)
for _, current_version in ipairs(current_versions) do
if current_version.namespace == "default" then
local res, err = self.strategy:get_delta(current_version.current_version)
if not res then
return nil, err
end

if #res == 0 then
ngx_log(ngx_DEBUG, "[kong.sync.v2] no delta for node_id: ", node_id,
", current_version: ", current_version.current_version,
", node is already up to date" )
return { { namespace = "default", deltas = res, wipe = false, }, }
end

-- some deltas are returned, are they contiguous?
if res[1].version ~= current_version.current_version + 1 then
-- we need to full sync because holes are found

ngx_log(ngx_INFO, "[kong.sync.v2] delta for node_id no longer available: ", node_id,
", current_version: ", current_version.current_version,
", forcing a full sync")


local deltas err = declarative.export_config_sync()
if not deltas then
return nil, err
end

return { { namespace = "default", deltas = deltas, wipe = true, }, }
end

return { { namespace = "default", deltas = res, wipe = false, }, }
end
end

return nil, "default namespace does not exist"
end)

else
-- DP
manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, version)
local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
if lmdb_ver < version then
return self:sync_once()
-- Method: kong.sync.v2.notify_new_version
-- Params: new_versions: list of namespaces and their new versions, like:
-- { { namespace = "default", new_version = 1000, }, }
manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions)
-- currently only default is supported, and anything else is ignored
for _, new_version in ipairs(new_versions) do
if new_version.namespace == "default" then
local version = new_version.new_version
if not version then
return nil, "'new_version' key does not exist"
end

local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
if lmdb_ver < version then
return self:sync_once()
end

return true
end
end

return true
return nil, "default namespace does not exist inside params"
end)
end
end
Expand All @@ -70,37 +127,64 @@ function _M:sync_once(delay)

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
for i = 1, 2 do
local delta, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
tonumber(declarative.get_current_hash()) or 0)
if not delta then
if not ns_deltas then
ngx.log(ngx.ERR, "sync get_delta error: ", err)
return true
end

local version = 0

for _, d in ipairs(delta) do
if d.row ~= ngx.null then
assert(kong.db[d.type]:delete({
id = d.id,
}))
assert(kong.db[d.type]:insert(d.row))

else
assert(kong.db[d.type]:delete({
id = d.id,
}))
end

if d.version ~= version then
version = d.version
assert(lmdb.set(DECLARATIVE_HASH_KEY, string.format("%032d", version)))
for _, ns_delta in ipairs(ns_deltas) do
if ns_delta.namespace == "default" then
local t = txn.begin(512)

if ns_delta.wipe then
t:db_drop(false)

local ok, err = t:commit()
if not ok then
return nil, err
end

t:reset()
end

for _, delta in ipairs(ns_delta.delta) do
if d.row ~= ngx.null then
assert(kong.db[d.type]:delete({
id = d.id,
}))
assert(kong.db[d.type]:insert(d.row))

else
assert(kong.db[d.type]:delete({
id = d.id,
}))
end

if delta.version ~= version then
version = delta.version
end
end

t:set(DECLARATIVE_HASH_KEY, string.format("%032d", version))
local ok, err = t:commit()
if not ok then
return nil, err
end

if ns_delta.wipe then
kong.core_cache:purge()
kong.cache:purge()
end

return true
end
end

if version == 0 then
return true
end
return nil, "default namespace does not exist inside params"
end

return true
Expand Down
35 changes: 35 additions & 0 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ local cjson = require("cjson.safe")
local string_format = string.format
local table_concat = table.concat
local cjson_encode = cjson.encode
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG


function _M.new(db)
Expand All @@ -19,6 +22,38 @@ function _M.new(db)
end


local PURGE_QUERY = [[
DELETE FROM clustering_sync_version
WHERE "version" < (
SELECT MAX("version") - %d
FROM clustering_sync_version
);
]]


function _M:init_worker()
ngx.timer.every(3600, function(premature)
if premature then
ngx_log(ngx_DEBUG, "[incremental] worker exiting, killing incremental cleanup timer")

return
end

local res, err = self.connector:query(string_format(PURGE_QUERY, 100))
if not res then
ngx_log(ngx_ERR,
"[incremental] unable to purge old data from incremental delta table, err: ",
err)

return
end

ngx_log(ngx_DEBUG,
"[incremental] successfully purged old data from incremental delta table")
end)
end


local NEW_VERSION_QUERY = [[
DO $$
DECLARE
Expand Down
40 changes: 40 additions & 0 deletions kong/db/declarative/export.lua
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,20 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp
return nil, err
end

local sync_version
if emitter.want_sync_version then
ok, err = db.connector:query("SELECT max(version) from clustering_sync_version", "read")
if not ok then
return nil, err
end

sync_version = ok[1][1]
end

emitter:emit_toplevel({
_format_version = "3.0",
_transform = false,
_sync_version = sync_version, -- only used by sync emitter, DP doesn't care about this
})

local disabled_services = {}
Expand Down Expand Up @@ -339,6 +350,34 @@ local function sanitize_output(entities)
end


local sync_emitter = {
emit_toplevel = function(self, tbl)
self.out = {}
self.out_n = 0
self.sync_version = tbl._sync_version
end,

emit_entity = function(self, entity_name, entity_data)
self.out_n = self.out_n + 1
self.out[self.out_n] = { row = entity_data, version = self.sync_version, }
end,

done = function(self)
return self.out
end,
}


function sync_emitter.new()
return setmetatable({ want_sync_version = true, }, { __index = sync_emitter })
end


local function export_config_sync()
return export_from_db_impl(proto_emitter.new(), false, false, false)
end


return {
convert_nulls = convert_nulls,
to_yaml_string = to_yaml_string,
Expand All @@ -347,6 +386,7 @@ return {
export_from_db = export_from_db,
export_config = export_config,
export_config_proto = export_config_proto,
export_config_sync = export_config_sync,

sanitize_output = sanitize_output,
}
1 change: 1 addition & 0 deletions kong/db/declarative/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ _M.to_yaml_file = declarative_export.to_yaml_file
_M.export_from_db = declarative_export.export_from_db
_M.export_config = declarative_export.export_config
_M.export_config_proto = declarative_export.export_config_proto
_M.export_config_sync = declarative_export.export_config_sync
_M.sanitize_output = declarative_export.sanitize_output


Expand Down
2 changes: 1 addition & 1 deletion kong/db/migrations/core/024_370_to_380.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ return {
"id" UUID NOT NULL,
"ws_id" UUID NOT NULL,
"row" JSON,
FOREIGN KEY (version) REFERENCES clustering_sync_version(version)
FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE
);
CREATE INDEX clustering_sync_delta_version_idx ON clustering_sync_delta (version);
END;
Expand Down
Loading

0 comments on commit a49557b

Please sign in to comment.