From 9bc1338ccdd3ca435d3d45867970b5d392d4e413 Mon Sep 17 00:00:00 2001 From: Chrono Date: Thu, 5 Sep 2024 15:14:12 +0800 Subject: [PATCH] feat(dao): support diff transaction (#13586) * txn in strategy * register_dao_hooks * fix txn funcs * comments * hook dao:xxx:fail * dao:delete:fail * get_latest_version * Revert "get_latest_version" This reverts commit c97f868e5de714c4a8cdf77740dd014e6e713f02. * select max --- kong/clustering/services/sync/hooks.lua | 146 ++++++++++++------ .../services/sync/strategies/postgres.lua | 20 ++- kong/db/dao/init.lua | 4 + 3 files changed, 119 insertions(+), 51 deletions(-) diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index a25441239c7..b9b4d3d0fa0 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -45,69 +45,117 @@ end function _M:register_dao_hooks(is_cp) - if is_cp then - hooks.register_hook("dao:insert:post", function(row, name, options, ws_id) - local deltas = { - { - ["type"] = name, - id = row.id, - ws_id = ws_id, - row = row, }, - } - - local res, err = self.strategy:insert_delta(deltas) - if not res then - return nil, err - end + -- only control plane has these delta operations + if not is_cp then + return + end - local latest_version = self.strategy:get_latest_version() + -- dao:insert + + hooks.register_hook("dao:insert:pre", function() + return self.strategy:begin_txn() + end) + + hooks.register_hook("dao:insert:fail", function() + return self.strategy:cancel_txn() + end) + + hooks.register_hook("dao:insert:post", function(row, name, options, ws_id) + local deltas = { + { + ["type"] = name, + id = row.id, + ws_id = ws_id, + row = row, }, + } + + local res, err = self.strategy:insert_delta(deltas) + if not res then + self.strategy:cancel_txn() + return nil, err + end + + res, err = self.strategy:commit_txn() + if not res then + self.strategy:cancel_txn() + return nil, err + end - for _, node in ipairs(get_all_nodes_with_sync_cap()) do - res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version) - if not res then - if not err:find("requested capability does not exist", nil, true) then - ngx.log(ngx.ERR, "unable to notify new version: ", err) - end + local latest_version = self.strategy:get_latest_version() - else - ngx.log(ngx.ERR, "notified ", node, " ", latest_version) + for _, node in ipairs(get_all_nodes_with_sync_cap()) do + res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version) + if not res then + if not err:find("requested capability does not exist", nil, true) then + ngx.log(ngx.ERR, "unable to notify new version: ", err) end + + else + ngx.log(ngx.ERR, "notified ", node, " ", latest_version) end + end - return row, name, options, ws_id - end) + return row, name, options, ws_id + end) - hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries) - local deltas = { - { - ["type"] = name, - id = row.id, - ws_id = ws_id, - row = ngx.null, }, - } + -- dao:delete - local res, err = self.strategy:insert_delta(deltas) - if not res then - return nil, err - end + hooks.register_hook("dao:delete:pre", function() + return self.strategy:begin_txn() + end) - local latest_version = self.strategy:get_latest_version() + hooks.register_hook("dao:delete:fail", function(err) + if err then + return self.strategy:cancel_txn() + end - for _, node in ipairs(get_all_nodes_with_sync_cap()) do - res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version) - if not res then - if not err:find("requested capability does not exist", nil, true) then - ngx.log(ngx.ERR, "unable to notify new version: ", err) - end + -- no err, we should commit delete operation - else - ngx.log(ngx.ERR, "notified ", node, " ", latest_version) + local res, err = self.strategy:commit_txn() + if not res then + return self.strategy:cancel_txn() + end + + return true + end) + + hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries) + local deltas = { + { + ["type"] = name, + id = row.id, + ws_id = ws_id, + row = ngx.null, }, + } + + local res, err = self.strategy:insert_delta(deltas) + if not res then + self.strategy:cancel_txn() + return nil, err + end + + res, err = self.strategy:commit_txn() + if not res then + self.strategy:cancel_txn() + return nil, err + end + + local latest_version = self.strategy:get_latest_version() + + for _, node in ipairs(get_all_nodes_with_sync_cap()) do + res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version) + if not res then + if not err:find("requested capability does not exist", nil, true) then + ngx.log(ngx.ERR, "unable to notify new version: ", err) end + + else + ngx.log(ngx.ERR, "notified ", node, " ", latest_version) end + end - return row, name, options, ws_id, cascade_entries - end) - end + return row, name, options, ws_id, cascade_entries + end) end diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index dbc49fa18a2..57110af4353 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -51,8 +51,8 @@ end function _M:get_latest_version() - local sql = "SELECT currval(pg_get_serial_sequence('clustering_sync_version', 'version'))" - return self.connector:query(sql)[1].currval + local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version" + return self.connector:query(sql)[1].max_version end @@ -62,4 +62,20 @@ function _M:get_delta(version) end +function _M:begin_txn() + return self.connector:query("BEGIN;") +end + + +function _M:commit_txn() + return self.connector:query("COMMIT;") +end + + +function _M:cancel_txn() + -- we will close the connection, not execute 'ROLLBACK' + return self.connector:close() +end + + return _M diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index 515e6c0c719..c5e993ad1f1 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -1156,12 +1156,14 @@ function DAO:insert(entity, options) local row, err_t = self.strategy:insert(entity_to_insert, options) if not row then + run_hook("dao:insert:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t end local ws_id = row.ws_id row, err, err_t = self:row_to_entity(row, options) if not row then + run_hook("dao:insert:fail", err, entity, self.schema.name, options) return nil, err, err_t end @@ -1337,9 +1339,11 @@ function DAO:delete(pk_or_entity, options) local rows_affected rows_affected, err_t = self.strategy:delete(primary_key, options) if err_t then + run_hook("dao:delete:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t elseif not rows_affected then + run_hook("dao:delete:fail", nil, entity, self.schema.name, options) return nil end