From 1fd3a76ebcc916e6e03c648c092bd1c3616e4726 Mon Sep 17 00:00:00 2001 From: Chrono Date: Mon, 11 Nov 2024 23:46:38 +0800 Subject: [PATCH] refactor(clustering): small improvements about incremental sync (#13841) --- kong-3.9.0-0.rockspec | 2 +- kong/clustering/services/sync/hooks.lua | 5 +--- kong/clustering/services/sync/rpc.lua | 13 ++++----- .../services/sync/strategies/postgres.lua | 14 ++-------- kong/conf_loader/init.lua | 5 ++-- kong/db/declarative/export.lua | 28 ++++++++++++------- kong/db/declarative/import.lua | 2 +- ...{024_370_to_380.lua => 024_380_to_390.lua} | 0 kong/db/migrations/core/init.lua | 2 +- ...o_380_spec.lua => 024_380_to_390_spec.lua} | 0 10 files changed, 33 insertions(+), 38 deletions(-) rename kong/db/migrations/core/{024_370_to_380.lua => 024_380_to_390.lua} (100%) rename spec/05-migration/db/migrations/core/{024_370_to_380_spec.lua => 024_380_to_390_spec.lua} (100%) diff --git a/kong-3.9.0-0.rockspec b/kong-3.9.0-0.rockspec index b86e9c85658d..a4fd1ba7b177 100644 --- a/kong-3.9.0-0.rockspec +++ b/kong-3.9.0-0.rockspec @@ -322,7 +322,7 @@ build = { ["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua", ["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua", ["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua", - ["kong.db.migrations.core.024_370_to_380"] = "kong/db/migrations/core/024_370_to_380.lua", + ["kong.db.migrations.core.024_380_to_390"] = "kong/db/migrations/core/024_380_to_390.lua", ["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua", ["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua", ["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua", diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index 0dd694e7bbf4..a9368f755061 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -67,11 +67,8 @@ function _M:notify_all_nodes() local res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg) if not res then if not err:find("requested capability does not exist", nil, true) then - ngx_log(ngx_ERR, "unable to notify new version: ", err) + ngx_log(ngx_ERR, "unable to notify ", node, " new version: ", err) end - - else - ngx_log(ngx_DEBUG, "notified ", node, " ", latest_version) end end end diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 7852cafc2d55..c0c2836b964d 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -88,7 +88,7 @@ function _M:init_cp(manager) sync_status = CLUSTERING_SYNC_STATUS.NORMAL, config_hash = fmt("%032d", default_namespace_version), rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, - }, { ttl = purge_delay }) + }, { ttl = purge_delay, }) if not ok then ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err) end @@ -120,10 +120,7 @@ function _M:init_cp(manager) end if isempty(res) then - ngx_log(ngx_DEBUG, - "[kong.sync.v2] no delta for node_id: ", node_id, - ", current_version: ", default_namespace_version, - ", node is already up to date" ) + -- node is already up to date return inc_sync_result(res) end @@ -206,7 +203,7 @@ local function do_sync() local deltas = ns_delta.deltas if isempty(deltas) then - ngx_log(ngx_DEBUG, "no delta to sync") + -- no delta to sync return true end @@ -280,8 +277,8 @@ local function do_sync() ev = { delta_type, crud_event_type, delta_entity, old_entity, } else - -- delete the entity - local old_entity, err = kong.db[delta_type]:select(delta.pk) -- composite key + -- delete the entity, opts for getting correct lmdb key + local old_entity, err = db[delta_type]:select(delta.pk, opts) -- composite key if err then return nil, err end diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index 31a1f6b8dd7a..1ef758e2c1d0 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -11,10 +11,9 @@ local cjson_encode = cjson.encode local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR -local ngx_DEBUG = ngx.DEBUG -local CLEANUP_VERSION_COUNT = 100 +local KEEP_VERSION_COUNT = 100 local CLEANUP_TIME_DELAY = 3600 -- 1 hour @@ -39,12 +38,10 @@ local PURGE_QUERY = [[ function _M:init_worker() local function cleanup_handler(premature) if premature then - ngx_log(ngx_DEBUG, "[incremental] worker exiting, killing incremental cleanup timer") - return end - local res, err = self.connector:query(string_format(PURGE_QUERY, CLEANUP_VERSION_COUNT)) + local res, err = self.connector:query(string_format(PURGE_QUERY, KEEP_VERSION_COUNT)) if not res then ngx_log(ngx_ERR, "[incremental] unable to purge old data from incremental delta table, err: ", @@ -52,9 +49,6 @@ function _M:init_worker() return end - - ngx_log(ngx_DEBUG, - "[incremental] successfully purged old data from incremental delta table") end assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler)) @@ -97,8 +91,6 @@ function _M:insert_delta(deltas) local sql = string_format(NEW_VERSION_QUERY, buf:get()) - ngx_log(ngx_DEBUG, "[incremental] insert_delta sql: ", sql) - return self.connector:query(sql) end @@ -106,7 +98,7 @@ end function _M:get_latest_version() local sql = "SELECT MAX(version) FROM clustering_sync_version" - local res, err = self.connector:query(sql) + local res, err = self.connector:query(sql, "read") if not res then return nil, err end diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 51ea979d2cc9..2345cd77c5da 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -953,8 +953,9 @@ local function load(path, custom_conf, opts) end end - if not conf.cluster_rpc then - log.warn("Cluster incremental sync has been forcibly disabled") + if conf.cluster_incremental_sync and not conf.cluster_rpc then + log.warn("Cluster incremental sync has been forcibly disabled, " .. + "please enable cluster RPC.") conf.cluster_incremental_sync = false end diff --git a/kong/db/declarative/export.lua b/kong/db/declarative/export.lua index e20d3c1d8469..9dbe994a42c7 100644 --- a/kong/db/declarative/export.lua +++ b/kong/db/declarative/export.lua @@ -95,6 +95,23 @@ local function end_transaction(db) end +local get_latest_version +do + local strategy + + local function get_latest_version_real() + return strategy:get_latest_version() + end + + get_latest_version = function() + -- ensure we get the initialized kong.db + strategy = require("kong.clustering.services.sync.strategies.postgres").new(kong.db) + get_latest_version = get_latest_version_real + return get_latest_version() + end +end + + local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, expand_foreigns) local schemas = {} @@ -119,16 +136,7 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp local sync_version if emitter.want_sync_version then - ok, err = db.connector:query("SELECT max(version) from clustering_sync_version", "read") - if not ok then - return nil, err - end - - -- it will be ngx.null when the table clustering_sync_version is empty - sync_version = assert(ok[1].max) - if sync_version == null then - sync_version = 0 - end + sync_version = get_latest_version() end emitter:emit_toplevel({ diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index c083fd86d846..2030da85359a 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -66,7 +66,7 @@ local function workspace_id(schema, options) end -- options.workspace does not exist - if not options or not options.workspace then + if not options or not options.workspace or options.workspace == "" then return get_workspace_id() end diff --git a/kong/db/migrations/core/024_370_to_380.lua b/kong/db/migrations/core/024_380_to_390.lua similarity index 100% rename from kong/db/migrations/core/024_370_to_380.lua rename to kong/db/migrations/core/024_380_to_390.lua diff --git a/kong/db/migrations/core/init.lua b/kong/db/migrations/core/init.lua index 394f13bf382b..37192c6e82cb 100644 --- a/kong/db/migrations/core/init.lua +++ b/kong/db/migrations/core/init.lua @@ -21,5 +21,5 @@ return { "021_340_to_350", "022_350_to_360", "023_360_to_370", - "024_370_to_380", + "024_380_to_390", } diff --git a/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua b/spec/05-migration/db/migrations/core/024_380_to_390_spec.lua similarity index 100% rename from spec/05-migration/db/migrations/core/024_370_to_380_spec.lua rename to spec/05-migration/db/migrations/core/024_380_to_390_spec.lua