From a49557bbf30902f19b14fdc4c4b24f329e7890db Mon Sep 17 00:00:00 2001
From: Datong Sun
Date: Thu, 5 Sep 2024 00:15:51 -0700
Subject: [PATCH] WIP

Incremental sync scaffolding for kong.sync.v2:

* CP: "kong.sync.v2.get_delta" now takes/returns namespaced version
  lists and falls back to a full export (wipe = true) when the
  requested version is no longer contiguous in the delta table.
* DP: applies deltas inside an LMDB transaction; purges caches after
  a full-sync wipe.
* Strategy: periodically purges old rows from the sync delta tables.

---
 kong/clustering/services/sync/hooks.lua       |   6 +-
 kong/clustering/services/sync/init.lua        |  16 +-
 kong/clustering/services/sync/rpc.lua         | 146 ++++++++++++++----
 .../services/sync/strategies/postgres.lua     |  35 +++++
 kong/db/declarative/export.lua                |  40 +++++
 kong/db/declarative/init.lua                  |   1 +
 kong/db/migrations/core/024_370_to_380.lua    |   2 +-
 kong/init.lua                                 |  10 +-
 8 files changed, 210 insertions(+), 46 deletions(-)

diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua
index b9b4d3d0fa00..a5292a6fa47e 100644
--- a/kong/clustering/services/sync/hooks.lua
+++ b/kong/clustering/services/sync/hooks.lua
@@ -82,9 +82,9 @@ function _M:register_dao_hooks(is_cp)
     end
 
     local latest_version = self.strategy:get_latest_version()
-
     for _, node in ipairs(get_all_nodes_with_sync_cap()) do
-      res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
+      res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version",
+                               { { namespace = "default", new_version = latest_version, }, })
       if not res then
         if not err:find("requested capability does not exist", nil, true) then
           ngx.log(ngx.ERR, "unable to notify new version: ", err)
@@ -109,8 +109,6 @@ function _M:register_dao_hooks(is_cp)
       return self.strategy:cancel_txn()
     end
 
-    -- no err, we should commit delete operation
-
     local res, err = self.strategy:commit_txn()
     if not res then
       return self.strategy:cancel_txn()
diff --git a/kong/clustering/services/sync/init.lua b/kong/clustering/services/sync/init.lua
index 593a9aaec823..423372f5a828 100644
--- a/kong/clustering/services/sync/init.lua
+++ b/kong/clustering/services/sync/init.lua
@@ -7,7 +7,7 @@ local strategy = require("kong.clustering.services.sync.strategies.postgres")
 local rpc = require("kong.clustering.services.sync.rpc")
 
 
-function _M.new(db)
+function _M.new(db, is_cp)
   local strategy = strategy.new(db)
 
   local self = {
@@ -15,20 +15,24 @@ function _M.new(db)
     strategy = strategy,
     hooks = hooks.new(strategy),
     rpc = rpc.new(strategy),
+    is_cp = is_cp,
   }
 
   return setmetatable(self, _MT)
 end
 
 
-function _M:init(manager, is_cp)
-  self.hooks:register_dao_hooks(is_cp)
-  self.rpc:init(manager, is_cp)
+function _M:init(manager)
+  self.hooks:register_dao_hooks(self.is_cp)
+  self.rpc:init(manager, self.is_cp)
 end
 
 
-function _M:init_worker_dp()
-  if ngx.worker.id() == 0 then
+function _M:init_worker()
+  if self.is_cp then
+    self.strategy:init_worker()
+
+  elseif ngx.worker.id() == 0 then
     assert(self.rpc:sync_once(5))
   end
 end
diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua
index bcb49dcc1a7c..721c3cbed3df 100644
--- a/kong/clustering/services/sync/rpc.lua
+++ b/kong/clustering/services/sync/rpc.lua
@@ -4,6 +4,7 @@ local _MT = { __index = _M, }
 
 local semaphore = require("ngx.semaphore")
 local lmdb = require("resty.lmdb")
