diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index a9368f7550612..ddc36d6ccfaab 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -7,7 +7,6 @@ local EMPTY = require("kong.tools.table").EMPTY local ipairs = ipairs -local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_DEBUG = ngx.DEBUG @@ -74,29 +73,8 @@ function _M:notify_all_nodes() end -local function gen_delta(entity, name, options, ws_id, is_delete) - -- composite key, like { id = ... } - local schema = kong.db[name].schema - local pk = schema:extract_pk_values(entity) - - assert(schema:validate_primary_key(pk)) - - local delta = { - type = name, - pk = pk, - ws_id = ws_id, - entity = is_delete and ngx_null or entity, - } - - return delta -end - - function _M:entity_delta_writer(entity, name, options, ws_id, is_delete) - local d = gen_delta(entity, name, options, ws_id, is_delete) - local deltas = { d, } - - local res, err = self.strategy:insert_delta(deltas) + local res, err = self.strategy:insert_delta() if not res then self.strategy:cancel_txn() return nil, err @@ -164,39 +142,7 @@ function _M:register_dao_hooks() ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name) - -- set lmdb value to ngx_null then return entity - - local d = gen_delta(entity, name, options, ws_id, true) - local deltas = { d, } - - -- delete other related entities - for i, item in ipairs(cascade_entries or EMPTY) do - local e = item.entity - local name = item.dao.schema.name - - ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to cascade deleting ", name) - - d = gen_delta(e, name, options, e.ws_id, true) - - -- #1 item is initial entity - deltas[i + 1] = d - end - - local res, err = self.strategy:insert_delta(deltas) - if not res then - self.strategy:cancel_txn() - return nil, err - end - - res, err = self.strategy:commit_txn() - if not res then - self.strategy:cancel_txn() - return nil, err - end - - self:notify_all_nodes() - - return entity -- for other hooks + return self:entity_delta_writer(entity, name, options, ws_id) end local dao_hooks = { diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 258cd2a388686..3eb9bd0eae56d 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -26,14 +26,9 @@ local fmt = string.format local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR -local ngx_INFO = ngx.INFO local ngx_DEBUG = ngx.DEBUG --- number of versions behind before a full sync is forced -local DEFAULT_FULL_SYNC_THRESHOLD = 512 - - function _M.new(strategy) local self = { strategy = strategy, @@ -62,10 +57,6 @@ end function _M:init_cp(manager) local purge_delay = manager.conf.cluster_data_plane_purge_delay - -- number of versions behind before a full sync is forced - local FULL_SYNC_THRESHOLD = manager.conf.cluster_full_sync_threshold or - DEFAULT_FULL_SYNC_THRESHOLD - -- CP -- Method: kong.sync.v2.get_delta -- Params: versions: list of current versions of the database diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index 1ef758e2c1d05..7bc0784c6e61e 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -2,19 +2,7 @@ local _M = {} local _MT = { __index = _M } -local cjson = require("cjson.safe") -local buffer = require("string.buffer") - - -local string_format = string.format -local cjson_encode = cjson.encode local ngx_null = ngx.null -local ngx_log = ngx.log -local ngx_ERR = ngx.ERR - - -local KEEP_VERSION_COUNT = 100 -local CLEANUP_TIME_DELAY = 3600 -- 1 hour function _M.new(db) @@ -26,32 +14,8 @@ function _M.new(db) end -local PURGE_QUERY = [[ - DELETE FROM clustering_sync_version - WHERE "version" < ( - SELECT MAX("version") - %d - FROM clustering_sync_version - ); -]] - - +-- reserved for future function _M:init_worker() - local function cleanup_handler(premature) - if premature then - return - end - - local res, err = self.connector:query(string_format(PURGE_QUERY, KEEP_VERSION_COUNT)) - if not res then - ngx_log(ngx_ERR, - "[incremental] unable to purge old data from incremental delta table, err: ", - err) - - return - end - end - - assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler)) end @@ -61,37 +25,12 @@ local NEW_VERSION_QUERY = [[ new_version integer; BEGIN INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version; - INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s; END $$; ]] --- deltas: { --- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", } --- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", } --- } -function _M:insert_delta(deltas) - local buf = buffer.new() - - local count = #deltas - for i = 1, count do - local d = deltas[i] - - buf:putf("(new_version, %s, %s, %s, %s)", - self.connector:escape_literal(d.type), - self.connector:escape_literal(cjson_encode(d.pk)), - self.connector:escape_literal(d.ws_id or kong.default_workspace), - self.connector:escape_literal(cjson_encode(d.entity))) - - -- sql values should be separated by comma - if i < count then - buf:put(",") - end - end - - local sql = string_format(NEW_VERSION_QUERY, buf:get()) - - return self.connector:query(sql) +function _M:insert_delta() + return self.connector:query(NEW_VERSION_QUERY) end @@ -112,14 +51,6 @@ function _M:get_latest_version() end -function _M:get_delta(version) - local sql = "SELECT * FROM clustering_sync_delta" .. - " WHERE version > " .. self.connector:escape_literal(version) .. - " ORDER BY version ASC" - return self.connector:query(sql) -end - - function _M:begin_txn() return self.connector:query("BEGIN;") end