From 0072d7b770af30d6f3373ed6e6d39955b1c78090 Mon Sep 17 00:00:00 2001 From: Chrono Date: Fri, 1 Nov 2024 17:12:30 +0800 Subject: [PATCH] refactor(clustering/rpc): tunings and fixes for robustness (#13771) * fix(db): pagesize must not be less than 2 * fix(db): set global key in lmdb * fix(db): use request_aware_table correctly * fix(dbless): get default workspace correctly * fix(clustering): check latest_version and d.ws_id * fix(clustering): fix dp do_sync() * fix(clustering): fix declarative export sync * fix(clustering): check latest_version == ngx_null in CP * fix(clustering): inc sync add ws (#13790) * fix(clustering/sync): set ws_id in do_sync() * fix(clustering): delta version is null (#13789) * fix(clustering/sync): delta.version may be null * refactor(clustering): change sync data structure (#13794) * fix(incremental sync): added comment for 2 times do_sync (#13801) * fix(incremental sync): fix cache_key handling for select_by_cache_key (#13815) * fix(incremental sync): fix cache_key handling for select_by_cache_key * refactor(dbless): clean logic of select_by_field (#13817) * refactor(dbless): clean logic of select_by_field * fix(incremental sync): fixed ws_id processing in _set_entity_for_txn (#13816) --------- Co-authored-by: Xiaochen Wang Co-authored-by: Datong Sun --- kong/clustering/services/sync/hooks.lua | 28 +++--- kong/clustering/services/sync/rpc.lua | 98 ++++++++++++------- .../services/sync/strategies/postgres.lua | 22 +++-- kong/db/dao/init.lua | 2 +- kong/db/declarative/export.lua | 7 +- kong/db/declarative/import.lua | 76 ++++++++++---- kong/db/migrations/core/024_370_to_380.lua | 4 +- kong/db/schema/others/declarative_config.lua | 2 +- kong/db/strategies/off/init.lua | 30 ++++-- .../migrations/core/024_370_to_380_spec.lua | 4 +- 10 files changed, 179 insertions(+), 94 deletions(-) diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index 7a3a1402558d..ae7bbbe90620 100644 --- 
a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -75,13 +75,19 @@ function _M:notify_all_nodes() end -function _M:entity_delta_writer(row, name, options, ws_id, is_delete) +function _M:entity_delta_writer(entity, name, options, ws_id, is_delete) + -- composite key, like { id = ... } + local schema = kong.db[name].schema + local pk = schema:extract_pk_values(entity) + + assert(schema:validate_primary_key(pk)) + local deltas = { { type = name, - id = row.id, + pk = pk, ws_id = ws_id, - row = is_delete and ngx_null or row, + entity = is_delete and ngx_null or entity, }, } @@ -99,7 +105,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete) self:notify_all_nodes() - return row -- for other hooks + return entity -- for other hooks end @@ -131,21 +137,21 @@ function _M:register_dao_hooks() end end - local function post_hook_writer_func(row, name, options, ws_id) + local function post_hook_writer_func(entity, name, options, ws_id) if not is_db_export(name) then - return row + return entity end - return self:entity_delta_writer(row, name, options, ws_id) + return self:entity_delta_writer(entity, name, options, ws_id) end - local function post_hook_delete_func(row, name, options, ws_id, cascade_entries) + local function post_hook_delete_func(entity, name, options, ws_id, cascade_entries) if not is_db_export(name) then - return row + return entity end - -- set lmdb value to ngx_null then return row - return self:entity_delta_writer(row, name, options, ws_id, true) + -- set lmdb value to ngx_null then return entity + return self:entity_delta_writer(entity, name, options, ws_id, true) end local dao_hooks = { diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 3d3ec5360890..2cefa4a33417 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -6,16 +6,17 @@ local txn = require("resty.lmdb.transaction") local declarative = 
require("kong.db.declarative") local constants = require("kong.constants") local concurrency = require("kong.concurrency") +local isempty = require("table.isempty") local insert_entity_for_txn = declarative.insert_entity_for_txn local delete_entity_for_txn = declarative.delete_entity_for_txn local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY +local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } -local pairs = pairs local ipairs = ipairs local fmt = string.format local ngx_null = ngx.null @@ -39,16 +40,16 @@ end function _M:init_cp(manager) - -- CP - -- Method: kong.sync.v2.get_delta - -- Params: versions: list of current versions of the database - -- { { namespace = "default", version = 1000, }, } local purge_delay = manager.conf.cluster_data_plane_purge_delay local function gen_delta_result(res, wipe) return { default = { deltas = res, wipe = wipe, }, } end + -- CP + -- Method: kong.sync.v2.get_delta + -- Params: versions: list of current versions of the database + -- example: { default = { version = 1000, }, } manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions) ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)") @@ -57,19 +58,13 @@ function _M:init_cp(manager) rpc_peers = kong.rpc:get_peers() end - local default_namespace - for namespace, v in pairs(current_versions) do - if namespace == "default" then - default_namespace = v - break - end - end + local default_namespace = current_versions.default if not default_namespace then return nil, "default namespace does not exist inside params" end - -- { { namespace = "default", version = 1000, }, } + -- { default = { version = 1000, }, } local default_namespace_version = default_namespace.version -- XXX TODO: follow update_sync_status() in control_plane.lua @@ -117,7 +112,7 @@ function 
_M:init_cp(manager) return nil, err end - if #res == 0 then + if isempty(res) then ngx_log(ngx_DEBUG, "[kong.sync.v2] no delta for node_id: ", node_id, ", current_version: ", default_namespace_version, @@ -126,7 +121,7 @@ function _M:init_cp(manager) end -- some deltas are returned, are they contiguous? - if res[1].version == default_namespace.version + 1 then + if res[1].version == default_namespace_version + 1 then -- doesn't wipe dp lmdb, incremental sync return gen_delta_result(res, false) end @@ -155,7 +150,7 @@ function _M:init_dp(manager) -- DP -- Method: kong.sync.v2.notify_new_version -- Params: new_versions: list of namespaces and their new versions, like: - -- { { new_version = 1000, }, }, possible field: namespace = "default" + -- { default = { new_version = 1000, }, } manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions) -- TODO: currently only default is supported, and anything else is ignored local default_new_version = new_versions.default @@ -199,24 +194,33 @@ local function do_sync() return true end - local ns_delta - - for namespace, delta in pairs(ns_deltas) do - if namespace == "default" then - ns_delta = delta - break -- should we break here? - end - end + -- ns_deltas should look like: + -- { default = { deltas = { ... 
}, wipe = true, }, } + local ns_delta = ns_deltas.default if not ns_delta then return nil, "default namespace does not exist inside params" end - if #ns_delta.deltas == 0 then + local deltas = ns_delta.deltas + + if isempty(deltas) then ngx_log(ngx_DEBUG, "no delta to sync") return true end + -- we should find the correct default workspace + -- and replace the old one with it + local default_ws_changed + for _, delta in ipairs(deltas) do + if delta.type == "workspaces" and delta.entity.name == "default" then + kong.default_workspace = delta.entity.id + default_ws_changed = true + break + end + end + assert(type(kong.default_workspace) == "string") + local t = txn.begin(512) local wipe = ns_delta.wipe @@ -227,18 +231,25 @@ local function do_sync() local db = kong.db local version = 0 + local opts = {} local crud_events = {} local crud_events_n = 0 - for _, delta in ipairs(ns_delta.deltas) do + -- delta should look like: + -- { type = ..., entity = { ... }, version = 1, ws_id = ..., } + for _, delta in ipairs(deltas) do local delta_type = delta.type - local delta_row = delta.row + local delta_entity = delta.entity local ev - if delta_row ~= ngx_null then + -- delta must have ws_id to generate the correct lmdb key + -- set the correct workspace for item + opts.workspace = assert(delta.ws_id) + + if delta_entity ~= nil and delta_entity ~= ngx_null then -- upsert the entity -- does the entity already exists? - local old_entity, err = db[delta_type]:select(delta_row) + local old_entity, err = db[delta_type]:select(delta_entity) if err then return nil, err end @@ -247,29 +258,29 @@ local function do_sync() -- If we will wipe lmdb, we don't need to delete it from lmdb. 
if old_entity and not wipe then - local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) if not res then return nil, err end end - local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil) + local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts) if not res then return nil, err end - ev = { delta_type, crud_event_type, delta_row, old_entity, } + ev = { delta_type, crud_event_type, delta_entity, old_entity, } else -- delete the entity - local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key + local old_entity, err = kong.db[delta_type]:select(delta.pk) -- composite key if err then return nil, err end -- If we will wipe lmdb, we don't need to delete it from lmdb. if old_entity and not wipe then - local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) if not res then return nil, err end @@ -281,13 +292,22 @@ local function do_sync() crud_events_n = crud_events_n + 1 crud_events[crud_events_n] = ev - -- XXX TODO: could delta.version be nil or ngx.null - if type(delta.version) == "number" and delta.version ~= version then + -- delta.version should not be nil or ngx.null + assert(type(delta.version) == "number") + + if delta.version ~= version then version = delta.version end end -- for _, delta + -- store current sync version t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + + -- store the correct default workspace uuid + if default_ws_changed then + t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace) + end + local ok, err = t:commit() if not ok then return nil, err @@ -299,7 +319,7 @@ local function do_sync() else for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.row, old_entity + -- delta_type, crud_event_type, delta.entity, old_entity db[event[1]]:post_crud_event(event[2], 
event[3], event[4]) end end @@ -314,7 +334,9 @@ local function sync_handler(premature) end local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() - -- here must be 2 times + -- `do_sync()` is run twice in a row to report back new version number + -- to CP quickly after sync. (`kong.sync.v2.get_delta` is used for both pulling delta + -- as well as status reporting) for _ = 1, 2 do local ok, err = do_sync() if not ok then diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index 39c550b8ffb0..bbc397d773cc 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -8,6 +8,7 @@ local buffer = require("string.buffer") local string_format = string.format local cjson_encode = cjson.encode +local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_DEBUG = ngx.DEBUG @@ -66,23 +67,23 @@ local NEW_VERSION_QUERY = [[ new_version integer; BEGIN INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version; - INSERT INTO clustering_sync_delta (version, type, id, ws_id, row) VALUES %s; + INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s; END $$; ]] -- deltas: { --- { type = "service", "id" = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", } --- { type = "route", "id" = "0a5bac5c-b795-4981-95d2-919ba3390b7e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", } +-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", } +-- { type = "route", "pk" = { id = "0a5bac5c..." 
}, "ws_id" = "73478cf6...", entity = "JSON", } -- } function _M:insert_delta(deltas) local buf = buffer.new() for _, d in ipairs(deltas) do buf:putf("(new_version, %s, %s, %s, %s)", self.connector:escape_literal(d.type), - self.connector:escape_literal(d.id), - self.connector:escape_literal(d.ws_id), - self.connector:escape_literal(cjson_encode(d.row))) + self.connector:escape_literal(cjson_encode(d.pk)), + self.connector:escape_literal(d.ws_id or kong.default_workspace), + self.connector:escape_literal(cjson_encode(d.entity))) end local sql = string_format(NEW_VERSION_QUERY, buf:get()) @@ -92,14 +93,19 @@ end function _M:get_latest_version() - local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version" + local sql = "SELECT MAX(version) FROM clustering_sync_version" local res, err = self.connector:query(sql) if not res then return nil, err end - return res[1] and res[1].max_version + local ver = res[1] and res[1].max + if ver == ngx_null then + return 0 + end + + return ver end diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index 4305be4a96f9..a20cb6813e7c 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -1555,7 +1555,7 @@ function DAO:cache_key(key, arg2, arg3, arg4, arg5, ws_id) error("key must be a string or an entity table", 2) end - if key.ws_id ~= nil and key.ws_id ~= null then + if key.ws_id ~= nil and key.ws_id ~= null and schema.workspaceable then ws_id = key.ws_id end diff --git a/kong/db/declarative/export.lua b/kong/db/declarative/export.lua index 6c1c66ede7fd..e20d3c1d8469 100644 --- a/kong/db/declarative/export.lua +++ b/kong/db/declarative/export.lua @@ -124,7 +124,11 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp return nil, err end + -- it will be ngx.null when the table clustering_sync_version is empty sync_version = assert(ok[1].max) + if sync_version == null then + sync_version = 0 + end end emitter:emit_toplevel({ @@ -359,7 +363,8 @@ local sync_emitter = { 
emit_entity = function(self, entity_name, entity_data) self.out_n = self.out_n + 1 - self.out[self.out_n] = { type = entity_name , row = entity_data, version = self.sync_version, } + self.out[self.out_n] = { type = entity_name , entity = entity_data, version = self.sync_version, + ws_id = kong.default_workspace, } end, done = function(self) diff --git a/kong/db/declarative/import.lua b/kong/db/declarative/import.lua index ea8f23a546d5..c083fd86d846 100644 --- a/kong/db/declarative/import.lua +++ b/kong/db/declarative/import.lua @@ -30,14 +30,31 @@ local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY +local GLOBAL_WORKSPACE_TAG = "*" +local UNINIT_WORKSPACE_ID = "00000000-0000-0000-0000-000000000000" + + +local function get_default_workspace() + -- in init phase we can not access lmdb + if kong.default_workspace == UNINIT_WORKSPACE_ID and + get_phase() ~= "init" + then + local res = kong.db.workspaces:select_by_name("default") + kong.default_workspace = assert(res and res.id) + end + + return kong.default_workspace +end + + -- Generates the appropriate workspace ID for current operating context -- depends on schema settings -- -- Non-workspaceable entities are always placed under the "default" -- workspace -- --- If the query explicitly set options.workspace == null, then "default" --- workspace shall be used +-- If the query explicitly set options.workspace == null, then all +-- workspaces shall be used -- -- If the query explicitly set options.workspace == "some UUID", then -- it will be returned @@ -45,7 +62,7 @@ local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPAC -- Otherwise, the current workspace ID will be returned local function workspace_id(schema, options) if not schema.workspaceable then - return kong.default_workspace + return get_default_workspace() end -- options.workspace does not exist @@ -53,10 +70,8 @@ local 
function workspace_id(schema, options) return get_workspace_id() end - -- options.workspace == null must be handled by caller by querying - -- all available workspaces one by one if options.workspace == null then - return kong.default_workspace + return GLOBAL_WORKSPACE_TAG end -- options.workspace is a UUID @@ -264,7 +279,10 @@ local function _set_entity_for_txn(t, entity_name, item, options, is_delete) local dao = kong.db[entity_name] local schema = dao.schema local pk = pk_string(schema, item) - local ws_id = workspace_id(schema, options) + + -- If the item belongs to a specific workspace, + -- use it directly without using the default one. + local ws_id = item.ws_id or workspace_id(schema, options) local itm_key = item_key(entity_name, ws_id, pk) @@ -289,11 +307,17 @@ local function _set_entity_for_txn(t, entity_name, item, options, is_delete) -- store serialized entity into lmdb t:set(itm_key, itm_value) + -- for global query + local global_key = item_key(entity_name, GLOBAL_WORKSPACE_TAG, pk) + t:set(global_key, idx_value) + -- select_by_cache_key if schema.cache_key then local cache_key = dao:cache_key(item) - local key = unique_field_key(entity_name, ws_id, "cache_key", cache_key) - + -- The second parameter (ws_id) is a placeholder here, because the cache_key + -- is already unique globally. 
+ local key = unique_field_key(entity_name, get_default_workspace(), + "cache_key", cache_key) -- store item_key or nil into lmdb t:set(key, idx_value) end @@ -302,6 +326,8 @@ local function _set_entity_for_txn(t, entity_name, item, options, is_delete) local is_foreign = fdata.type == "foreign" local fdata_reference = fdata.reference local value = item[fname] + -- avoid overriding the outer ws_id + local field_ws_id = fdata.unique_across_ws and kong.default_workspace or ws_id -- value may be null, we should skip it if not value or value == null then @@ -321,14 +347,12 @@ local function _set_entity_for_txn(t, entity_name, item, options, is_delete) value_str = pk_string(kong.db[fdata_reference].schema, value) end - if fdata.unique_across_ws then - ws_id = kong.default_workspace - end - - local key = unique_field_key(entity_name, ws_id, fname, value_str or value) + for _, wid in ipairs {field_ws_id, GLOBAL_WORKSPACE_TAG} do + local key = unique_field_key(entity_name, wid, fname, value_str or value) - -- store item_key or nil into lmdb - t:set(key, idx_value) + -- store item_key or nil into lmdb + t:set(key, idx_value) + end end if is_foreign then @@ -337,10 +361,12 @@ local function _set_entity_for_txn(t, entity_name, item, options, is_delete) value_str = pk_string(kong.db[fdata_reference].schema, value) - local key = foreign_field_key(entity_name, ws_id, fname, value_str, pk) + for _, wid in ipairs {field_ws_id, GLOBAL_WORKSPACE_TAG} do + local key = foreign_field_key(entity_name, wid, fname, value_str, pk) - -- store item_key or nil into lmdb - t:set(key, idx_value) + -- store item_key or nil into lmdb + t:set(key, idx_value) + end end ::continue:: @@ -354,10 +380,18 @@ end -- the provided LMDB txn object, this operation is only safe -- is the entity does not already exist inside the LMDB database -- +-- The actual item key is: ||*| +-- -- This function sets the following: +-- -- * ||*| => serialized item --- * |||sha256(field_value) => ||*| --- * |||| -> ||*| +-- * 
|*|*| => actual item key +-- +-- * |||sha256(field_value) => actual item key +-- * |*||sha256(field_value) => actual item key +-- +-- * |||| => actual item key +-- * |*||| => actual item key -- -- DO NOT touch `item`, or else the entity will be changed local function insert_entity_for_txn(t, entity_name, item, options) diff --git a/kong/db/migrations/core/024_370_to_380.lua b/kong/db/migrations/core/024_370_to_380.lua index 9d78807962cf..b433500f7edc 100644 --- a/kong/db/migrations/core/024_370_to_380.lua +++ b/kong/db/migrations/core/024_370_to_380.lua @@ -9,9 +9,9 @@ return { CREATE TABLE IF NOT EXISTS clustering_sync_delta ( "version" INT NOT NULL, "type" TEXT NOT NULL, - "id" UUID NOT NULL, + "pk" JSON NOT NULL, "ws_id" UUID NOT NULL, - "row" JSON, + "entity" JSON, FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version); diff --git a/kong/db/schema/others/declarative_config.lua b/kong/db/schema/others/declarative_config.lua index 6d7c47e4d50e..42b165f7e7ff 100644 --- a/kong/db/schema/others/declarative_config.lua +++ b/kong/db/schema/others/declarative_config.lua @@ -59,7 +59,7 @@ do CACHED_OUT = request_aware_table.new() end - CACHED_OUT.clear() + CACHED_OUT:clear() for i = 1, count do local k = primary_key[i] insert(CACHED_OUT, tostring(object[k])) diff --git a/kong/db/strategies/off/init.lua b/kong/db/strategies/off/init.lua index 1ab27cddf56e..a80772224f27 100644 --- a/kong/db/strategies/off/init.lua +++ b/kong/db/strategies/off/init.lua @@ -38,6 +38,11 @@ _mt.__index = _mt local UNINIT_WORKSPACE_ID = "00000000-0000-0000-0000-000000000000" +local function need_follow(ws_id) + return ws_id == "*" +end + + local function get_default_workspace() if kong.default_workspace == UNINIT_WORKSPACE_ID then local res = kong.db.workspaces:select_by_name("default") @@ -107,11 +112,18 @@ local function select_by_key(schema, key, follow) end +local 
LMDB_MIN_PAGE_SIZE = 2 + + local function page_for_prefix(self, prefix, size, offset, options, follow) if not size then size = self.connector:get_page_size(options) end + -- LMDB 'page_size' can not be less than 2 + -- see: https://github.com/Kong/lua-resty-lmdb?tab=readme-ov-file#page + size = math.max(size, LMDB_MIN_PAGE_SIZE) + offset = offset or prefix local res, err_or_more = lmdb_prefix.page(offset, prefix, nil, size) @@ -166,7 +178,7 @@ local function page(self, size, offset, options) offset = token end - return page_for_prefix(self, prefix, size, offset, options, false) + return page_for_prefix(self, prefix, size, offset, options, need_follow(ws_id)) end @@ -176,7 +188,7 @@ local function select(self, pk, options) local ws_id = workspace_id(schema, options) local pk = pk_string(schema, pk) local key = item_key(schema.name, ws_id, pk) - return select_by_key(schema, key, false) + return select_by_key(schema, key, need_follow(ws_id)) end @@ -196,18 +208,18 @@ local function select_by_field(self, field, value, options) _, value = next(value) end - local ws_id = workspace_id(schema, options) + local schema_field = schema.fields[field] + local unique_across_ws = schema_field and schema_field.unique_across_ws - local key - local unique_across_ws = schema.fields[field].unique_across_ws -- only accept global query by field if field is unique across workspaces assert(not options or options.workspace ~= null or unique_across_ws) - if unique_across_ws then - ws_id = get_default_workspace() - end + -- align with cache_key insertion logic in _set_entity_for_txn + local ws_id = (unique_across_ws or field == "cache_key") and + get_default_workspace() or + workspace_id(schema, options) - key = unique_field_key(schema.name, ws_id, field, value) + local key = unique_field_key(schema.name, ws_id, field, value) return select_by_key(schema, key, true) end diff --git a/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua 
b/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua index 0563c4c83f80..cf0f04513c68 100644 --- a/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua +++ b/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua @@ -10,8 +10,8 @@ describe("database migration", function() assert.database_has_relation("clustering_sync_delta") assert.table_has_column("clustering_sync_delta", "version", "integer") assert.table_has_column("clustering_sync_delta", "type", "text") - assert.table_has_column("clustering_sync_delta", "id", "uuid") + assert.table_has_column("clustering_sync_delta", "pk", "json") assert.table_has_column("clustering_sync_delta", "ws_id", "uuid") - assert.table_has_column("clustering_sync_delta", "row", "json") + assert.table_has_column("clustering_sync_delta", "entity", "json") end) end)