Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dao): support diff transaction #13586

Merged
merged 9 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 97 additions & 49 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,69 +45,117 @@ end


function _M:register_dao_hooks(is_cp)
if is_cp then
hooks.register_hook("dao:insert:post", function(row, name, options, ws_id)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = row, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
return nil, err
end
-- only control plane has these delta operations
if not is_cp then
return
end

local latest_version = self.strategy:get_latest_version()
-- dao:insert

hooks.register_hook("dao:insert:pre", function()
return self.strategy:begin_txn()
end)

hooks.register_hook("dao:insert:fail", function()
return self.strategy:cancel_txn()
end)

hooks.register_hook("dao:insert:post", function(row, name, options, ws_id)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = row, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end
local latest_version = self.strategy:get_latest_version()

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
end
end

return row, name, options, ws_id
end)
return row, name, options, ws_id
end)

hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = ngx.null, },
}
-- dao:delete

local res, err = self.strategy:insert_delta(deltas)
if not res then
return nil, err
end
hooks.register_hook("dao:delete:pre", function()
return self.strategy:begin_txn()
end)

local latest_version = self.strategy:get_latest_version()
hooks.register_hook("dao:delete:fail", function(err)
if err then
return self.strategy:cancel_txn()
end

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end
-- no err, we should commit delete operation

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
local res, err = self.strategy:commit_txn()
if not res then
return self.strategy:cancel_txn()
end

return true
end)

hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = ngx.null, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

local latest_version = self.strategy:get_latest_version()

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
end
end

return row, name, options, ws_id, cascade_entries
end)
end
return row, name, options, ws_id, cascade_entries
end)
end


Expand Down
20 changes: 18 additions & 2 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ end


function _M:get_latest_version()
local sql = "SELECT currval(pg_get_serial_sequence('clustering_sync_version', 'version'))"
return self.connector:query(sql)[1].currval
local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version"
return self.connector:query(sql)[1].max_version
end


Expand All @@ -62,4 +62,20 @@ function _M:get_delta(version)
end


function _M:begin_txn()
return self.connector:query("BEGIN;")
end


function _M:commit_txn()
return self.connector:query("COMMIT;")
end


function _M:cancel_txn()
-- we will close the connection, not execute 'ROLLBACK'
return self.connector:close()
end


return _M
4 changes: 4 additions & 0 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1156,12 +1156,14 @@ function DAO:insert(entity, options)

local row, err_t = self.strategy:insert(entity_to_insert, options)
if not row then
run_hook("dao:insert:fail", err_t, entity, self.schema.name, options)
return nil, tostring(err_t), err_t
end

local ws_id = row.ws_id
row, err, err_t = self:row_to_entity(row, options)
if not row then
run_hook("dao:insert:fail", err, entity, self.schema.name, options)
return nil, err, err_t
end

Expand Down Expand Up @@ -1337,9 +1339,11 @@ function DAO:delete(pk_or_entity, options)
local rows_affected
rows_affected, err_t = self.strategy:delete(primary_key, options)
if err_t then
run_hook("dao:delete:fail", err_t, entity, self.schema.name, options)
return nil, tostring(err_t), err_t

elseif not rows_affected then
run_hook("dao:delete:fail", nil, entity, self.schema.name, options)
return nil
end

Expand Down
Loading