diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index 654ce74f1c59..0dd694e7bbf4 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -77,22 +77,28 @@ function _M:notify_all_nodes() end -function _M:entity_delta_writer(entity, name, options, ws_id, is_delete) +local function gen_delta(entity, name, options, ws_id, is_delete) -- composite key, like { id = ... } local schema = kong.db[name].schema local pk = schema:extract_pk_values(entity) assert(schema:validate_primary_key(pk)) - local deltas = { - { + local delta = { type = name, pk = pk, ws_id = ws_id, entity = is_delete and ngx_null or entity, - }, } + return delta +end + + +function _M:entity_delta_writer(entity, name, options, ws_id, is_delete) + local d = gen_delta(entity, name, options, ws_id, is_delete) + local deltas = { d, } + local res, err = self.strategy:insert_delta(deltas) if not res then self.strategy:cancel_txn() @@ -161,8 +167,39 @@ function _M:register_dao_hooks() ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name) - -- set lmdb value to ngx_null then return row - return self:entity_delta_writer(entity, name, options, ws_id, true) + -- set lmdb value to ngx_null then return entity + + local d = gen_delta(entity, name, options, ws_id, true) + local deltas = { d, } + + -- delete other related entities + for i, item in ipairs(cascade_entries or EMPTY) do + local e = item.entity + local name = item.dao.schema.name + + ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to cascade deleting ", name) + + d = gen_delta(e, name, options, e.ws_id, true) + + -- #1 item is initial entity + deltas[i + 1] = d + end + + local res, err = self.strategy:insert_delta(deltas) + if not res then + self.strategy:cancel_txn() + return nil, err + end + + res, err = self.strategy:commit_txn() + if not res then + self.strategy:cancel_txn() + return nil, err + end + + self:notify_all_nodes() + + return entity -- for other hooks end local dao_hooks = { diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index f115ceff02b1..227d1f1cbb31 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -239,6 +239,7 @@ local function do_sync() -- delta should look like: -- { type = ..., entity = { ... }, version = 1, ws_id = ..., } for _, delta in ipairs(deltas) do + local delta_version = delta.version local delta_type = delta.type local delta_entity = delta.entity local ev @@ -270,6 +271,11 @@ local function do_sync() return nil, err end + ngx_log(ngx_DEBUG, + "[kong.sync.v2] update entity", + ", version: ", delta_version, + ", type: ", delta_type) + ev = { delta_type, crud_event_type, delta_entity, old_entity, } else @@ -287,6 +293,11 @@ local function do_sync() end end + ngx_log(ngx_DEBUG, + "[kong.sync.v2] delete entity", + ", version: ", delta_version, + ", type: ", delta_type) + ev = { delta_type, "delete", old_entity, } end @@ -294,10 +305,10 @@ local function do_sync() crud_events[crud_events_n] = ev -- delta.version should not be nil or ngx.null - assert(type(delta.version) == "number") + assert(type(delta_version) == "number") - if delta.version ~= version then - version = delta.version + if delta_version ~= version then + version = delta_version end end -- for _, delta diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index bbc397d773cc..31a1f6b8dd7a 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -78,16 +78,27 @@ local NEW_VERSION_QUERY = [[ -- } function _M:insert_delta(deltas) local buf = buffer.new() - for _, d in ipairs(deltas) do + + local count = #deltas + for i = 1, count do + local d = deltas[i] + buf:putf("(new_version, %s, %s, %s, %s)", self.connector:escape_literal(d.type), self.connector:escape_literal(cjson_encode(d.pk)), self.connector:escape_literal(d.ws_id or kong.default_workspace), self.connector:escape_literal(cjson_encode(d.entity))) + + -- sql values should be separated by comma + if i < count then + buf:put(",") + end end local sql = string_format(NEW_VERSION_QUERY, buf:get()) + ngx_log(ngx_DEBUG, "[incremental] insert_delta sql: ", sql) + return self.connector:query(sql) end