Skip to content

Commit

Permalink
refactor(clustering/rpc): tunings and fixes for rubustness (#13771)
Browse files Browse the repository at this point in the history
* fix(db): pagesize must not be less than 2
* fix(db): set global key in lmdb
* fix(db): use request_aware_table correctly
* fix(dbless): get default workspace correctly
* fix(clustering): check latest_version and d.ws_id
* fix(clustering): fix dp do_sync()
* fix(clustering):fix declarative export sync
* fix(clustering):check latest_version == ngx_null in CP
* fix(clustering): inc sync add ws (#13790)
* fix(clustering/sync): set ws_id in do_sync()
* fix(clustering): delta version is null (#13789)
* fix(clustering/sync): delta.version may be null
* refactor(clustering): change sync data structure (#13794)
* fix(incremental sync): added comment for 2 times do_sync (#13801)
* fix(incremental sync): fix cache_key handling for select_by_cache_key (#13815)
* fix(incremental sync): fix cache_key handling for select_by_cache_key
* refactor(dbless): clean logic of select_by_field (#13817)
* refactor(dbless): clean logic of selec_by_field
* fix(incremental sync): fixed ws_id processing in _set_entity_for_txn (#13816)

---------

Co-authored-by: Xiaochen Wang <[email protected]>
Co-authored-by: Datong Sun <[email protected]>
  • Loading branch information
3 people authored Nov 1, 2024
1 parent 8190a66 commit 0072d7b
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 94 deletions.
28 changes: 17 additions & 11 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,19 @@ function _M:notify_all_nodes()
end


function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(entity)

assert(schema:validate_primary_key(pk))

local deltas = {
{
type = name,
id = row.id,
pk = pk,
ws_id = ws_id,
row = is_delete and ngx_null or row,
entity = is_delete and ngx_null or entity,
},
}

Expand All @@ -99,7 +105,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete)

self:notify_all_nodes()

return row -- for other hooks
return entity -- for other hooks
end


Expand Down Expand Up @@ -131,21 +137,21 @@ function _M:register_dao_hooks()
end
end

local function post_hook_writer_func(row, name, options, ws_id)
local function post_hook_writer_func(entity, name, options, ws_id)
if not is_db_export(name) then
return row
return entity
end

return self:entity_delta_writer(row, name, options, ws_id)
return self:entity_delta_writer(entity, name, options, ws_id)
end

local function post_hook_delete_func(row, name, options, ws_id, cascade_entries)
local function post_hook_delete_func(entity, name, options, ws_id, cascade_entries)
if not is_db_export(name) then
return row
return entity
end

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(row, name, options, ws_id, true)
-- set lmdb value to ngx_null then return entity
return self:entity_delta_writer(entity, name, options, ws_id, true)
end

local dao_hooks = {
Expand Down
98 changes: 60 additions & 38 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ local txn = require("resty.lmdb.transaction")
local declarative = require("kong.db.declarative")
local constants = require("kong.constants")
local concurrency = require("kong.concurrency")
local isempty = require("table.isempty")


local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }


local pairs = pairs
local ipairs = ipairs
local fmt = string.format
local ngx_null = ngx.null
Expand All @@ -39,16 +40,16 @@ end


function _M:init_cp(manager)
-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
-- { { namespace = "default", version = 1000, }, }
local purge_delay = manager.conf.cluster_data_plane_purge_delay

local function gen_delta_result(res, wipe)
return { default = { deltas = res, wipe = wipe, }, }
end

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
-- example: { default = { version = 1000, }, }
manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions)
ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)")

Expand All @@ -57,19 +58,13 @@ function _M:init_cp(manager)
rpc_peers = kong.rpc:get_peers()
end

local default_namespace
for namespace, v in pairs(current_versions) do
if namespace == "default" then
default_namespace = v
break
end
end
local default_namespace = current_versions.default

if not default_namespace then
return nil, "default namespace does not exist inside params"
end

-- { { namespace = "default", version = 1000, }, }
-- { default = { version = 1000, }, }
local default_namespace_version = default_namespace.version

-- XXX TODO: follow update_sync_status() in control_plane.lua
Expand Down Expand Up @@ -117,7 +112,7 @@ function _M:init_cp(manager)
return nil, err
end

if #res == 0 then
if isempty(res) then
ngx_log(ngx_DEBUG,
"[kong.sync.v2] no delta for node_id: ", node_id,
", current_version: ", default_namespace_version,
Expand All @@ -126,7 +121,7 @@ function _M:init_cp(manager)
end

-- some deltas are returned, are they contiguous?
if res[1].version == default_namespace.version + 1 then
if res[1].version == default_namespace_version + 1 then
-- doesn't wipe dp lmdb, incremental sync
return gen_delta_result(res, false)
end
Expand Down Expand Up @@ -155,7 +150,7 @@ function _M:init_dp(manager)
-- DP
-- Method: kong.sync.v2.notify_new_version
-- Params: new_versions: list of namespaces and their new versions, like:
-- { { new_version = 1000, }, }, possible field: namespace = "default"
-- { default = { new_version = 1000, }, }
manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions)
-- TODO: currently only default is supported, and anything else is ignored
local default_new_version = new_versions.default
Expand Down Expand Up @@ -199,24 +194,33 @@ local function do_sync()
return true
end

local ns_delta

for namespace, delta in pairs(ns_deltas) do
if namespace == "default" then
ns_delta = delta
break -- should we break here?
end
end
-- ns_deltas should look like:
-- { default = { deltas = { ... }, wipe = true, }, }

local ns_delta = ns_deltas.default
if not ns_delta then
return nil, "default namespace does not exist inside params"
end

if #ns_delta.deltas == 0 then
local deltas = ns_delta.deltas

if isempty(deltas) then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
end

-- we should find the correct default workspace
-- and replace the old one with it
local default_ws_changed
for _, delta in ipairs(deltas) do
if delta.type == "workspaces" and delta.entity.name == "default" then
kong.default_workspace = delta.entity.id
default_ws_changed = true
break
end
end
assert(type(kong.default_workspace) == "string")

local t = txn.begin(512)

local wipe = ns_delta.wipe
Expand All @@ -227,18 +231,25 @@ local function do_sync()
local db = kong.db

local version = 0
local opts = {}
local crud_events = {}
local crud_events_n = 0

for _, delta in ipairs(ns_delta.deltas) do
-- delta should look like:
-- { type = ..., entity = { ... }, version = 1, ws_id = ..., }
for _, delta in ipairs(deltas) do
local delta_type = delta.type
local delta_row = delta.row
local delta_entity = delta.entity
local ev

if delta_row ~= ngx_null then
-- delta must have ws_id to generate the correct lmdb key
-- set the correct workspace for item
opts.workspace = assert(delta.ws_id)

if delta_entity ~= nil and delta_entity ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
local old_entity, err = db[delta_type]:select(delta_entity)
if err then
return nil, err
end
Expand All @@ -247,29 +258,29 @@ local function do_sync()

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil)
local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
end

ev = { delta_type, crud_event_type, delta_row, old_entity, }
ev = { delta_type, crud_event_type, delta_entity, old_entity, }

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
local old_entity, err = kong.db[delta_type]:select(delta.pk) -- composite key
if err then
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
Expand All @@ -281,13 +292,22 @@ local function do_sync()
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
-- delta.version should not be nil or ngx.null
assert(type(delta.version) == "number")

if delta.version ~= version then
version = delta.version
end
end -- for _, delta

-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))

-- store the correct default workspace uuid
if default_ws_changed then
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
end

local ok, err = t:commit()
if not ok then
return nil, err
Expand All @@ -299,7 +319,7 @@ local function do_sync()

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
end
Expand All @@ -314,7 +334,9 @@ local function sync_handler(premature)
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
-- `do_sync()` is run twice in a row to report back new version number
-- to CP quickly after sync. (`kong.sync.v2.get_delta` is used for both pulling delta
-- as well as status reporting)
for _ = 1, 2 do
local ok, err = do_sync()
if not ok then
Expand Down
22 changes: 14 additions & 8 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local buffer = require("string.buffer")

local string_format = string.format
local cjson_encode = cjson.encode
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
Expand Down Expand Up @@ -66,23 +67,23 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, id, ws_id, row) VALUES %s;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "id" = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", }
-- { type = "route", "id" = "0a5bac5c-b795-4981-95d2-919ba3390b7e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", }
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()
for _, d in ipairs(deltas) do
buf:putf("(new_version, %s, %s, %s, %s)",
self.connector:escape_literal(d.type),
self.connector:escape_literal(d.id),
self.connector:escape_literal(d.ws_id),
self.connector:escape_literal(cjson_encode(d.row)))
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.entity)))
end

