diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 4100afbb96756..2bea133613a7a 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -8,13 +8,19 @@ local constants = require("kong.constants") local concurrency = require("kong.concurrency") local isempty = require("table.isempty") local events = require("kong.runloop.events") +local lrucache = require("resty.lrucache") +local resumable_chunker = require("kong.db.resumable_chunker") +local clustering_utils = require("kong.clustering.utils") +local EMPTY = require("kong.tools.table").EMPTY local insert_entity_for_txn = declarative.insert_entity_for_txn local delete_entity_for_txn = declarative.delete_entity_for_txn local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY +local CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY = constants.CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY +local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } local MAX_RETRY = 5 @@ -27,16 +33,23 @@ local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_INFO = ngx.INFO +local ngx_NOTICE = ngx.NOTICE local ngx_DEBUG = ngx.DEBUG +local json_encode = clustering_utils.json_encode +local json_decode = clustering_utils.json_decode + -- number of versions behind before a full sync is forced local DEFAULT_FULL_SYNC_THRESHOLD = 512 -function _M.new(strategy) +function _M.new(strategy, opts) + opts = opts or EMPTY + local self = { strategy = strategy, + page_size = opts.page_size, } return setmetatable(self, _MT) @@ -48,17 +61,129 @@ local function inc_sync_result(res) end -local function full_sync_result() +local function paged_full_sync_payload(page, next_token) + return { + default = { + full_sync = true, + deltas = page, + next_token = next_token and assert(json_encode(next_token)), + }, + } +end + + +local function lagacy_full_sync() local deltas, err = declarative.export_config_sync() if not deltas then return nil, err end - -- wipe dp lmdb, full sync return { default = { deltas = deltas, wipe = true, }, } end +local function page_to_deltas(page) + local deltas = {} + for i, entity in ipairs(page) do + local typ = entity.__type + entity.__type = nil + local delta = { + type = typ, + entity = entity, + version = 0, -- pin to the 0 to let DP report itself as not ready + ws_id = kong.default_workspace, + } + + deltas[i] = delta + end + + return deltas +end + + +local function full_sync(self, workspace) + local pageable = workspace.pageable + local next_token = workspace.next_token + + if not pageable then + if next_token then + -- how do I emit a client error? + return nil, "next_token is set for none pageable DP" + end + + return lagacy_full_sync() + end + + local offset, begin_version, end_version + if next_token then + local err + next_token, err = json_decode(next_token) + if not next_token then + return nil, "invalid next_token" + end + + offset, begin_version, end_version = + next_token.offset, next_token.begin_version, next_token.end_version + else + begin_version = self.strategy:get_latest_version() + end + + -- DP finished syncing DB entities. Now trying to catch up with the fix-up deltas + if not offset then + if not end_version then + return nil, "invalid next_token" + end + + local res, err = self.strategy:get_delta(end_version) + if not res then + return nil, err + end + + -- history is lost. Unable to make a consistent full sync + if not isempty(res) and res[1].version ~= default_namespace_version + 1 then + return nil, "history lost, unable to make a consistent full sync" + end + + return paged_full_sync_payload(res, nil) -- nil next_token marks the end + end + + local pager = self.pager + if not pager then + pager = resumable_chunker.from_db(manager.db, { + size = self.page_size, + }) + self.pager = pager + end + + local page, err, new_offset = pager:fetch(nil, offset) + if not page then + return nil, err + end + + local deltas = page_to_deltas(page) + + if not new_offset then + end_version = self.strategy:get_latest_version() + + -- no changes during the full sync session. No need for fix-up deltas + if end_version == begin_version then + return paged_full_sync_payload(deltas, nil) + end + + -- let DP initiate another call to get fix-up deltas + return paged_full_sync_payload(deltas, { + end_version = end_version, + }) + end + + -- more DB pages to fetch + return paged_full_sync_payload(deltas, { + offset = new_offset, + begin_version = begin_version, + }) +end + + function _M:init_cp(manager) local purge_delay = manager.conf.cluster_data_plane_purge_delay @@ -109,7 +234,8 @@ function _M:init_cp(manager) -- is the node empty? If so, just do a full sync to bring it up to date faster if default_namespace_version == 0 or - latest_version - default_namespace_version > FULL_SYNC_THRESHOLD + latest_version - default_namespace_version > FULL_SYNC_THRESHOLD or + default_namespace.next_token -- a full-sync session is in progress then -- we need to full sync because holes are found @@ -118,7 +244,7 @@ function _M:init_cp(manager) ", current_version: ", default_namespace_version, ", forcing a full sync") - return full_sync_result() + return full_sync(self, default_namespace) end -- do we need an incremental sync? @@ -148,7 +274,7 @@ function _M:init_cp(manager) ", current_version: ", default_namespace_version, ", forcing a full sync") - return full_sync_result() + return full_sync(self, default_namespace) end) end @@ -210,32 +336,84 @@ local function is_rpc_ready() end +local function purge(t) + t:db_drop(false) + -- we are at a unready state + -- consider the config empty + t:set(DECLARATIVE_HASH_KEY, DECLARATIVE_EMPTY_CONFIG_HASH) + kong.core_cache:purge() + kong.cache:purge() +end + + +local function paginated_error_handle() + -- a failed full sync. + local t = txn.begin(512) + purge(t) + local ok, err = t:commit() + if not ok then + error("failed to reset DB when handling error: " .. err) + end + + kong_shm:set(CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY, 0) + + -- retry immediately + return _M:sync_once(0) +end + + local function do_sync() if not is_rpc_ready() then return nil, "rpc is not ready" end + local next_token = kong_shm:get(CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY) + + local version + if next_token then + version = 0 + else + version = tonumber(declarative.get_current_hash()) or 0 + end + local msg = { default = - { version = - tonumber(declarative.get_current_hash()) or 0, + { version = version, + next_token = next_token, + pageable = true, }, } - local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg) - if not ns_deltas then + local result, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg) + if not result then ngx_log(ngx_ERR, "sync get_delta error: ", err) + + if next_token then + return paginated_error_handle() + end + return true end - -- ns_deltas should look like: - -- { default = { deltas = { ... }, wipe = true, }, } + -- result should look like: + -- { default = { deltas = { ... }, wipe = true, full_sync_done = false, next_token = ...}, } - local ns_delta = ns_deltas.default - if not ns_delta then + local payload = result.default + if not payload then return nil, "default namespace does not exist inside params" end - local deltas = ns_delta.deltas + local full_sync, first_page, last_page + if payload.full_sync then + full_sync = true + first_page = not next_token and payload.next_token + last_page = not payload.next_token + + elseif payload.wipe then + -- lagacy full sync + full_sync, first_page, last_page = true, true, true + end + + local deltas = payload.deltas if isempty(deltas) then -- no delta to sync @@ -258,9 +436,11 @@ local function do_sync() local t = txn.begin(512) - local wipe = ns_delta.wipe - if wipe then - t:db_drop(false) + -- a full sync begins + if first_page then + -- reset the lmdb + purge(t) + next_token = payload.next_token end local db = kong.db @@ -291,8 +471,8 @@ local function do_sync() return nil, err end - -- If we will wipe lmdb, we don't need to delete it from lmdb. - if old_entity and not wipe then + -- If we are purging, we don't need to delete it. + if old_entity and not full_sync then local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) if not res then return nil, err @@ -309,8 +489,8 @@ local function do_sync() ", version: ", delta_version, ", type: ", delta_type) - -- wipe the whole lmdb, should not have events - if not wipe then + -- during the full sync, should not emit events + if not full_sync then ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, } end @@ -321,8 +501,8 @@ local function do_sync() return nil, err end - -- If we will wipe lmdb, we don't need to delete it from lmdb. - if old_entity and not wipe then + -- during the full sync, should not emit events + if old_entity and not in_full_sync then local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) if not res then return nil, err @@ -334,14 +514,14 @@ local function do_sync() ", version: ", delta_version, ", type: ", delta_type) - -- wipe the whole lmdb, should not have events - if not wipe then + -- delete the entity, opts for getting correct lmdb key + if not in_full_sync then ev = { delta_type, "delete", old_entity, } end end -- if delta_entity ~= nil and delta_entity ~= ngx_null - -- wipe the whole lmdb, should not have events - if not wipe then + -- during the full sync, should not emit events + if not full_sync then crud_events_n = crud_events_n + 1 crud_events[crud_events_n] = ev end @@ -354,9 +534,12 @@ local function do_sync() end end -- for _, delta - -- store current sync version - t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) - + -- only update the sync version if not in full sync/ full sync done + if (not full_sync) or last_page then + -- store current sync version + t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + end + -- store the correct default workspace uuid if default_ws_changed then t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace) @@ -367,37 +550,45 @@ local function do_sync() return nil, err end - if wipe then - kong.core_cache:purge() - kong.cache:purge() + if full_sync then + -- the full sync is done + if last_page then + kong_shm:set(CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY, nil) + + -- Trigger other workers' callbacks like reconfigure_handler. + -- + -- Full sync could rebuild route, plugins and balancer route, so their + -- hashes are nil. + -- Until this point, the dataplane is not ready to serve requests or to + -- do delta syncs. + local reconfigure_data = { kong.default_workspace, nil, nil, nil, } + return events.declarative_reconfigure_notify(reconfigure_data) - -- Trigger other workers' callbacks like reconfigure_handler. - -- - -- Full sync could rebuild route, plugins and balancer route, so their - -- hashes are nil. - local reconfigure_data = { kong.default_workspace, nil, nil, nil, } - local ok, err = events.declarative_reconfigure_notify(reconfigure_data) - if not ok then - return nil, err - end + else + kong_shm:set(CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY, payload.next_token) - else - for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.entity, old_entity - db[event[1]]:post_crud_event(event[2], event[3], event[4]) + -- get next page imeediately without releasing the mutex + return do_sync() end end + -- emit the CRUD events + -- if in_full_sync, no events should be added into the queue + for _, event in ipairs(crud_events) do + -- delta_type, crud_event_type, delta.entity, old_entity + db[event[1]]:post_crud_event(event[2], event[3], event[4]) + end + return true end -local function sync_handler(premature) +local function sync_handler(premature, try_counter, dp_status) if premature then return end - local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync) + local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() do_sync(dp_status) end) if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end @@ -446,12 +637,12 @@ end function _M:sync_once(delay) - return ngx.timer.at(delay or 0, sync_once_impl, 0) + return ngx.timer.at(delay or 0, sync_once_impl, 0, self) end function _M:sync_every(delay) - return ngx.timer.every(delay, sync_handler) + return ngx.timer.every(delay, sync_handler, nil, self) end diff --git a/kong/constants.lua b/kong/constants.lua index 4b04b01ce3fa9..ef02e4948d265 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -235,6 +235,7 @@ local constants = { GENERIC = "generic or unknown error", }, CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = "clustering_data_planes:latest_version", + CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY = "clustering_data_planes:paged_full_sync_next_token", CLEAR_HEALTH_STATUS_DELAY = 300, -- 300 seconds