From 53c167da84ab83242373eeb7a575fe8dba7d80f6 Mon Sep 17 00:00:00 2001 From: Chrono Date: Tue, 3 Dec 2024 16:36:05 +0800 Subject: [PATCH] Revert "refactor(clustering/sync): clean the logic of sync_once (#13956)" This reverts commit 127d0a237cefb77dff64d5fc6b3424cd79ac9a0f. --- kong/clustering/services/sync/rpc.lua | 48 +++++++++++++++++---------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index fdd3d0cb6f02..4100afbb9675 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -17,7 +17,7 @@ local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLAN local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } -local SYNC_MAX_RETRY = 5 +local MAX_RETRY = 5 local assert = assert @@ -392,8 +392,7 @@ local function do_sync() end -local sync_handler -sync_handler = function(premature, try_counter) +local function sync_handler(premature) if premature then return end @@ -402,37 +401,52 @@ sync_handler = function(premature, try_counter) if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end +end - -- try_counter is not set, only run once - if not try_counter then - return - end - if try_counter <= 0 then - ngx_log(ngx_ERR, "sync_once try count exceeded.") - return +local sync_once_impl + + +local function start_sync_once_timer(retry_count) + local ok, err = ngx.timer.at(0, sync_once_impl, retry_count or 0) + if not ok then + return nil, err end - assert(try_counter >= 1) + return true +end - local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) - if not latest_notified_version then - ngx_log(ngx_DEBUG, "no version notified yet") + +function sync_once_impl(premature, retry_count) + if premature then return end + sync_handler() + + local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) local current_version = tonumber(declarative.get_current_hash()) or 0 - if current_version >= latest_notified_version then + + if not latest_notified_version then + ngx_log(ngx_DEBUG, "no version notified yet") return end -- retry if the version is not updated - return ngx.timer.at(0, sync_handler, try_counter - 1) + if current_version < latest_notified_version then + retry_count = retry_count or 0 + if retry_count > MAX_RETRY then + ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count) + return + end + + return start_sync_once_timer(retry_count + 1) + end end function _M:sync_once(delay) - return ngx.timer.at(delay or 0, sync_handler, SYNC_MAX_RETRY) + return ngx.timer.at(delay or 0, sync_once_impl, 0) end