Skip to content

Commit

Permalink
fix(incremental): reduce the use of timers (#13732)
Browse files Browse the repository at this point in the history
  • Loading branch information
chobits authored and chronolaw committed Oct 12, 2024
1 parent f70a38b commit a4bacf4
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 106 deletions.
8 changes: 1 addition & 7 deletions kong/clustering/services/sync/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,7 @@ function _M:init_worker()
-- sync to CP ASAP
assert(self.rpc:sync_once(FIRST_SYNC_DELAY))

assert(ngx.timer.every(EACH_SYNC_DELAY, function(premature)
if premature then
return
end

assert(self.rpc:sync_once())
end))
assert(self.rpc:sync_every(EACH_SYNC_DELAY))
end


Expand Down
212 changes: 113 additions & 99 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -182,137 +182,151 @@ function _M:init(manager, is_cp)
end


function _M:sync_once(delay)
local hdl, err = ngx.timer.at(delay or 0, function(premature)
if premature then
return
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
for _ = 1, 2 do
local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
end

local ns_delta
local function do_sync(premature)
if premature then
return
end

for namespace, delta in pairs(ns_deltas) do
if namespace == "default" then
ns_delta = delta
break -- should we break here?
end
end
local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
for _ = 1, 2 do
local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
end

if not ns_delta then
return nil, "default namespace does not exist inside params"
end
local ns_delta

if #ns_delta.deltas == 0 then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
for namespace, delta in pairs(ns_deltas) do
if namespace == "default" then
ns_delta = delta
break -- should we break here?
end
end

local t = txn.begin(512)
if not ns_delta then
return nil, "default namespace does not exist inside params"
end

if ns_delta.wipe then
t:db_drop(false)
end
if #ns_delta.deltas == 0 then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
end

local db = kong.db
local t = txn.begin(512)

local version = 0
local crud_events = {}
local crud_events_n = 0
if ns_delta.wipe then
t:db_drop(false)
end

for _, delta in ipairs(ns_delta.deltas) do
local delta_type = delta.type
local delta_row = delta.row
local ev
local db = kong.db

if delta_row ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
if err then
return nil, err
end
local version = 0
local crud_events = {}
local crud_events_n = 0

local crud_event_type = "create"
for _, delta in ipairs(ns_delta.deltas) do
local delta_type = delta.type
local delta_row = delta.row
local ev

if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
if delta_row ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
if err then
return nil, err
end

crud_event_type = "update"
end
local crud_event_type = "create"

local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil)
if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end

ev = { delta_type, crud_event_type, delta_row, old_entity, }
crud_event_type = "update"
end

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
if err then
return nil, err
end
local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil)
if not res then
return nil, err
end

if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
end
ev = { delta_type, crud_event_type, delta_row, old_entity, }

ev = { delta_type, "delete", old_entity, }
else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
if err then
return nil, err
end

crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
version = delta.version
if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
end

ev = { delta_type, "delete", old_entity, }
end

t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
local ok, err = t:commit()
if not ok then
return nil, err
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
version = delta.version
end
end

t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
local ok, err = t:commit()
if not ok then
return nil, err
end

if ns_delta.wipe then
kong.core_cache:purge()
kong.cache:purge()
if ns_delta.wipe then
kong.core_cache:purge()
kong.cache:purge()

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
end -- for _, delta
end
end -- for _, delta

return true
end)
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end
return true
end)
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end
end


function _M:sync_once(delay)
local hdl, err = ngx.timer.at(delay or 0, do_sync)

if not hdl then
return nil, err
end

return true
end


function _M:sync_every(delay)
local hdl, err = ngx.timer.every(delay, do_sync)

if not hdl then
return nil, err
Expand Down

0 comments on commit a4bacf4

Please sign in to comment.