Skip to content

Commit

Permalink
refactor(clustering): change sync data structure (#13794)
Browse files Browse the repository at this point in the history
* rename db id to pk

* change id to pk in Lua

* change pk to json

* schema:extract_pk_values
  • Loading branch information
chronolaw authored Oct 28, 2024
1 parent c625a60 commit b075641
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 8 deletions.
8 changes: 7 additions & 1 deletion kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,16 @@ end


function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(row)

assert(schema:validate_primary_key(pk))

local deltas = {
{
type = name,
id = row.id,
pk = pk,
ws_id = ws_id,
row = is_delete and ngx_null or row,
},
Expand Down
2 changes: 1 addition & 1 deletion kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ local function do_sync()

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
local old_entity, err = kong.db[delta_type]:select(delta.pk) -- composite key
if err then
return nil, err
end
Expand Down
8 changes: 4 additions & 4 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,21 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, id, ws_id, row) VALUES %s;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, row) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "id" = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", }
-- { type = "route", "id" = "0a5bac5c-b795-4981-95d2-919ba3390b7e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", }
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", row = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", row = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()
for _, d in ipairs(deltas) do
buf:putf("(new_version, %s, %s, %s, %s)",
self.connector:escape_literal(d.type),
self.connector:escape_literal(d.id),
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.row)))
end
Expand Down
2 changes: 1 addition & 1 deletion kong/db/migrations/core/024_370_to_380.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ return {
CREATE TABLE IF NOT EXISTS clustering_sync_delta (
"version" INT NOT NULL,
"type" TEXT NOT NULL,
"id" UUID NOT NULL,
"pk" JSON NOT NULL,
"ws_id" UUID NOT NULL,
"row" JSON,
FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe("database migration", function()
assert.database_has_relation("clustering_sync_delta")
assert.table_has_column("clustering_sync_delta", "version", "integer")
assert.table_has_column("clustering_sync_delta", "type", "text")
assert.table_has_column("clustering_sync_delta", "id", "uuid")
assert.table_has_column("clustering_sync_delta", "pk", "json")
assert.table_has_column("clustering_sync_delta", "ws_id", "uuid")
assert.table_has_column("clustering_sync_delta", "row", "json")
end)
Expand Down

0 comments on commit b075641

Please sign in to comment.