+local txn = require("resty.lmdb.transaction")
 local declarative = require("kong.db.declarative")
 local constants = require("kong.constants")
 local concurrency = require("kong.concurrency")
@@ -13,6 +14,8 @@ local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
 local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
 local ngx_log = ngx.log
 local ngx_ERR = ngx.ERR
+local ngx_INFO = ngx.INFO
+local ngx_DEBUG = ngx.DEBUG
 
 
 function _M.new(strategy)
@@ -26,7 +29,11 @@ end
 
 function _M:init(manager, is_cp)
   if is_cp then
-    manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, version)
+    -- CP
+    -- Method: kong.sync.v2.get_delta
+    -- Params: versions: list of current versions of the database
+    -- { { namespace = "default", current_version = 1000, }, }
+    manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions)
       local rpc_peers
       if kong.rpc then
         rpc_peers = kong.rpc:get_peers()
@@ -35,28 +42,78 @@ function _M:init(manager, is_cp)
       end
 
       local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, {
        last_seen = ngx.time(),
        hostname = node_id,
-        ip = "127.0.0.1",
-        version = "3.6.0.0",
+        ip = "127.0.7.1",
+        version = "3.7.0.0",
        sync_status = "normal",
-        config_hash = string.format("%032d", version),
-        rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, 
+        config_hash = string.format("%032d", 0), -- FIXME: derive from current_versions
+        rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
       })
       if not ok then
         ngx.log(ngx.ERR, "unable to update clustering data plane status: ", err)
       end
 
-      return self.strategy:get_delta(version)
+      for _, current_version in ipairs(current_versions) do
+        if current_version.namespace == "default" then
+          local res, err = self.strategy:get_delta(current_version.current_version)
+          if not res then
+            return nil, err
+          end
+
+          if #res == 0 then
+            ngx_log(ngx_DEBUG, "[kong.sync.v2] no delta for node_id: ", node_id,
+                    ", current_version: ", current_version.current_version,
+                    ", node is already up to date" )
+            return { { namespace = "default", deltas = res, wipe = false, }, }
+          end
+
+          -- some deltas are returned, are they contiguous?
+          if res[1].version ~= current_version.current_version + 1 then
+            -- we need to full sync because holes are found
+
+            ngx_log(ngx_INFO, "[kong.sync.v2] delta for node_id no longer available: ", node_id,
+                    ", current_version: ", current_version.current_version,
+                    ", forcing a full sync")
+
+
+            local deltas, err = declarative.export_config_sync()
+            if not deltas then
+              return nil, err
+            end
+
+            return { { namespace = "default", deltas = deltas, wipe = true, }, }
+          end
+
+          return { { namespace = "default", deltas = res, wipe = false, }, }
+        end
+      end
+
+      return nil, "default namespace does not exist"
     end)
 
   else -- DP
-    manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, version)
-      local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
-      if lmdb_ver < version then
-        return self:sync_once()
+    -- Method: kong.sync.v2.notify_new_version
+    -- Params: new_versions: list of namespaces and their new versions, like:
+    -- { { namespace = "default", new_version = 1000, }, }
+    manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions)
+      -- currently only default is supported, and anything else is ignored
+      for _, new_version in ipairs(new_versions) do
+        if new_version.namespace == "default" then
+          local version = new_version.new_version
+          if not version then
+            return nil, "'new_version' key does not exist"
+          end
+
+          local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
+          if lmdb_ver < version then
+            return self:sync_once()
+          end
+
+          return true
+        end
       end
 
-      return true
+      return nil, "default namespace does not exist inside params"
    end)
  end
end
@@ -70,37 +127,64 @@ function _M:sync_once(delay)
 
   local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
     for i = 1, 2 do
-      local delta, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
-                                       tonumber(declarative.get_current_hash()) or 0)
-      if not delta then
+      local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
+                                           { { namespace = "default",
+                                               current_version = tonumber(declarative.get_current_hash()) or 0, }, })
+      if not ns_deltas then
         ngx.log(ngx.ERR, "sync get_delta error: ", err)
         return true
       end
 
       local version = 0
 
