From ca29685e9c10c94ca69794edd51212d543ce9674 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 29 Oct 2024 12:23:30 +0800 Subject: [PATCH] rename in rpc.sync --- kong/clustering/services/sync/hooks.lua | 22 +++++++++---------- kong/clustering/services/sync/rpc.lua | 18 +++++++-------- .../services/sync/strategies/postgres.lua | 8 +++---- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index 336bd3226aa1..ae7bbbe90620 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -75,10 +75,10 @@ function _M:notify_all_nodes() end -function _M:entity_delta_writer(row, name, options, ws_id, is_delete) +function _M:entity_delta_writer(entity, name, options, ws_id, is_delete) -- composite key, like { id = ... } local schema = kong.db[name].schema - local pk = schema:extract_pk_values(row) + local pk = schema:extract_pk_values(entity) assert(schema:validate_primary_key(pk)) @@ -87,7 +87,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete) type = name, pk = pk, ws_id = ws_id, - row = is_delete and ngx_null or row, + entity = is_delete and ngx_null or entity, }, } @@ -105,7 +105,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete) self:notify_all_nodes() - return row -- for other hooks + return entity -- for other hooks end @@ -137,21 +137,21 @@ function _M:register_dao_hooks() end end - local function post_hook_writer_func(row, name, options, ws_id) + local function post_hook_writer_func(entity, name, options, ws_id) if not is_db_export(name) then - return row + return entity end - return self:entity_delta_writer(row, name, options, ws_id) + return self:entity_delta_writer(entity, name, options, ws_id) end - local function post_hook_delete_func(row, name, options, ws_id, cascade_entries) + local function post_hook_delete_func(entity, name, options, ws_id, cascade_entries) if not is_db_export(name) then - return row + return entity end - -- set lmdb value to ngx_null then return row - return self:entity_delta_writer(row, name, options, ws_id, true) + -- set lmdb value to ngx_null then return entity + return self:entity_delta_writer(entity, name, options, ws_id, true) end local dao_hooks = { diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 1a8f749af01d..4d46cef1fb5a 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -213,8 +213,8 @@ local function do_sync() -- and replace the old one with it local default_ws_changed for _, delta in ipairs(deltas) do - if delta.type == "workspaces" and delta.row.name == "default" then - kong.default_workspace = delta.row.id + if delta.type == "workspaces" and delta.entity.name == "default" then + kong.default_workspace = delta.entity.id default_ws_changed = true break end @@ -236,20 +236,20 @@ local function do_sync() local crud_events_n = 0 -- delta should look like: - -- { type = ..., row = { ... }, version = 1, ws_id = ..., } + -- { type = ..., entity = { ... }, version = 1, ws_id = ..., } for _, delta in ipairs(deltas) do local delta_type = delta.type - local delta_row = delta.row + local delta_entity = delta.entity local ev -- delta must have ws_id to generate the correct lmdb key -- set the correct workspace for item opts.workspace = assert(delta.ws_id) - if delta_row ~= ngx_null then + if delta_entity ~= ngx_null then -- upsert the entity -- does the entity already exists? - local old_entity, err = db[delta_type]:select(delta_row) + local old_entity, err = db[delta_type]:select(delta_entity) if err then return nil, err end @@ -264,12 +264,12 @@ local function do_sync() end end - local res, err = insert_entity_for_txn(t, delta_type, delta_row, opts) + local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts) if not res then return nil, err end - ev = { delta_type, crud_event_type, delta_row, old_entity, } + ev = { delta_type, crud_event_type, delta_entity, old_entity, } else -- delete the entity @@ -319,7 +319,7 @@ local function do_sync() else for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.row, old_entity + -- delta_type, crud_event_type, delta.entity, old_entity db[event[1]]:post_crud_event(event[2], event[3], event[4]) end end diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index 616a4e38cc78..bbc397d773cc 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -67,14 +67,14 @@ local NEW_VERSION_QUERY = [[ new_version integer; BEGIN INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version; - INSERT INTO clustering_sync_delta (version, type, pk, ws_id, row) VALUES %s; + INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s; END $$; ]] -- deltas: { --- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", row = "JSON", } --- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", row = "JSON", } +-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", } +-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", } -- } function _M:insert_delta(deltas) local buf = buffer.new() @@ -83,7 +83,7 @@ function _M:insert_delta(deltas) self.connector:escape_literal(d.type), self.connector:escape_literal(cjson_encode(d.pk)), self.connector:escape_literal(d.ws_id or kong.default_workspace), - self.connector:escape_literal(cjson_encode(d.row))) + self.connector:escape_literal(cjson_encode(d.entity))) end local sql = string_format(NEW_VERSION_QUERY, buf:get())