From 1e872f1872480e1fbb23c7a1bae6aac123052c1a Mon Sep 17 00:00:00 2001 From: xumin Date: Thu, 31 Oct 2024 17:30:09 +0800 Subject: [PATCH 1/4] chore(rpc): add debugging logs --- kong/clustering/rpc/manager.lua | 13 +++++++++++++ kong/clustering/rpc/socket.lua | 1 + kong/clustering/services/sync/hooks.lua | 8 +++++++- 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 7881b1661ffe..44f3c6d43f1f 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -18,6 +18,7 @@ local cjson = require("cjson.safe") local ngx_var = ngx.var local ngx_ERR = ngx.ERR +local ngx_DEBUG = ngx.DEBUG local ngx_log = ngx.log local ngx_exit = ngx.exit local ngx_time = ngx.time @@ -172,8 +173,17 @@ function _M:call(node_id, method, ...) local params = {...} + ngx_log(ngx_DEBUG, + "[rpc] calling ", method, + "(node_id: ", node_id, ")", + " via ", res == "local" and "local" or "concentrator" + ) + if res == "local" then res, err = self:_local_call(node_id, method, params) + + ngx_log(ngx_DEBUG, "[rpc] ", method, err and " failed" or " succeeded") + if not res then return nil, err end @@ -188,6 +198,9 @@ function _M:call(node_id, method, ...) assert(fut:start()) local ok, err = fut:wait(5) + + ngx_log(ngx_DEBUG, "[rpc] ", method, err and " failed" or " succeeded") + if err then return nil, err end diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 95ef614df73d..d73620709c41 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -148,6 +148,7 @@ function _M:start() if payload.method then -- invoke + ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")") local dispatch_cb = self.manager.callbacks.callbacks[payload.method] if not dispatch_cb then local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index ae7bbbe90620..8ae44ca062b7 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -59,6 +59,8 @@ function _M:notify_all_nodes() return end + ngx_log(ngx_DEBUG, "[rpc:sync] notifying all nodes of new version: ", latest_version) + local msg = { default = { new_version = latest_version, }, } for _, node in ipairs(get_all_nodes_with_sync_cap()) do @@ -112,6 +114,7 @@ end -- only control plane has these delta operations function _M:register_dao_hooks() local function is_db_export(name) + ngx_log(ngx_DEBUG, "[rpc:sync] name: ", name, " db_export: ", kong.db[name].schema.db_export) local db_export = kong.db[name].schema.db_export return db_export == nil or db_export == true end @@ -131,6 +134,7 @@ function _M:register_dao_hooks() return end + ngx_log(ngx_DEBUG, "[rpc:sync] failed. Canceling ", name) local res, err = self.strategy:cancel_txn() if not res then ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err)) @@ -142,6 +146,7 @@ function _M:register_dao_hooks() return entity end + ngx_log(ngx_DEBUG, "[rpc:sync] new delta due to writing ", name) return self:entity_delta_writer(entity, name, options, ws_id) end @@ -150,7 +155,8 @@ function _M:register_dao_hooks() return entity end - -- set lmdb value to ngx_null then return entity + ngx_log(ngx_DEBUG, "[rpc:sync] new delta due to deleting ", name) + -- set lmdb value to ngx_null then return row return self:entity_delta_writer(entity, name, options, ws_id, true) end From 46e3bc943d3e428aa44b0113b78b78fa16dc4019 Mon Sep 17 00:00:00 2001 From: xumin Date: Thu, 31 Oct 2024 17:30:35 +0800 Subject: [PATCH 2/4] fix(rpc): dao hooks of `*_by` are missed add hook dao:*_by:fail Co-authored-by: Chrono --- kong/clustering/rpc/socket.lua | 1 + kong/clustering/services/sync/hooks.lua | 29 +++++++++++++++---- kong/db/dao/init.lua | 6 ++++ .../09-hybrid_mode/01-sync_spec.lua | 22 +++++++------- 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index d73620709c41..ca8a80d622cd 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -149,6 +149,7 @@ function _M:start() -- invoke ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")") + local dispatch_cb = self.manager.callbacks.callbacks[payload.method] if not dispatch_cb then local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index 8ae44ca062b7..80d892ced627 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -59,7 +59,7 @@ function _M:notify_all_nodes() return end - ngx_log(ngx_DEBUG, "[rpc:sync] notifying all nodes of new version: ", latest_version) + ngx_log(ngx_DEBUG, "[kong.sync.v2] notifying all nodes of new version: ", latest_version) local msg = { default = { new_version = latest_version, }, } @@ -114,7 +114,8 @@ end -- only control plane has these delta operations function _M:register_dao_hooks() local function is_db_export(name) - ngx_log(ngx_DEBUG, "[rpc:sync] name: ", name, " db_export: ", kong.db[name].schema.db_export) + ngx_log(ngx_DEBUG, "[kong.sync.v2] name: ", name, " db_export: ", kong.db[name].schema.db_export) + local db_export = kong.db[name].schema.db_export return db_export == nil or db_export == true end @@ -134,7 +135,8 @@ function _M:register_dao_hooks() return end - ngx_log(ngx_DEBUG, "[rpc:sync] failed. Canceling ", name) + ngx_log(ngx_DEBUG, "[kong.sync.v2] failed. Canceling ", name) + local res, err = self.strategy:cancel_txn() if not res then ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err)) @@ -146,7 +148,8 @@ function _M:register_dao_hooks() return entity end - ngx_log(ngx_DEBUG, "[rpc:sync] new delta due to writing ", name) + ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to writing ", name) + return self:entity_delta_writer(entity, name, options, ws_id) end @@ -155,7 +158,8 @@ function _M:register_dao_hooks() return entity end - ngx_log(ngx_DEBUG, "[rpc:sync] new delta due to deleting ", name) + ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name) + -- set lmdb value to ngx_null then return row return self:entity_delta_writer(entity, name, options, ws_id, true) end @@ -180,6 +184,21 @@ function _M:register_dao_hooks() ["dao:upsert:pre"] = pre_hook_func, ["dao:upsert:fail"] = fail_hook_func, ["dao:upsert:post"] = post_hook_writer_func, + + -- dao:upsert_by + ["dao:upsert_by:pre"] = pre_hook_func, + ["dao:upsert_by:fail"] = fail_hook_func, + ["dao:upsert_by:post"] = post_hook_writer_func, + + -- dao:delete_by + ["dao:delete_by:pre"] = pre_hook_func, + ["dao:delete_by:fail"] = fail_hook_func, + ["dao:delete_by:post"] = post_hook_delete_func, + + -- dao:update_by + ["dao:update_by:pre"] = pre_hook_func, + ["dao:update_by:fail"] = fail_hook_func, + ["dao:update_by:post"] = post_hook_writer_func, } for ev, func in pairs(dao_hooks) do diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index a20cb6813e7c..b064bf126124 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -814,12 +814,14 @@ local function generate_foreign_key_methods(schema) local entity_to_update, rbw_entity, err, err_t = check_update(self, unique_value, entity, options, name) if not entity_to_update then + run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options) return nil, err, err_t end local row, err_t = self.strategy:update_by_field(name, unique_value, entity_to_update, options) if not row then + run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options) return nil, tostring(err_t), err_t end @@ -869,12 +871,14 @@ local function generate_foreign_key_methods(schema) local row, err_t = self.strategy:upsert_by_field(name, unique_value, entity_to_upsert, options) if not row then + run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options) return nil, tostring(err_t), err_t end local ws_id = row.ws_id row, err, err_t = self:row_to_entity(row, options) if not row then + run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options) return nil, err, err_t end @@ -928,9 +932,11 @@ local function generate_foreign_key_methods(schema) local rows_affected rows_affected, err_t = self.strategy:delete_by_field(name, unique_value, options) if err_t then + run_hook("dao:delete_by:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t elseif not rows_affected then + run_hook("dao:delete_by:post", nil, self.schema.name, options, entity.ws_id, nil) return nil end diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index f434f7c37531..48b60395027c 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -12,9 +12,13 @@ local uuid = require("kong.tools.uuid").uuid local KEY_AUTH_PLUGIN -for _, inc_sync in ipairs { "on", "off" } do +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do +--- XXX FIXME: enable inc_sync = on +-- skips the rest of the tests. We will fix them in a follow-up PR +local skip_inc_sync = inc_sync == "on" and pending or describe + describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() @@ -623,11 +627,7 @@ describe("CP/DP #version check #" .. strategy, function() end) end) ---- XXX FIXME: enable inc_sync = on --- skips the rest of the tests. We will fix them in a follow-up PR -local skip_inc_sync = inc_sync == "on" and pending or describe - -skip_inc_sync("CP/DP config sync #" .. strategy, function() +describe("CP/DP config sync #" .. strategy, function() lazy_setup(function() helpers.get_db_utils(strategy) -- runs migrations @@ -776,12 +776,12 @@ skip_inc_sync("CP/DP labels #" .. strategy, function() describe("status API", function() it("shows DP status", function() - helpers.wait_until(function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + helpers.wait_until(function() local res = assert(admin_client:get("/clustering/data-planes")) local body = assert.res_status(200, res) local json = cjson.decode(body) From 76564d6de03e2885987fb95433e9fddbc536c239 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 13:43:43 +0800 Subject: [PATCH 3/4] clean logs --- kong/clustering/rpc/manager.lua | 4 ++-- kong/clustering/services/sync/hooks.lua | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 44f3c6d43f1f..6ab7696fd660 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -182,7 +182,7 @@ function _M:call(node_id, method, ...) if res == "local" then res, err = self:_local_call(node_id, method, params) - ngx_log(ngx_DEBUG, "[rpc] ", method, err and " failed" or " succeeded") + ngx_log(ngx_DEBUG, "[rpc] ", method, " err: ", err) if not res then return nil, err @@ -199,7 +199,7 @@ function _M:call(node_id, method, ...) local ok, err = fut:wait(5) - ngx_log(ngx_DEBUG, "[rpc] ", method, err and " failed" or " succeeded") + ngx_log(ngx_DEBUG, "[rpc] ", method, " err: ", err) if err then return nil, err diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index 80d892ced627..654ce74f1c59 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -114,9 +114,10 @@ end -- only control plane has these delta operations function _M:register_dao_hooks() local function is_db_export(name) - ngx_log(ngx_DEBUG, "[kong.sync.v2] name: ", name, " db_export: ", kong.db[name].schema.db_export) - local db_export = kong.db[name].schema.db_export + + ngx_log(ngx_DEBUG, "[kong.sync.v2] name: ", name, " db_export: ", db_export) + return db_export == nil or db_export == true end From 1c89c331aba960f882a76bbf07ee40529fdb227c Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 13:47:53 +0800 Subject: [PATCH 4/4] clean logs --- kong/clustering/rpc/manager.lua | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 6ab7696fd660..5d614997567a 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -182,12 +182,13 @@ function _M:call(node_id, method, ...) if res == "local" then res, err = self:_local_call(node_id, method, params) - ngx_log(ngx_DEBUG, "[rpc] ", method, " err: ", err) - if not res then + ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err) return nil, err end + ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded") + return res end @@ -199,16 +200,20 @@ function _M:call(node_id, method, ...) local ok, err = fut:wait(5) - ngx_log(ngx_DEBUG, "[rpc] ", method, " err: ", err) - if err then + ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err) + return nil, err end if ok then + ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded") + return fut.result end + ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", fut.error.message) + return nil, fut.error.message end