Skip to content

Commit

Permalink
Revert "refactor(clustering/sync): clean the logic of sync_once (#13956
Browse files Browse the repository at this point in the history
…)" (#13972)

This reverts commit 127d0a2.
  • Loading branch information
chronolaw authored and ProBrian committed Dec 13, 2024
1 parent bbafa05 commit 9a3b3fb
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLAN
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
local SYNC_MAX_RETRY = 5
local MAX_RETRY = 5


local assert = assert
Expand Down Expand Up @@ -392,8 +392,7 @@ local function do_sync()
end


local sync_handler
sync_handler = function(premature, try_counter)
local function sync_handler(premature)
if premature then
return
end
Expand All @@ -402,37 +401,52 @@ sync_handler = function(premature, try_counter)
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end
end

-- try_counter is not set, only run once
if not try_counter then
return
end

if try_counter <= 0 then
ngx_log(ngx_ERR, "sync_once try count exceeded.")
return
local sync_once_impl


local function start_sync_once_timer(retry_count)
local ok, err = ngx.timer.at(0, sync_once_impl, retry_count or 0)
if not ok then
return nil, err
end

assert(try_counter >= 1)
return true
end

local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY)
if not latest_notified_version then
ngx_log(ngx_DEBUG, "no version notified yet")

function sync_once_impl(premature, retry_count)
if premature then
return
end

sync_handler()

local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY)
local current_version = tonumber(declarative.get_current_hash()) or 0
if current_version >= latest_notified_version then

if not latest_notified_version then
ngx_log(ngx_DEBUG, "no version notified yet")
return
end

-- retry if the version is not updated
return ngx.timer.at(0, sync_handler, try_counter - 1)
if current_version < latest_notified_version then
retry_count = retry_count or 0
if retry_count > MAX_RETRY then
ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count)
return
end

return start_sync_once_timer(retry_count + 1)
end
end


function _M:sync_once(delay)
return ngx.timer.at(delay or 0, sync_handler, SYNC_MAX_RETRY)
return ngx.timer.at(delay or 0, sync_once_impl, 0)
end


Expand Down

0 comments on commit 9a3b3fb

Please sign in to comment.