-      for _, d in ipairs(delta) do
-        if d.row ~= ngx.null then
-          assert(kong.db[d.type]:delete({
-            id = d.id,
-          }))
-          assert(kong.db[d.type]:insert(d.row))
-
-        else
-          assert(kong.db[d.type]:delete({
-            id = d.id,
-          }))
-        end
-
-        if d.version ~= version then
-          version = d.version
-          assert(lmdb.set(DECLARATIVE_HASH_KEY, string.format("%032d", version)))
+      for _, ns_delta in ipairs(ns_deltas) do
+        if ns_delta.namespace == "default" then
+          local t = txn.begin(512)
+
+          if ns_delta.wipe then
+            t:db_drop(false)
+
+            local ok, err = t:commit()
+            if not ok then
+              return nil, err
+            end
+
+            t:reset()
+          end
+
+          for _, delta in ipairs(ns_delta.deltas) do
+            if delta.row ~= ngx.null then
+              assert(kong.db[delta.type]:delete({
+                id = delta.id,
+              }))
+              assert(kong.db[delta.type]:insert(delta.row))
+
+            else
+              assert(kong.db[delta.type]:delete({
+                id = delta.id,
+              }))
+            end
+
+            if delta.version ~= version then
+              version = delta.version
+            end
+          end
+
+          t:set(DECLARATIVE_HASH_KEY, string.format("%032d", version))
+          local ok, err = t:commit()
+          if not ok then
+            return nil, err
+          end
+
+          if ns_delta.wipe then
+            kong.core_cache:purge()
+            kong.cache:purge()
+          end
+
+          return true
         end
       end
 
-      if version == 0 then
-        return true
-      end
+      return nil, "default namespace does not exist inside params"
     end
 
     return true
diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua
index 57110af4353b..78ad1c4d4492 100644
--- a/kong/clustering/services/sync/strategies/postgres.lua
+++ b/kong/clustering/services/sync/strategies/postgres.lua
@@ -8,6 +8,9 @@ local cjson = require("cjson.safe")
 local string_format = string.format
 local table_concat = table.concat
 local cjson_encode = cjson.encode
+local ngx_log = ngx.log
+local ngx_ERR = ngx.ERR
+local ngx_DEBUG = ngx.DEBUG
 
 
 function _M.new(db)
@@ -19,6 +22,38 @@ function _M.new(db)
 end
 
 
