Skip to content

Commit

Permalink
refactor(clustering): small improvements about incremental sync (#13841)
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw authored Nov 11, 2024
1 parent a1a6f63 commit 1fd3a76
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 38 deletions.
2 changes: 1 addition & 1 deletion kong-3.9.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ build = {
["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua",
["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
["kong.db.migrations.core.024_370_to_380"] = "kong/db/migrations/core/024_370_to_380.lua",
["kong.db.migrations.core.024_380_to_390"] = "kong/db/migrations/core/024_380_to_390.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua",
Expand Down
5 changes: 1 addition & 4 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,8 @@ function _M:notify_all_nodes()
local res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx_log(ngx_ERR, "unable to notify new version: ", err)
ngx_log(ngx_ERR, "unable to notify ", node, " new version: ", err)
end

else
ngx_log(ngx_DEBUG, "notified ", node, " ", latest_version)
end
end
end
Expand Down
13 changes: 5 additions & 8 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ function _M:init_cp(manager)
sync_status = CLUSTERING_SYNC_STATUS.NORMAL,
config_hash = fmt("%032d", default_namespace_version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
}, { ttl = purge_delay })
}, { ttl = purge_delay, })
if not ok then
ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err)
end
Expand Down Expand Up @@ -120,10 +120,7 @@ function _M:init_cp(manager)
end

if isempty(res) then
ngx_log(ngx_DEBUG,
"[kong.sync.v2] no delta for node_id: ", node_id,
", current_version: ", default_namespace_version,
", node is already up to date" )
-- node is already up to date
return inc_sync_result(res)
end

Expand Down Expand Up @@ -206,7 +203,7 @@ local function do_sync()
local deltas = ns_delta.deltas

if isempty(deltas) then
ngx_log(ngx_DEBUG, "no delta to sync")
-- no delta to sync
return true
end

Expand Down Expand Up @@ -280,8 +277,8 @@ local function do_sync()
ev = { delta_type, crud_event_type, delta_entity, old_entity, }

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select(delta.pk) -- composite key
-- delete the entity, opts for getting correct lmdb key
local old_entity, err = db[delta_type]:select(delta.pk, opts) -- composite key
if err then
return nil, err
end
Expand Down
14 changes: 3 additions & 11 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ local cjson_encode = cjson.encode
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG


local CLEANUP_VERSION_COUNT = 100
local KEEP_VERSION_COUNT = 100
local CLEANUP_TIME_DELAY = 3600 -- 1 hour


Expand All @@ -39,22 +38,17 @@ local PURGE_QUERY = [[
function _M:init_worker()
local function cleanup_handler(premature)
if premature then
ngx_log(ngx_DEBUG, "[incremental] worker exiting, killing incremental cleanup timer")

return
end

local res, err = self.connector:query(string_format(PURGE_QUERY, CLEANUP_VERSION_COUNT))
local res, err = self.connector:query(string_format(PURGE_QUERY, KEEP_VERSION_COUNT))
if not res then
ngx_log(ngx_ERR,
"[incremental] unable to purge old data from incremental delta table, err: ",
err)

return
end

ngx_log(ngx_DEBUG,
"[incremental] successfully purged old data from incremental delta table")
end

assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler))
Expand Down Expand Up @@ -97,16 +91,14 @@ function _M:insert_delta(deltas)

local sql = string_format(NEW_VERSION_QUERY, buf:get())

ngx_log(ngx_DEBUG, "[incremental] insert_delta sql: ", sql)

return self.connector:query(sql)
end


function _M:get_latest_version()
local sql = "SELECT MAX(version) FROM clustering_sync_version"

local res, err = self.connector:query(sql)
local res, err = self.connector:query(sql, "read")
if not res then
return nil, err
end
Expand Down
5 changes: 3 additions & 2 deletions kong/conf_loader/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,9 @@ local function load(path, custom_conf, opts)
end
end

if not conf.cluster_rpc then
log.warn("Cluster incremental sync has been forcibly disabled")
if conf.cluster_incremental_sync and not conf.cluster_rpc then
log.warn("Cluster incremental sync has been forcibly disabled, " ..
"please enable cluster RPC.")
conf.cluster_incremental_sync = false
end

Expand Down
28 changes: 18 additions & 10 deletions kong/db/declarative/export.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ local function end_transaction(db)
end


local get_latest_version
do
local strategy

local function get_latest_version_real()
return strategy:get_latest_version()
end

get_latest_version = function()
-- ensure we get the initialized kong.db
strategy = require("kong.clustering.services.sync.strategies.postgres").new(kong.db)
get_latest_version = get_latest_version_real
return get_latest_version()
end
end


local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, expand_foreigns)
local schemas = {}

Expand All @@ -119,16 +136,7 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp

local sync_version
if emitter.want_sync_version then
ok, err = db.connector:query("SELECT max(version) from clustering_sync_version", "read")
if not ok then
return nil, err
end

-- it will be ngx.null when the table clustering_sync_version is empty
sync_version = assert(ok[1].max)
if sync_version == null then
sync_version = 0
end
sync_version = get_latest_version()
end

emitter:emit_toplevel({
Expand Down
2 changes: 1 addition & 1 deletion kong/db/declarative/import.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ local function workspace_id(schema, options)
end

-- options.workspace does not exist
if not options or not options.workspace then
if not options or not options.workspace or options.workspace == "" then
return get_workspace_id()
end

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion kong/db/migrations/core/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ return {
"021_340_to_350",
"022_350_to_360",
"023_360_to_370",
"024_370_to_380",
"024_380_to_390",
}

1 comment on commit 1fd3a76

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:1fd3a76ebcc916e6e03c648c092bd1c3616e4726
Artifacts available https://github.com/Kong/kong/actions/runs/11781770188

Please sign in to comment.