Skip to content

Commit

Permalink
fix(rpc): dao hooks of *_by are missed
Browse files Browse the repository at this point in the history
add hook dao:*_by:fail

Co-authored-by: Chrono <[email protected]>
  • Loading branch information
StarlightIbuki and chronolaw committed Nov 5, 2024
1 parent 1e872f1 commit 46e3bc9
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 16 deletions.
1 change: 1 addition & 0 deletions kong/clustering/rpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ function _M:start()
-- invoke

ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")")

local dispatch_cb = self.manager.callbacks.callbacks[payload.method]
if not dispatch_cb then
local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND))
Expand Down
29 changes: 24 additions & 5 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ function _M:notify_all_nodes()
return
end

ngx_log(ngx_DEBUG, "[rpc:sync] notifying all nodes of new version: ", latest_version)
ngx_log(ngx_DEBUG, "[kong.sync.v2] notifying all nodes of new version: ", latest_version)

local msg = { default = { new_version = latest_version, }, }

Expand Down Expand Up @@ -114,7 +114,8 @@ end
-- only control plane has these delta operations
function _M:register_dao_hooks()
local function is_db_export(name)
ngx_log(ngx_DEBUG, "[rpc:sync] name: ", name, " db_export: ", kong.db[name].schema.db_export)
ngx_log(ngx_DEBUG, "[kong.sync.v2] name: ", name, " db_export: ", kong.db[name].schema.db_export)

local db_export = kong.db[name].schema.db_export
return db_export == nil or db_export == true
end
Expand All @@ -134,7 +135,8 @@ function _M:register_dao_hooks()
return
end

ngx_log(ngx_DEBUG, "[rpc:sync] failed. Canceling ", name)
ngx_log(ngx_DEBUG, "[kong.sync.v2] failed. Canceling ", name)

local res, err = self.strategy:cancel_txn()
if not res then
ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err))
Expand All @@ -146,7 +148,8 @@ function _M:register_dao_hooks()
return entity
end

ngx_log(ngx_DEBUG, "[rpc:sync] new delta due to writing ", name)
ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to writing ", name)

return self:entity_delta_writer(entity, name, options, ws_id)
end

Expand All @@ -155,7 +158,8 @@ function _M:register_dao_hooks()
return entity
end

ngx_log(ngx_DEBUG, "[rpc:sync] new delta due to deleting ", name)
ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name)

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(entity, name, options, ws_id, true)
end
Expand All @@ -180,6 +184,21 @@ function _M:register_dao_hooks()
["dao:upsert:pre"] = pre_hook_func,
["dao:upsert:fail"] = fail_hook_func,
["dao:upsert:post"] = post_hook_writer_func,

-- dao:upsert_by
["dao:upsert_by:pre"] = pre_hook_func,
["dao:upsert_by:fail"] = fail_hook_func,
["dao:upsert_by:post"] = post_hook_writer_func,

-- dao:delete_by
["dao:delete_by:pre"] = pre_hook_func,
["dao:delete_by:fail"] = fail_hook_func,
["dao:delete_by:post"] = post_hook_delete_func,

-- dao:update_by
["dao:update_by:pre"] = pre_hook_func,
["dao:update_by:fail"] = fail_hook_func,
["dao:update_by:post"] = post_hook_writer_func,
}

for ev, func in pairs(dao_hooks) do
Expand Down
6 changes: 6 additions & 0 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -814,12 +814,14 @@ local function generate_foreign_key_methods(schema)
local entity_to_update, rbw_entity, err, err_t = check_update(self, unique_value,
entity, options, name)
if not entity_to_update then
run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options)
return nil, err, err_t
end

local row, err_t = self.strategy:update_by_field(name, unique_value,
entity_to_update, options)
if not row then
run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options)
return nil, tostring(err_t), err_t
end

Expand Down Expand Up @@ -869,12 +871,14 @@ local function generate_foreign_key_methods(schema)
local row, err_t = self.strategy:upsert_by_field(name, unique_value,
entity_to_upsert, options)
if not row then
run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options)
return nil, tostring(err_t), err_t
end

local ws_id = row.ws_id
row, err, err_t = self:row_to_entity(row, options)
if not row then
run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options)
return nil, err, err_t
end

Expand Down Expand Up @@ -928,9 +932,11 @@ local function generate_foreign_key_methods(schema)
local rows_affected
rows_affected, err_t = self.strategy:delete_by_field(name, unique_value, options)
if err_t then
run_hook("dao:delete_by:fail", err_t, entity, self.schema.name, options)
return nil, tostring(err_t), err_t

elseif not rows_affected then
run_hook("dao:delete_by:post", nil, self.schema.name, options, entity.ws_id, nil)
return nil
end

Expand Down
22 changes: 11 additions & 11 deletions spec/02-integration/09-hybrid_mode/01-sync_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ local uuid = require("kong.tools.uuid").uuid
local KEY_AUTH_PLUGIN


for _, inc_sync in ipairs { "on", "off" } do
for _, inc_sync in ipairs { "on", "off" } do
for _, strategy in helpers.each_strategy() do

--- XXX FIXME: enable inc_sync = on
-- skips the rest of the tests. We will fix them in a follow-up PR
local skip_inc_sync = inc_sync == "on" and pending or describe

describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function()

lazy_setup(function()
Expand Down Expand Up @@ -623,11 +627,7 @@ describe("CP/DP #version check #" .. strategy, function()
end)
end)

--- XXX FIXME: enable inc_sync = on
-- skips the rest of the tests. We will fix them in a follow-up PR
local skip_inc_sync = inc_sync == "on" and pending or describe

skip_inc_sync("CP/DP config sync #" .. strategy, function()
describe("CP/DP config sync #" .. strategy, function()
lazy_setup(function()
helpers.get_db_utils(strategy) -- runs migrations

Expand Down Expand Up @@ -776,12 +776,12 @@ skip_inc_sync("CP/DP labels #" .. strategy, function()

describe("status API", function()
it("shows DP status", function()
helpers.wait_until(function()
local admin_client = helpers.admin_client()
finally(function()
admin_client:close()
end)
local admin_client = helpers.admin_client()
finally(function()
admin_client:close()
end)

helpers.wait_until(function()
local res = assert(admin_client:get("/clustering/data-planes"))
local body = assert.res_status(200, res)
local json = cjson.decode(body)
Expand Down

0 comments on commit 46e3bc9

Please sign in to comment.