Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(clustering/rpc): dao hooks of upsert/delete/update_by are missed #13819

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local cjson = require("cjson.safe")

local ngx_var = ngx.var
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
local ngx_log = ngx.log
local ngx_exit = ngx.exit
local ngx_time = ngx.time
Expand Down Expand Up @@ -172,12 +173,22 @@ function _M:call(node_id, method, ...)

local params = {...}

ngx_log(ngx_DEBUG,
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
"[rpc] calling ", method,
"(node_id: ", node_id, ")",
" via ", res == "local" and "local" or "concentrator"
)

if res == "local" then
res, err = self:_local_call(node_id, method, params)

if not res then
ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err)
return nil, err
end

ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded")

return res
end

Expand All @@ -188,14 +199,21 @@ function _M:call(node_id, method, ...)
assert(fut:start())

local ok, err = fut:wait(5)

if err then
ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err)

return nil, err
end

if ok then
ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded")

return fut.result
end

ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", fut.error.message)

return nil, fut.error.message
end

Expand Down
2 changes: 2 additions & 0 deletions kong/clustering/rpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ function _M:start()
if payload.method then
-- invoke

ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")")

local dispatch_cb = self.manager.callbacks.callbacks[payload.method]
if not dispatch_cb then
local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND))
Expand Down
28 changes: 27 additions & 1 deletion kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ function _M:notify_all_nodes()
return
end

ngx_log(ngx_DEBUG, "[kong.sync.v2] notifying all nodes of new version: ", latest_version)

local msg = { default = { new_version = latest_version, }, }

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
Expand Down Expand Up @@ -113,6 +115,9 @@ end
function _M:register_dao_hooks()
local function is_db_export(name)
local db_export = kong.db[name].schema.db_export

ngx_log(ngx_DEBUG, "[kong.sync.v2] name: ", name, " db_export: ", db_export)

return db_export == nil or db_export == true
end

Expand All @@ -131,6 +136,8 @@ function _M:register_dao_hooks()
return
end

ngx_log(ngx_DEBUG, "[kong.sync.v2] failed. Canceling ", name)

local res, err = self.strategy:cancel_txn()
if not res then
ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err))
Expand All @@ -142,6 +149,8 @@ function _M:register_dao_hooks()
return entity
end

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to writing ", name)

return self:entity_delta_writer(entity, name, options, ws_id)
end

Expand All @@ -150,7 +159,9 @@ function _M:register_dao_hooks()
return entity
end

-- set lmdb value to ngx_null then return entity
ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name)

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(entity, name, options, ws_id, true)
end

Expand All @@ -174,6 +185,21 @@ function _M:register_dao_hooks()
["dao:upsert:pre"] = pre_hook_func,
["dao:upsert:fail"] = fail_hook_func,
["dao:upsert:post"] = post_hook_writer_func,

-- dao:upsert_by
["dao:upsert_by:pre"] = pre_hook_func,
["dao:upsert_by:fail"] = fail_hook_func,
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
["dao:upsert_by:post"] = post_hook_writer_func,

-- dao:delete_by
["dao:delete_by:pre"] = pre_hook_func,
["dao:delete_by:fail"] = fail_hook_func,
["dao:delete_by:post"] = post_hook_delete_func,

-- dao:update_by
["dao:update_by:pre"] = pre_hook_func,
["dao:update_by:fail"] = fail_hook_func,
["dao:update_by:post"] = post_hook_writer_func,
}

for ev, func in pairs(dao_hooks) do
Expand Down
6 changes: 6 additions & 0 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -814,12 +814,14 @@ local function generate_foreign_key_methods(schema)
local entity_to_update, rbw_entity, err, err_t = check_update(self, unique_value,
entity, options, name)
if not entity_to_update then
run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options)
return nil, err, err_t
end

local row, err_t = self.strategy:update_by_field(name, unique_value,
entity_to_update, options)
if not row then
run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options)
return nil, tostring(err_t), err_t
end

Expand Down Expand Up @@ -869,12 +871,14 @@ local function generate_foreign_key_methods(schema)
local row, err_t = self.strategy:upsert_by_field(name, unique_value,
entity_to_upsert, options)
if not row then
run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options)
return nil, tostring(err_t), err_t
end

local ws_id = row.ws_id
row, err, err_t = self:row_to_entity(row, options)
if not row then
run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options)
return nil, err, err_t
end

Expand Down Expand Up @@ -928,9 +932,11 @@ local function generate_foreign_key_methods(schema)
local rows_affected
rows_affected, err_t = self.strategy:delete_by_field(name, unique_value, options)
if err_t then
run_hook("dao:delete_by:fail", err_t, entity, self.schema.name, options)
return nil, tostring(err_t), err_t

elseif not rows_affected then
run_hook("dao:delete_by:post", nil, self.schema.name, options, entity.ws_id, nil)
return nil
end

Expand Down
22 changes: 11 additions & 11 deletions spec/02-integration/09-hybrid_mode/01-sync_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ local uuid = require("kong.tools.uuid").uuid
local KEY_AUTH_PLUGIN


for _, inc_sync in ipairs { "on", "off" } do
for _, inc_sync in ipairs { "on", "off" } do
for _, strategy in helpers.each_strategy() do

--- XXX FIXME: enable inc_sync = on
-- skips the rest of the tests. We will fix them in a follow-up PR
local skip_inc_sync = inc_sync == "on" and pending or describe

describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function()

lazy_setup(function()
Expand Down Expand Up @@ -623,11 +627,7 @@ describe("CP/DP #version check #" .. strategy, function()
end)
end)

--- XXX FIXME: enable inc_sync = on
-- skips the rest of the tests. We will fix them in a follow-up PR
local skip_inc_sync = inc_sync == "on" and pending or describe

skip_inc_sync("CP/DP config sync #" .. strategy, function()
describe("CP/DP config sync #" .. strategy, function()
lazy_setup(function()
helpers.get_db_utils(strategy) -- runs migrations

Expand Down Expand Up @@ -776,12 +776,12 @@ skip_inc_sync("CP/DP labels #" .. strategy, function()

describe("status API", function()
it("shows DP status", function()
helpers.wait_until(function()
local admin_client = helpers.admin_client()
finally(function()
admin_client:close()
end)
local admin_client = helpers.admin_client()
finally(function()
admin_client:close()
end)

helpers.wait_until(function()
local res = assert(admin_client:get("/clustering/data-planes"))
local body = assert.res_status(200, res)
local json = cjson.decode(body)
Expand Down
Loading