Skip to content

Commit

Permalink
feat(clustering): introduce incremental sync for clustering (#13157)
Browse files Browse the repository at this point in the history
* Revert "fix(rpc): disable cluster_rpc for 3.7"

This reverts commit ddda6a1.

This commit introduces a freshly redesigned DB-less mode that is more efficient
when storing data, more efficient when accessing data and incrementally update
able.

This commit also introduces the "incremental sync" RPC server/client. It
introduces the `kong.sync.v2` RPC capability that is used by CP to notify
new version updates to DP and for DP to pull config delta. DP applies these
config delta into the LMDB incrementally and transactionally which avoids
the expensive config flip and cache wipe majority of the time.

This commit also modifies the DAO so that for the CP,
config diff logs are generated transactionally whenever entity modification occurs.

Finally, this commit modifies the `off` strategy so that it works with the redesigned
DB-less mode and storage format.

Incremental sync is not yet enabled by default, it can be enabled by setting
`cluster_incremental_sync = on` via `kong.conf`.

KAG-4865
KAG-2986
KAG-2987
KAG-3502
KAG-3258
KAG-5283

---------

Co-authored-by: Chrono <[email protected]>
Co-authored-by: Xiaochen Wang <[email protected]>
  • Loading branch information
3 people authored and bungle committed Nov 7, 2024
1 parent a476f79 commit 4d69f51
Show file tree
Hide file tree
Showing 53 changed files with 1,646 additions and 618 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/kong/cp-dp-rpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: "Added a remote procedure call (RPC) framework for Hybrid mode deployments."
type: feature
scope: Clustering
6 changes: 6 additions & 0 deletions changelog/unreleased/kong/dynamic-log-level-rpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
message: |
Dynamic log level over Hybrid mode RPC which allows setting DP log level
to a different level for specified duration before reverting back
to the `kong.conf` configured value.
type: feature
scope: Clustering
8 changes: 7 additions & 1 deletion kong-3.9.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ build = {
["kong.clustering.config_sync_backup.election"] = "kong/clustering/config_sync_backup/election.lua",
["kong.clustering.config_sync_backup.strategies.s3"] = "kong/clustering/config_sync_backup/strategies/s3.lua",
["kong.clustering.config_sync_backup.strategies.gcs"] = "kong/clustering/config_sync_backup/strategies/gcs.lua",
["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua",

["kong.clustering.rpc.callbacks"] = "kong/clustering/rpc/callbacks.lua",
["kong.clustering.rpc.future"] = "kong/clustering/rpc/future.lua",
Expand All @@ -121,6 +120,12 @@ build = {
["kong.clustering.rpc.utils"] = "kong/clustering/rpc/utils.lua",
["kong.clustering.rpc.concentrator"] = "kong/clustering/rpc/concentrator.lua",

["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua",
["kong.clustering.services.sync"] = "kong/clustering/services/sync/init.lua",
["kong.clustering.services.sync.rpc"] = "kong/clustering/services/sync/rpc.lua",
["kong.clustering.services.sync.hooks"] = "kong/clustering/services/sync/hooks.lua",
["kong.clustering.services.sync.strategies.postgres"] = "kong/clustering/services/sync/strategies/postgres.lua",

["kong.cluster_events"] = "kong/cluster_events/init.lua",
["kong.cluster_events.strategies.postgres"] = "kong/cluster_events/strategies/postgres.lua",
["kong.cluster_events.strategies.off"] = "kong/cluster_events/strategies/off.lua",
Expand Down Expand Up @@ -590,6 +595,7 @@ build = {
["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua",
["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
["kong.db.migrations.core.024_370_to_380"] = "kong/db/migrations/core/024_370_to_380.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.230_to_260"] = "kong/db/migrations/operations/230_to_260.lua",
Expand Down
19 changes: 15 additions & 4 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ function _M.new(conf, node_id)
-- clients[node_id]: { socket1 => true, socket2 => true, ... }
clients = {},
client_capabilities = {},
client_ips = {}, -- store DP node's ip addr
node_id = node_id,
conf = conf,
cluster_cert = assert(clustering_tls.get_cluster_cert(conf)),
Expand Down Expand Up @@ -82,16 +83,18 @@ end


function _M:_remove_socket(socket)
local sockets = assert(self.clients[socket.node_id])
local node_id = socket.node_id
local sockets = assert(self.clients[node_id])

assert(sockets[socket])

sockets[socket] = nil

if table_isempty(sockets) then
self.clients[socket.node_id] = nil
self.client_capabilities[socket.node_id] = nil
assert(self.concentrator:_enqueue_unsubscribe(socket.node_id))
self.clients[node_id] = nil
self.client_ips[node_id] = nil
self.client_capabilities[node_id] = nil
assert(self.concentrator:_enqueue_unsubscribe(node_id))
end
end

Expand Down Expand Up @@ -262,6 +265,9 @@ function _M:handle_websocket()
local s = socket.new(self, wb, node_id)
self:_add_socket(s, rpc_capabilities)

-- store DP's ip addr
self.client_ips[node_id] = ngx_var.remote_addr

s:start()
local res, err = s:join()
self:_remove_socket(s)
Expand Down Expand Up @@ -369,4 +375,9 @@ function _M:get_peers()
end


function _M:get_peer_ip(node_id)
return self.client_ips[node_id]
end


return _M
186 changes: 186 additions & 0 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
-- This software is copyright Kong Inc. and its licensors.
-- Use of the software is subject to the agreement between your organization
-- and Kong Inc. If there is no such agreement, use is governed by and
-- subject to the terms of the Kong Master Software License Agreement found
-- at https://konghq.com/enterprisesoftwarelicense/.
-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ]

local _M = {}
local _MT = { __index = _M, }


local hooks = require("kong.hooks")
local EMPTY = require("kong.tools.table").EMPTY


local ipairs = ipairs
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG


local DEFAULT_PAGE_SIZE = 512


function _M.new(strategy)
local self = {
strategy = strategy,
}

return setmetatable(self, _MT)
end


local function get_all_nodes_with_sync_cap()
local res, err = kong.db.clustering_data_planes:page(DEFAULT_PAGE_SIZE)
if err then
return nil, "unable to query DB " .. err
end

if not res then
return EMPTY
end

local ret = {}
local ret_n = 0

for _, row in ipairs(res) do
for _, c in ipairs(row.rpc_capabilities) do
if c == "kong.sync.v2" then
ret_n = ret_n + 1
ret[ret_n] = row.id
break
end
end
end

return ret
end


function _M:notify_all_nodes()
local latest_version, err = self.strategy:get_latest_version()
if not latest_version then
ngx_log(ngx_ERR, "can not get the latest version: ", err)
return
end

local msg = { default = { new_version = latest_version, }, }

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
local res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx_log(ngx_ERR, "unable to notify new version: ", err)
end

else
ngx_log(ngx_DEBUG, "notified ", node, " ", latest_version)
end
end
end


function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
local deltas = {
{
type = name,
id = row.id,
ws_id = ws_id,
row = is_delete and ngx_null or row,
},
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

self:notify_all_nodes()

return row -- for other hooks
end


-- only control plane has these delta operations
function _M:register_dao_hooks()
local function is_db_export(name)
local db_export = kong.db[name].schema.db_export
return db_export == nil or db_export == true
end

-- common hook functions (pre/fail/post)

local function pre_hook_func(entity, name, options)
if not is_db_export(name) then
return true
end

return self.strategy:begin_txn()
end

local function fail_hook_func(err, entity, name)
if not is_db_export(name) then
return
end

local res, err = self.strategy:cancel_txn()
if not res then
ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err))
end
end

local function post_hook_writer_func(row, name, options, ws_id)
if not is_db_export(name) then
return row
end

return self:entity_delta_writer(row, name, options, ws_id)
end

local function post_hook_delete_func(row, name, options, ws_id, cascade_entries)
if not is_db_export(name) then
return row
end

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(row, name, options, ws_id, true)
end

local dao_hooks = {
-- dao:insert
["dao:insert:pre"] = pre_hook_func,
["dao:insert:fail"] = fail_hook_func,
["dao:insert:post"] = post_hook_writer_func,

-- dao:delete
["dao:delete:pre"] = pre_hook_func,
["dao:delete:fail"] = fail_hook_func,
["dao:delete:post"] = post_hook_delete_func,

-- dao:update
["dao:update:pre"] = pre_hook_func,
["dao:update:fail"] = fail_hook_func,
["dao:update:post"] = post_hook_writer_func,

-- dao:upsert
["dao:upsert:pre"] = pre_hook_func,
["dao:upsert:fail"] = fail_hook_func,
["dao:upsert:post"] = post_hook_writer_func,
}

for ev, func in pairs(dao_hooks) do
hooks.register_hook(ev, func)
end
end


return _M
70 changes: 70 additions & 0 deletions kong/clustering/services/sync/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
-- This software is copyright Kong Inc. and its licensors.
-- Use of the software is subject to the agreement between your organization
-- and Kong Inc. If there is no such agreement, use is governed by and
-- subject to the terms of the Kong Master Software License Agreement found
-- at https://konghq.com/enterprisesoftwarelicense/.
-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ]

local _M = {}
local _MT = { __index = _M, }


local events = require("kong.clustering.events")
local strategy = require("kong.clustering.services.sync.strategies.postgres")
local rpc = require("kong.clustering.services.sync.rpc")


-- TODO: what is the proper value?
local FIRST_SYNC_DELAY = 0.5 -- seconds
local EACH_SYNC_DELAY = 30 -- seconds


function _M.new(db, is_cp)
local strategy = strategy.new(db)

local self = {
db = db,
strategy = strategy,
rpc = rpc.new(strategy),
is_cp = is_cp,
}

-- only cp needs hooks
if is_cp then
self.hooks = require("kong.clustering.services.sync.hooks").new(strategy)
end

return setmetatable(self, _MT)
end


function _M:init(manager)
if self.hooks then
self.hooks:register_dao_hooks()
end
self.rpc:init(manager, self.is_cp)
end


function _M:init_worker()
-- is CP, enable clustering broadcasts
if self.is_cp then
events.init()

self.strategy:init_worker()
return
end

-- is DP, sync only in worker 0
if ngx.worker.id() ~= 0 then
return
end

-- sync to CP ASAP
assert(self.rpc:sync_once(FIRST_SYNC_DELAY))

assert(self.rpc:sync_every(EACH_SYNC_DELAY))
end


return _M
Loading

0 comments on commit 4d69f51

Please sign in to comment.