Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(clustering): small improvements about incremental sync #13841

Merged
merged 15 commits into from
Nov 11, 2024
2 changes: 1 addition & 1 deletion kong-3.9.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ build = {
["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua",
["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
["kong.db.migrations.core.024_370_to_380"] = "kong/db/migrations/core/024_370_to_380.lua",
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
["kong.db.migrations.core.024_380_to_390"] = "kong/db/migrations/core/024_380_to_390.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua",
Expand Down
5 changes: 1 addition & 4 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,8 @@ function _M:notify_all_nodes()
local res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx_log(ngx_ERR, "unable to notify new version: ", err)
ngx_log(ngx_ERR, "unable to notify ", node, " new version: ", err)
end

else
ngx_log(ngx_DEBUG, "notified ", node, " ", latest_version)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need the log for debugging. It will be hard to diagnose if we log nothing when succeeds

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove it now, If we do need it in the practice we could add them back.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chronolaw With the level debug it's almost free in runtime. Any specific reason we are removing them?

Copy link
Member

@bungle bungle Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StarlightIbuki we need to be careful with debug logs. Currently we don't have means of disabling just some debug logs or fine tune what is debug logged. While log messages like these may be beneficial when debugging incremental sync, it makes debugging everything else much harder as logs are just flooded with these. Logs that happen by background workers and timers or automated processes are especially problematic. And we already log errors. Unix: no news is good news.

end
end
end
Expand Down
13 changes: 5 additions & 8 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ function _M:init_cp(manager)
sync_status = CLUSTERING_SYNC_STATUS.NORMAL,
config_hash = fmt("%032d", default_namespace_version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
}, { ttl = purge_delay })
}, { ttl = purge_delay, no_broadcast_crud_event = true })
bungle marked this conversation as resolved.
Show resolved Hide resolved
if not ok then
ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err)
end
Expand Down Expand Up @@ -120,10 +120,7 @@ function _M:init_cp(manager)
end

if isempty(res) then
ngx_log(ngx_DEBUG,
"[kong.sync.v2] no delta for node_id: ", node_id,
", current_version: ", default_namespace_version,
", node is already up to date" )
-- node is already up to date
return inc_sync_result(res)
end

Expand Down Expand Up @@ -206,7 +203,7 @@ local function do_sync()
local deltas = ns_delta.deltas

if isempty(deltas) then
ngx_log(ngx_DEBUG, "no delta to sync")
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
-- no delta to sync
return true
end

Expand Down Expand Up @@ -280,8 +277,8 @@ local function do_sync()
ev = { delta_type, crud_event_type, delta_entity, old_entity, }

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select(delta.pk) -- composite key
-- delete the entity, opts for getting correct lmdb key
local old_entity, err = db[delta_type]:select(delta.pk, opts) -- composite key
if err then
return nil, err
end
Expand Down
14 changes: 3 additions & 11 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ local cjson_encode = cjson.encode
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG


local CLEANUP_VERSION_COUNT = 100
local KEEP_VERSION_COUNT = 100
local CLEANUP_TIME_DELAY = 3600 -- 1 hour


Expand All @@ -39,22 +38,17 @@ local PURGE_QUERY = [[
function _M:init_worker()
local function cleanup_handler(premature)
if premature then
ngx_log(ngx_DEBUG, "[incremental] worker exiting, killing incremental cleanup timer")

return
end

local res, err = self.connector:query(string_format(PURGE_QUERY, CLEANUP_VERSION_COUNT))
local res, err = self.connector:query(string_format(PURGE_QUERY, KEEP_VERSION_COUNT))
if not res then
ngx_log(ngx_ERR,
"[incremental] unable to purge old data from incremental delta table, err: ",
err)

return
end

ngx_log(ngx_DEBUG,
"[incremental] successfully purged old data from incremental delta table")
end

assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler))
Expand Down Expand Up @@ -97,16 +91,14 @@ function _M:insert_delta(deltas)

local sql = string_format(NEW_VERSION_QUERY, buf:get())

ngx_log(ngx_DEBUG, "[incremental] insert_delta sql: ", sql)

return self.connector:query(sql)
end


function _M:get_latest_version()
local sql = "SELECT MAX(version) FROM clustering_sync_version"

local res, err = self.connector:query(sql)
local res, err = self.connector:query(sql, "read")
if not res then
return nil, err
end
Expand Down
5 changes: 3 additions & 2 deletions kong/conf_loader/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,9 @@ local function load(path, custom_conf, opts)
end
end

if not conf.cluster_rpc then
log.warn("Cluster incremental sync has been forcibly disabled")
if conf.cluster_incremental_sync and not conf.cluster_rpc then
log.warn("Cluster incremental sync has been forcibly disabled, " ..
"please enable cluster RPC.")
conf.cluster_incremental_sync = false
end

Expand Down
11 changes: 2 additions & 9 deletions kong/db/declarative/export.lua
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,9 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp

local sync_version
if emitter.want_sync_version then
ok, err = db.connector:query("SELECT max(version) from clustering_sync_version", "read")
if not ok then
return nil, err
end
local strategy = require("kong.clustering.services.sync.strategies.postgres").new(db)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we initialize this once and then just reuse, e.g. a module level get_latest_version:

Suggested change
local strategy = require("kong.clustering.services.sync.strategies.postgres").new(db)
if not get_latest_version then
local strategy = require("kong.clustering.services.sync.strategies.postgres").new(db)
get_latest_version = function()
return strategy:get_latest_version()
end

Or put it in some helper as it seems to be used in many places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs the global variable kong.db, I think that it can not be a module level function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

module level function that caches it on first use

Copy link
Contributor Author

@chronolaw chronolaw Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"kong.clustering.services.sync.strategies.postgres.new() needs a db param, if it is a module level function, kong.db may not be initialized yet.

Copy link
Member

@bungle bungle Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chronolaw I meant something like this:

local get_latest_version do
  local strategy
  local function get_latest_version_real()
    return strategy:get_latest_version()
  end 
  function get_latest_version()
    strategy = require("kong.clustering.services.sync.strategies.postgres").new(kong.db)
    get_latest_version = get_latest_version_real
    return get_latest_version()
  end
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, many thanks.


-- it will be ngx.null when the table clustering_sync_version is empty
sync_version = assert(ok[1].max)
if sync_version == null then
sync_version = 0
end
sync_version = strategy:get_latest_version()
end

emitter:emit_toplevel({
Expand Down
2 changes: 1 addition & 1 deletion kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ local function workspace_id(schema, options)
end

-- options.workspace does not exist
if not options or not options.workspace then
if not options or not options.workspace or options.workspace == "" then
return get_workspace_id()
end

Expand Down
2 changes: 1 addition & 1 deletion kong/db/migrations/core/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ return {
"021_340_to_350",
"022_350_to_360",
"023_360_to_370",
"024_370_to_380",
"024_380_to_390",
}
Loading