+local PURGE_QUERY = [[
+  DELETE FROM clustering_sync_version
+  WHERE "version" < (
+      SELECT MAX("version") - %d
+      FROM clustering_sync_version
+  );
+]]
+
+
+function _M:init_worker()
+  ngx.timer.every(3600, function(premature)
+    if premature then
+      ngx_log(ngx_DEBUG, "[incremental] worker exiting, killing incremental cleanup timer")
+
+      return
+    end
+
+    local res, err = self.connector:query(string_format(PURGE_QUERY, 100))
+    if not res then
+      ngx_log(ngx_ERR,
+              "[incremental] unable to purge old data from incremental delta table, err: ",
+              err)
+
+      return
+    end
+
+    ngx_log(ngx_DEBUG,
+            "[incremental] successfully purged old data from incremental delta table")
+  end)
+end
+
+
 local NEW_VERSION_QUERY = [[
 DO $$
 DECLARE
diff --git a/kong/db/declarative/export.lua b/kong/db/declarative/export.lua
index c3b6b8c1366b..b94d7b3be77e 100644
--- a/kong/db/declarative/export.lua
+++ b/kong/db/declarative/export.lua
@@ -117,9 +117,20 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp
     return nil, err
   end
 
+  local sync_version
+  if emitter.want_sync_version then
+    ok, err = db.connector:query("SELECT max(version) from clustering_sync_version", "read")
+    if not ok then
+      return nil, err
+    end
+
+    sync_version = ok[1][1]
+  end
+
   emitter:emit_toplevel({
     _format_version = "3.0",
     _transform = false,
+    _sync_version = sync_version, -- only used by sync emitter, DP doesn't care about this
   })
 
   local disabled_services = {}
@@ -339,6 +350,34 @@ local function sanitize_output(entities)
 end
 
 
+local sync_emitter = {
+  emit_toplevel = function(self, tbl)
+    self.out = {}
+    self.out_n = 0
+    self.sync_version = tbl._sync_version
+  end,
+
+  emit_entity = function(self, entity_name, entity_data)
+    self.out_n = self.out_n + 1
+    self.out[self.out_n] = { row = entity_data, version = self.sync_version, }
+  end,
+
+  done = function(self)
+    return self.out
+  end,
+}
+
+
+function sync_emitter.new()
+  return setmetatable({ want_sync_version = true, }, { __index = sync_emitter })
+end
+
+
+local function export_config_sync()
+  return export_from_db_impl(sync_emitter.new(), false, false, false)
+end
+
+
 return {
   convert_nulls = convert_nulls,
   to_yaml_string = to_yaml_string,
@@ -347,6 +386,7 @@ return {
   export_from_db = export_from_db,
   export_config = export_config,
   export_config_proto = export_config_proto,
+  export_config_sync = export_config_sync,
   sanitize_output = sanitize_output,
 }
diff --git a/kong/db/declarative/init.lua b/kong/db/declarative/init.lua
index a6a13878b016..89514d361d84 100644
--- a/kong/db/declarative/init.lua
+++ b/kong/db/declarative/init.lua
@@ -245,6 +245,7 @@ _M.to_yaml_file = declarative_export.to_yaml_file
 _M.export_from_db = declarative_export.export_from_db
 _M.export_config = declarative_export.export_config
 _M.export_config_proto = declarative_export.export_config_proto
+_M.export_config_sync = declarative_export.export_config_sync
 _M.sanitize_output = declarative_export.sanitize_output
 
 
diff --git a/kong/db/migrations/core/024_370_to_380.lua b/kong/db/migrations/core/024_370_to_380.lua
index ce3d64e90297..f876d74e7ce8 100644
--- a/kong/db/migrations/core/024_370_to_380.lua
+++ b/kong/db/migrations/core/024_370_to_380.lua
@@ -12,7 +12,7 @@ return {
         "id" UUID NOT NULL,
         "ws_id" UUID NOT NULL,
         "row" JSON,
-        FOREIGN KEY (version) REFERENCES clustering_sync_version(version)
+        FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE
       );
       CREATE INDEX clustering_sync_delta_version_idx ON clustering_sync_delta (version);
     END;
diff --git a/kong/init.lua b/kong/init.lua
index f61c4bc634d7..eafef9729948 100644
--- a/kong/init.lua
+++ b/kong/init.lua
@@ -697,8 +697,8 @@ function Kong.init()
   if config.cluster_rpc then
     kong.rpc = require("kong.clustering.rpc.manager").new(config, kong.node.get_id())
 
-    kong.sync = require("kong.clustering.services.sync").new(db)
-    kong.sync:init(kong.rpc, is_control_plane(config))
+    kong.sync = require("kong.clustering.services.sync").new(db, is_control_plane(config))
+    kong.sync:init(kong.rpc)
 
     if is_data_plane(config) then
       require("kong.clustering.services.debug").init(kong.rpc)
@@ -997,14 +997,16 @@ function Kong.init_worker()
           cluster_tls.get_cluster_cert_key(kong.configuration))
       end)
 
-      kong.sync:init_worker_dp()
-
     else -- control_plane
       kong.rpc.concentrator:start()
     end
   end
 --end
 
+  if kong.sync and is_http_module then
+    kong.sync:init_worker()
+  end
+
   ok, err = wasm.init_worker()
   if not ok then
     err = "wasm nginx worker initialization failed: " .. tostring(err)