From f580b61f614d1684b498a84768a700bb0dc42e6a Mon Sep 17 00:00:00 2001 From: Chrono Date: Fri, 8 Nov 2024 08:49:13 +0800 Subject: [PATCH] refactor(clustering/rpc): code clean of full sync in rpc (#13823) https://konghq.atlassian.net/browse/KAG-5728 --- kong/clustering/services/sync/rpc.lua | 45 ++++++++++++++------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 2cefa4a33417..f115ceff02b1 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -39,13 +39,25 @@ function _M.new(strategy) end -function _M:init_cp(manager) - local purge_delay = manager.conf.cluster_data_plane_purge_delay +local function inc_sync_result(res) + return { default = { deltas = res, wipe = false, }, } +end - local function gen_delta_result(res, wipe) - return { default = { deltas = res, wipe = wipe, }, } + +local function full_sync_result() + local deltas, err = declarative.export_config_sync() + if not deltas then + return nil, err end + -- wipe dp lmdb, full sync + return { default = { deltas = deltas, wipe = true, }, } +end + + +function _M:init_cp(manager) + local purge_delay = manager.conf.cluster_data_plane_purge_delay + -- CP -- Method: kong.sync.v2.get_delta -- Params: versions: list of current versions of the database @@ -97,16 +109,11 @@ function _M:init_cp(manager) ", current_version: ", default_namespace_version, ", forcing a full sync") - - local deltas, err = declarative.export_config_sync() - if not deltas then - return nil, err - end - - -- wipe dp lmdb, full sync - return gen_delta_result(deltas, true) + return full_sync_result() end + -- do we need an incremental sync? + local res, err = self.strategy:get_delta(default_namespace_version) if not res then return nil, err @@ -117,13 +124,13 @@ function _M:init_cp(manager) "[kong.sync.v2] no delta for node_id: ", node_id, ", current_version: ", default_namespace_version, ", node is already up to date" ) - return gen_delta_result(res, false) + return inc_sync_result(res) end -- some deltas are returned, are they contiguous? if res[1].version == default_namespace_version + 1 then -- doesn't wipe dp lmdb, incremental sync - return gen_delta_result(res, false) + return inc_sync_result(res) end -- we need to full sync because holes are found @@ -135,13 +142,7 @@ function _M:init_cp(manager) ", current_version: ", default_namespace_version, ", forcing a full sync") - local deltas, err = declarative.export_config_sync() - if not deltas then - return nil, err - end - - -- wipe dp lmdb, full sync - return gen_delta_result(deltas, true) + return full_sync_result() end) end @@ -364,6 +365,8 @@ end function _M:sync_once(delay) + --- XXX TODO: check rpc connection is ready + return start_sync_timer(ngx.timer.at, delay or 0) end