Skip to content

Commit

Permalink
code clean
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw committed Dec 17, 2024
1 parent 195311b commit 6a31894
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 137 deletions.
58 changes: 2 additions & 56 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ local EMPTY = require("kong.tools.table").EMPTY


local ipairs = ipairs
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
Expand Down Expand Up @@ -74,29 +73,8 @@ function _M:notify_all_nodes()
end


local function gen_delta(entity, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(entity)

assert(schema:validate_primary_key(pk))

local delta = {
type = name,
pk = pk,
ws_id = ws_id,
entity = is_delete and ngx_null or entity,
}

return delta
end


function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
local d = gen_delta(entity, name, options, ws_id, is_delete)
local deltas = { d, }

local res, err = self.strategy:insert_delta(deltas)
local res, err = self.strategy:insert_delta()
if not res then
self.strategy:cancel_txn()
return nil, err
Expand Down Expand Up @@ -164,39 +142,7 @@ function _M:register_dao_hooks()

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name)

-- set lmdb value to ngx_null then return entity

local d = gen_delta(entity, name, options, ws_id, true)
local deltas = { d, }

-- delete other related entities
for i, item in ipairs(cascade_entries or EMPTY) do
local e = item.entity
local name = item.dao.schema.name

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to cascade deleting ", name)

d = gen_delta(e, name, options, e.ws_id, true)

-- #1 item is initial entity
deltas[i + 1] = d
end

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

self:notify_all_nodes()

return entity -- for other hooks
return self:entity_delta_writer(entity, name, options, ws_id)
end

local dao_hooks = {
Expand Down
9 changes: 0 additions & 9 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@ local fmt = string.format
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local ngx_DEBUG = ngx.DEBUG


-- number of versions behind before a full sync is forced
local DEFAULT_FULL_SYNC_THRESHOLD = 512


function _M.new(strategy)
local self = {
strategy = strategy,
Expand Down Expand Up @@ -62,10 +57,6 @@ end
function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

-- number of versions behind before a full sync is forced
local FULL_SYNC_THRESHOLD = manager.conf.cluster_full_sync_threshold or
DEFAULT_FULL_SYNC_THRESHOLD

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
Expand Down
75 changes: 3 additions & 72 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,7 @@ local _M = {}
local _MT = { __index = _M }


local cjson = require("cjson.safe")
local buffer = require("string.buffer")


local string_format = string.format
local cjson_encode = cjson.encode
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR


local KEEP_VERSION_COUNT = 100
local CLEANUP_TIME_DELAY = 3600 -- 1 hour


function _M.new(db)
Expand All @@ -26,32 +14,8 @@ function _M.new(db)
end


local PURGE_QUERY = [[
DELETE FROM clustering_sync_version
WHERE "version" < (
SELECT MAX("version") - %d
FROM clustering_sync_version
);
]]


-- reserved for future
function _M:init_worker()
local function cleanup_handler(premature)
if premature then
return
end

local res, err = self.connector:query(string_format(PURGE_QUERY, KEEP_VERSION_COUNT))
if not res then
ngx_log(ngx_ERR,
"[incremental] unable to purge old data from incremental delta table, err: ",
err)

return
end
end

assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler))
end


Expand All @@ -61,37 +25,12 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()

local count = #deltas
for i = 1, count do
local d = deltas[i]

buf:putf("(new_version, %s, %s, %s, %s)",
self.connector:escape_literal(d.type),
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.entity)))

-- sql values should be separated by comma
if i < count then
buf:put(",")
end
end

local sql = string_format(NEW_VERSION_QUERY, buf:get())

return self.connector:query(sql)
function _M:insert_delta()
return self.connector:query(NEW_VERSION_QUERY)
end


Expand All @@ -112,14 +51,6 @@ function _M:get_latest_version()
end


function _M:get_delta(version)
local sql = "SELECT * FROM clustering_sync_delta" ..
" WHERE version > " .. self.connector:escape_literal(version) ..
" ORDER BY version ASC"
return self.connector:query(sql)
end


function _M:begin_txn()
return self.connector:query("BEGIN;")
end
Expand Down

0 comments on commit 6a31894

Please sign in to comment.