Skip to content

Commit

Permalink
fix(clustering/rpc): cascade deleting for incremental sync (#13836)
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw authored Nov 8, 2024
1 parent fd65f1e commit 08de6fe
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 10 deletions.
49 changes: 43 additions & 6 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,28 @@ function _M:notify_all_nodes()
end


function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
local function gen_delta(entity, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(entity)

assert(schema:validate_primary_key(pk))

local deltas = {
{
local delta = {
type = name,
pk = pk,
ws_id = ws_id,
entity = is_delete and ngx_null or entity,
},
}

return delta
end


function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
local d = gen_delta(entity, name, options, ws_id, is_delete)
local deltas = { d, }

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
Expand Down Expand Up @@ -161,8 +167,39 @@ function _M:register_dao_hooks()

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name)

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(entity, name, options, ws_id, true)
-- set lmdb value to ngx_null then return entity

local d = gen_delta(entity, name, options, ws_id, true)
local deltas = { d, }

-- delete other related entities
for i, item in ipairs(cascade_entries or EMPTY) do
local e = item.entity
local name = item.dao.schema.name

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to cascade deleting ", name)

d = gen_delta(e, name, options, e.ws_id, true)

-- #1 item is initial entity
deltas[i + 1] = d
end

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

self:notify_all_nodes()

return entity -- for other hooks
end

local dao_hooks = {
Expand Down
17 changes: 14 additions & 3 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ local function do_sync()
-- delta should look like:
-- { type = ..., entity = { ... }, version = 1, ws_id = ..., }
for _, delta in ipairs(deltas) do
local delta_version = delta.version
local delta_type = delta.type
local delta_entity = delta.entity
local ev
Expand Down Expand Up @@ -270,6 +271,11 @@ local function do_sync()
return nil, err
end

ngx_log(ngx_DEBUG,
"[kong.sync.v2] update entity",
", version: ", delta_version,
", type: ", delta_type)

ev = { delta_type, crud_event_type, delta_entity, old_entity, }

else
Expand All @@ -287,17 +293,22 @@ local function do_sync()
end
end

ngx_log(ngx_DEBUG,
"[kong.sync.v2] delete entity",
", version: ", delta_version,
", type: ", delta_type)

ev = { delta_type, "delete", old_entity, }
end

crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- delta.version should not be nil or ngx.null
assert(type(delta.version) == "number")
assert(type(delta_version) == "number")

if delta.version ~= version then
version = delta.version
if delta_version ~= version then
version = delta_version
end
end -- for _, delta

Expand Down
13 changes: 12 additions & 1 deletion kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,27 @@ local NEW_VERSION_QUERY = [[
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()
for _, d in ipairs(deltas) do

local count = #deltas
for i = 1, count do
local d = deltas[i]

buf:putf("(new_version, %s, %s, %s, %s)",
self.connector:escape_literal(d.type),
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.entity)))

-- sql values should be separated by comma
if i < count then
buf:put(",")
end
end

local sql = string_format(NEW_VERSION_QUERY, buf:get())

ngx_log(ngx_DEBUG, "[incremental] insert_delta sql: ", sql)

return self.connector:query(sql)
end

Expand Down

1 comment on commit 08de6fe

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:08de6fe1ecc661885dc8e127289513b464656bf4
Artifacts available https://github.com/Kong/kong/actions/runs/11734065680

Please sign in to comment.