Skip to content

Commit

Permalink
feat(dao): support diff transaction (#13586)
Browse files Browse the repository at this point in the history
* txn in strategy

* register_dao_hooks

* fix txn funcs

* comments

* hook dao:xxx:fail

* dao:delete:fail

* get_latest_version

* Revert "get_latest_version"

This reverts commit c97f868.

* select max
  • Loading branch information
chronolaw authored Sep 5, 2024
1 parent 2163b2b commit 9bc1338
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 51 deletions.
146 changes: 97 additions & 49 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,69 +45,117 @@ end


function _M:register_dao_hooks(is_cp)
if is_cp then
hooks.register_hook("dao:insert:post", function(row, name, options, ws_id)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = row, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
return nil, err
end
-- only control plane has these delta operations
if not is_cp then
return
end

local latest_version = self.strategy:get_latest_version()
-- dao:insert

hooks.register_hook("dao:insert:pre", function()
return self.strategy:begin_txn()
end)

hooks.register_hook("dao:insert:fail", function()
return self.strategy:cancel_txn()
end)

hooks.register_hook("dao:insert:post", function(row, name, options, ws_id)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = row, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end
local latest_version = self.strategy:get_latest_version()

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
end
end

return row, name, options, ws_id
end)
return row, name, options, ws_id
end)

hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = ngx.null, },
}
-- dao:delete

local res, err = self.strategy:insert_delta(deltas)
if not res then
return nil, err
end
hooks.register_hook("dao:delete:pre", function()
return self.strategy:begin_txn()
end)

local latest_version = self.strategy:get_latest_version()
hooks.register_hook("dao:delete:fail", function(err)
if err then
return self.strategy:cancel_txn()
end

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end
-- no err, we should commit delete operation

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
local res, err = self.strategy:commit_txn()
if not res then
return self.strategy:cancel_txn()
end

return true
end)

hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = ngx.null, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

local latest_version = self.strategy:get_latest_version()

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
end
end

return row, name, options, ws_id, cascade_entries
end)
end
return row, name, options, ws_id, cascade_entries
end)
end


Expand Down
20 changes: 18 additions & 2 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ end


function _M:get_latest_version()
local sql = "SELECT currval(pg_get_serial_sequence('clustering_sync_version', 'version'))"
return self.connector:query(sql)[1].currval
local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version"
return self.connector:query(sql)[1].max_version
end


Expand All @@ -62,4 +62,20 @@ function _M:get_delta(version)
end


function _M:begin_txn()
return self.connector:query("BEGIN;")
end


function _M:commit_txn()
return self.connector:query("COMMIT;")
end


function _M:cancel_txn()
-- we will close the connection, not execute 'ROLLBACK'
return self.connector:close()
end


return _M
4 changes: 4 additions & 0 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1156,12 +1156,14 @@ function DAO:insert(entity, options)

local row, err_t = self.strategy:insert(entity_to_insert, options)
if not row then
run_hook("dao:insert:fail", err_t, entity, self.schema.name, options)
return nil, tostring(err_t), err_t
end

local ws_id = row.ws_id
row, err, err_t = self:row_to_entity(row, options)
if not row then
run_hook("dao:insert:fail", err, entity, self.schema.name, options)
return nil, err, err_t
end

Expand Down Expand Up @@ -1337,9 +1339,11 @@ function DAO:delete(pk_or_entity, options)
local rows_affected
rows_affected, err_t = self.strategy:delete(primary_key, options)
if err_t then
run_hook("dao:delete:fail", err_t, entity, self.schema.name, options)
return nil, tostring(err_t), err_t

elseif not rows_affected then
run_hook("dao:delete:fail", nil, entity, self.schema.name, options)
return nil
end

Expand Down

0 comments on commit 9bc1338

Please sign in to comment.