Skip to content

Commit

Permalink
feat(clustering): introduce incremental sync for clustering (#13157)
Browse files Browse the repository at this point in the history
* Revert "fix(rpc): disable cluster_rpc for 3.7"

This reverts commit ddda6a1.

This commit introduces a freshly redesigned DB-less mode that is more efficient
at storing and accessing data, and is incrementally updatable.

This commit also introduces the "incremental sync" RPC server/client. It
introduces the `kong.sync.v2` RPC capability that is used by the CP to notify
DPs of new version updates, and by DPs to pull config deltas. The DP applies
these config deltas to the LMDB incrementally and transactionally, which avoids
the expensive config flip and cache wipe in the majority of cases.

This commit also modifies the DAO so that for the CP,
config diff logs are generated transactionally whenever entity modification occurs.

Finally, this commit modifies the `off` strategy so that it works with the redesigned
DB-less mode and storage format.

Incremental sync is not yet enabled by default, it can be enabled by setting
`cluster_incremental_sync = on` via `kong.conf`.

KAG-4865
KAG-2986
KAG-2987
KAG-3502
KAG-3258
KAG-5283

---------

Co-authored-by: Chrono <[email protected]>
Co-authored-by: Xiaochen Wang <[email protected]>
  • Loading branch information
3 people authored Oct 16, 2024
1 parent 9ae3638 commit 398e180
Show file tree
Hide file tree
Showing 54 changed files with 1,604 additions and 628 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/kong/cp-dp-rpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: "Added a remote procedure call (RPC) framework for Hybrid mode deployments."
type: feature
scope: Clustering
6 changes: 6 additions & 0 deletions changelog/unreleased/kong/dynamic-log-level-rpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
message: |
Dynamic log level over Hybrid mode RPC which allows setting DP log level
to a different level for specified duration before reverting back
to the `kong.conf` configured value.
type: feature
scope: Clustering
9 changes: 7 additions & 2 deletions kong-3.9.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ build = {
["kong.clustering.compat.checkers"] = "kong/clustering/compat/checkers.lua",
["kong.clustering.config_helper"] = "kong/clustering/config_helper.lua",
["kong.clustering.tls"] = "kong/clustering/tls.lua",
["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua",

["kong.clustering.rpc.callbacks"] = "kong/clustering/rpc/callbacks.lua",
["kong.clustering.rpc.future"] = "kong/clustering/rpc/future.lua",
Expand All @@ -99,6 +98,12 @@ build = {
["kong.clustering.rpc.utils"] = "kong/clustering/rpc/utils.lua",
["kong.clustering.rpc.concentrator"] = "kong/clustering/rpc/concentrator.lua",

["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua",
["kong.clustering.services.sync"] = "kong/clustering/services/sync/init.lua",
["kong.clustering.services.sync.rpc"] = "kong/clustering/services/sync/rpc.lua",
["kong.clustering.services.sync.hooks"] = "kong/clustering/services/sync/hooks.lua",
["kong.clustering.services.sync.strategies.postgres"] = "kong/clustering/services/sync/strategies/postgres.lua",

["kong.cluster_events"] = "kong/cluster_events/init.lua",
["kong.cluster_events.strategies.postgres"] = "kong/cluster_events/strategies/postgres.lua",
["kong.cluster_events.strategies.off"] = "kong/cluster_events/strategies/off.lua",
Expand Down Expand Up @@ -289,7 +294,6 @@ build = {
["kong.db.strategies.postgres.plugins"] = "kong/db/strategies/postgres/plugins.lua",
["kong.db.strategies.off"] = "kong/db/strategies/off/init.lua",
["kong.db.strategies.off.connector"] = "kong/db/strategies/off/connector.lua",
["kong.db.strategies.off.tags"] = "kong/db/strategies/off/tags.lua",

["kong.db.migrations.state"] = "kong/db/migrations/state.lua",
["kong.db.migrations.subsystems"] = "kong/db/migrations/subsystems.lua",
Expand All @@ -316,6 +320,7 @@ build = {
["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua",
["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
["kong.db.migrations.core.024_370_to_380"] = "kong/db/migrations/core/024_370_to_380.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua",
Expand Down
19 changes: 15 additions & 4 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function _M.new(conf, node_id)
-- clients[node_id]: { socket1 => true, socket2 => true, ... }
clients = {},
client_capabilities = {},
client_ips = {}, -- store DP node's ip addr
node_id = node_id,
conf = conf,
cluster_cert = assert(clustering_tls.get_cluster_cert(conf)),
Expand Down Expand Up @@ -75,16 +76,18 @@ end


-- Detach a socket from the per-node bookkeeping tables. When the last
-- socket for a node goes away, forget everything recorded for that node
-- (sockets, ip, capabilities) and unsubscribe it from the concentrator.
function _M:_remove_socket(socket)
  local node_id = socket.node_id
  local node_sockets = assert(self.clients[node_id])

  assert(node_sockets[socket])
  node_sockets[socket] = nil

  -- other sockets still open for this node: nothing more to do
  if next(node_sockets) ~= nil then
    return
  end

  -- last socket closed: drop all per-node state
  self.clients[node_id] = nil
  self.client_ips[node_id] = nil
  self.client_capabilities[node_id] = nil
  assert(self.concentrator:_enqueue_unsubscribe(node_id))
end

Expand Down Expand Up @@ -255,6 +258,9 @@ function _M:handle_websocket()
local s = socket.new(self, wb, node_id)
self:_add_socket(s, rpc_capabilities)

-- store DP's ip addr
self.client_ips[node_id] = ngx_var.remote_addr

s:start()
local res, err = s:join()
self:_remove_socket(s)
Expand Down Expand Up @@ -362,4 +368,9 @@ function _M:get_peers()
end


-- Return the remote address recorded for a connected DP node, or nil when
-- the node is unknown or has fully disconnected (the entry is written in
-- handle_websocket and cleared again in _remove_socket).
function _M:get_peer_ip(node_id)
  return self.client_ips[node_id]
end


return _M
179 changes: 179 additions & 0 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
local _M = {}
local _MT = { __index = _M, }


local hooks = require("kong.hooks")
local EMPTY = require("kong.tools.table").EMPTY


local ipairs = ipairs
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG


local DEFAULT_PAGE_SIZE = 512


--- Construct a hooks instance bound to a sync storage strategy.
-- @param strategy sync strategy object (provides txn/delta/version methods)
-- @return hooks instance
function _M.new(strategy)
  return setmetatable({ strategy = strategy, }, _MT)
end


-- Collect the ids of all known data planes advertising the "kong.sync.v2"
-- RPC capability.
-- Returns an array of node ids (possibly the shared EMPTY table), or
-- nil plus an error message when the DB query fails.
local function get_all_nodes_with_sync_cap()
  local rows, err = kong.db.clustering_data_planes:page(DEFAULT_PAGE_SIZE)
  if err then
    return nil, "unable to query DB " .. err
  end

  if not rows then
    return EMPTY
  end

  local node_ids = {}
  local count = 0

  for _, row in ipairs(rows) do
    local caps = row.rpc_capabilities
    for i = 1, #caps do
      if caps[i] == "kong.sync.v2" then
        count = count + 1
        node_ids[count] = row.id
        break
      end
    end
  end

  return node_ids
end


-- Broadcast the latest config version to every DP that supports
-- "kong.sync.v2" so they pull the new deltas. Failures are logged, never
-- propagated (this runs from DAO post-hooks).
function _M:notify_all_nodes()
  local latest_version, err = self.strategy:get_latest_version()
  if not latest_version then
    ngx_log(ngx_ERR, "can not get the latest version: ", err)
    return
  end

  -- Bug fix: get_all_nodes_with_sync_cap() returns nil, err when the DB
  -- query fails; the previous code passed that nil straight into ipairs()
  -- and crashed. Guard and log instead.
  local nodes, nodes_err = get_all_nodes_with_sync_cap()
  if not nodes then
    ngx_log(ngx_ERR, "unable to get nodes with sync capability: ", nodes_err)
    return
  end

  local msg = { default = { new_version = latest_version, }, }

  for _, node in ipairs(nodes) do
    local res, call_err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg)
    if not res then
      -- a DP that simply predates the capability is expected, not an error
      if not call_err:find("requested capability does not exist", nil, true) then
        ngx_log(ngx_ERR, "unable to notify new version: ", call_err)
      end

    else
      ngx_log(ngx_DEBUG, "notified ", node, " ", latest_version)
    end
  end
end


--- Persist one entity change as a sync delta and finish the transaction
-- opened by the pre-hook, then notify all DPs of the new version.
-- On any strategy failure the transaction is cancelled and nil, err is
-- returned; on success `row` is echoed back so later hooks keep running.
-- @param row the entity row that was written
-- @param name entity/DAO name (becomes the delta type)
-- @param options DAO options (part of the hook signature, unused here)
-- @param ws_id workspace id of the row
-- @param is_delete truthy to record the delta with a null row body
function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
  local delta = {
    type = name,
    id = row.id,
    ws_id = ws_id,
    row = is_delete and ngx_null or row,
  }

  local ok, err = self.strategy:insert_delta({ delta })
  if ok then
    ok, err = self.strategy:commit_txn()
  end

  if not ok then
    self.strategy:cancel_txn()
    return nil, err
  end

  self:notify_all_nodes()

  return row -- for other hooks
end


-- only control plane has these delta operations
-- Register DAO pre/fail/post hooks that turn every entity write on the CP
-- into a transactional sync delta. Only the control plane installs these.
function _M:register_dao_hooks()
  -- entities with schema.db_export == false are never shipped to DPs
  local function is_db_export(name)
    local db_export = kong.db[name].schema.db_export
    return db_export == nil or db_export == true
  end

  -- common hook functions (pre/fail/post)

  -- open a transaction before the DAO write so the delta and the entity
  -- commit (or roll back) together
  local function pre_hook_func(entity, name, options)
    if not is_db_export(name) then
      return true
    end

    return self.strategy:begin_txn()
  end

  -- roll back the transaction opened by pre_hook_func when the write fails
  local function fail_hook_func(err, entity, name)
    if not is_db_export(name) then
      return
    end

    -- Bug fix: use a distinct local so we don't shadow the hook's `err`
    -- argument, and fix the doubled "cancel cancel_txn" log message.
    local res, cancel_err = self.strategy:cancel_txn()
    if not res then
      ngx_log(ngx_ERR, "unable to cancel transaction: ", tostring(cancel_err))
    end
  end

  local function post_hook_writer_func(row, name, options, ws_id)
    if not is_db_export(name) then
      return row
    end

    return self:entity_delta_writer(row, name, options, ws_id)
  end

  local function post_hook_delete_func(row, name, options, ws_id, cascade_entries)
    if not is_db_export(name) then
      return row
    end

    -- set lmdb value to ngx_null then return row
    return self:entity_delta_writer(row, name, options, ws_id, true)
  end

  local dao_hooks = {
    -- dao:insert
    ["dao:insert:pre"] = pre_hook_func,
    ["dao:insert:fail"] = fail_hook_func,
    ["dao:insert:post"] = post_hook_writer_func,

    -- dao:delete
    ["dao:delete:pre"] = pre_hook_func,
    ["dao:delete:fail"] = fail_hook_func,
    ["dao:delete:post"] = post_hook_delete_func,

    -- dao:update
    ["dao:update:pre"] = pre_hook_func,
    ["dao:update:fail"] = fail_hook_func,
    ["dao:update:post"] = post_hook_writer_func,

    -- dao:upsert
    ["dao:upsert:pre"] = pre_hook_func,
    ["dao:upsert:fail"] = fail_hook_func,
    ["dao:upsert:post"] = post_hook_writer_func,
  }

  for ev, func in pairs(dao_hooks) do
    hooks.register_hook(ev, func)
  end
end


return _M
63 changes: 63 additions & 0 deletions kong/clustering/services/sync/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
local _M = {}
local _MT = { __index = _M, }


local events = require("kong.clustering.events")
local strategy = require("kong.clustering.services.sync.strategies.postgres")
local rpc = require("kong.clustering.services.sync.rpc")


-- TODO: what is the proper value?
local FIRST_SYNC_DELAY = 0.5 -- seconds
local EACH_SYNC_DELAY = 30 -- seconds


--- Build a sync service instance.
-- @param db the database handle
-- @param is_cp true when running as the control plane
-- @return sync service instance
function _M.new(db, is_cp)
  -- distinct local name avoids shadowing the `strategy` module upvalue
  local strategy_inst = strategy.new(db)

  local self = {
    db = db,
    strategy = strategy_inst,
    rpc = rpc.new(strategy_inst),
    is_cp = is_cp,
  }

  -- only cp needs hooks
  if is_cp then
    self.hooks = require("kong.clustering.services.sync.hooks").new(strategy_inst)
  end

  return setmetatable(self, _MT)
end


-- Wire the sync service into the RPC manager: register the DAO delta hooks
-- on the CP (self.hooks is only set when is_cp — see _M.new) and initialize
-- the kong.sync.v2 RPC side appropriate for this role.
function _M:init(manager)
  if self.hooks then
    self.hooks:register_dao_hooks()
  end
  self.rpc:init(manager, self.is_cp)
end


--- Per-worker startup for the sync service.
-- CP workers enable clustering event broadcasts and strategy upkeep.
-- DP nodes start the sync timers, but only from worker 0 so each node
-- syncs exactly once.
function _M:init_worker()
  if self.is_cp then
    -- is CP, enable clustering broadcasts
    events.init()
    self.strategy:init_worker()
    return
  end

  -- is DP, sync only in worker 0
  if ngx.worker.id() ~= 0 then
    return
  end

  -- first sync ASAP, then keep syncing on a fixed period
  assert(self.rpc:sync_once(FIRST_SYNC_DELAY))
  assert(self.rpc:sync_every(EACH_SYNC_DELAY))
end


return _M
Loading

1 comment on commit 398e180

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:398e180145c8b5123a8cd884328e318d8fe79914
Artifacts available https://github.com/Kong/kong/actions/runs/11367537996

Please sign in to comment.