local sql = string_format(NEW_VERSION_QUERY, buf:get())
Expand All @@ -92,14 +93,19 @@ end


function _M:get_latest_version()
local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version"
local sql = "SELECT MAX(version) FROM clustering_sync_version"

local res, err = self.connector:query(sql)
if not res then
return nil, err
end

return res[1] and res[1].max_version
local ver = res[1] and res[1].max
if ver == ngx_null then
return 0
end

return ver
end


Expand Down
2 changes: 1 addition & 1 deletion kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ function DAO:cache_key(key, arg2, arg3, arg4, arg5, ws_id)
error("key must be a string or an entity table", 2)
end

if key.ws_id ~= nil and key.ws_id ~= null then
if key.ws_id ~= nil and key.ws_id ~= null and schema.workspaceable then
ws_id = key.ws_id
end

Expand Down
7 changes: 6 additions & 1 deletion kong/db/declarative/export.lua
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp
return nil, err
end

-- it will be ngx.null when the table clustering_sync_version is empty
sync_version = assert(ok[1].max)
if sync_version == null then
sync_version = 0
end
end

emitter:emit_toplevel({
Expand Down Expand Up @@ -359,7 +363,8 @@ local sync_emitter = {

emit_entity = function(self, entity_name, entity_data)
self.out_n = self.out_n + 1
self.out[self.out_n] = { type = entity_name , row = entity_data, version = self.sync_version, }
self.out[self.out_n] = { type = entity_name , entity = entity_data, version = self.sync_version,
ws_id = kong.default_workspace, }
end,

done = function(self)
Expand Down
Loading

1 comment on commit 0072d7b

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:0072d7b770af30d6f3373ed6e6d39955b1c78090
Artifacts available https://github.com/Kong/kong/actions/runs/11627077213

Please sign in to comment.