From d07425b0a7b5a3e7e29f719dd068f826e36829d9 Mon Sep 17 00:00:00 2001 From: Chrono Date: Mon, 23 Dec 2024 14:01:26 +0800 Subject: [PATCH] refactor(clustering/rpc): simplify the implementation (#14026) KAG-6036 --- kong-3.10.0-0.rockspec | 1 + kong/clustering/services/sync/hooks.lua | 58 +-- kong/clustering/services/sync/rpc.lua | 53 +-- .../services/sync/strategies/postgres.lua | 75 +--- kong/conf_loader/constants.lua | 1 - kong/db/migrations/core/024_380_to_390.lua | 9 - kong/db/migrations/core/025_390_to_3100.lua | 12 + kong/templates/kong_defaults.lua | 1 - .../09-hybrid_mode/01-sync_spec.lua | 6 + .../09-hybrid_mode/08-lazy_export_spec.lua | 1 - .../19-incrmental_sync/01-sync_spec.lua | 341 ------------------ .../02-multiple_dp_nodes_spec.lua | 113 ------ .../migrations/core/024_380_to_390_spec.lua | 9 - .../migrations/core/025_390_to_3100_spec.lua | 7 + 14 files changed, 35 insertions(+), 652 deletions(-) create mode 100644 kong/db/migrations/core/025_390_to_3100.lua delete mode 100644 spec/02-integration/19-incrmental_sync/01-sync_spec.lua delete mode 100644 spec/02-integration/19-incrmental_sync/02-multiple_dp_nodes_spec.lua create mode 100644 spec/05-migration/db/migrations/core/025_390_to_3100_spec.lua diff --git a/kong-3.10.0-0.rockspec b/kong-3.10.0-0.rockspec index 22e19b4fefe9..9aafa5c0c0a2 100644 --- a/kong-3.10.0-0.rockspec +++ b/kong-3.10.0-0.rockspec @@ -341,6 +341,7 @@ build = { ["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua", ["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua", ["kong.db.migrations.core.024_380_to_390"] = "kong/db/migrations/core/024_380_to_390.lua", + ["kong.db.migrations.core.025_390_to_3100"] = "kong/db/migrations/core/025_390_to_3100.lua", ["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua", ["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua", ["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua", diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index a9368f755061..ddc36d6ccfaa 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -7,7 +7,6 @@ local EMPTY = require("kong.tools.table").EMPTY local ipairs = ipairs -local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_DEBUG = ngx.DEBUG @@ -74,29 +73,8 @@ function _M:notify_all_nodes() end -local function gen_delta(entity, name, options, ws_id, is_delete) - -- composite key, like { id = ... } - local schema = kong.db[name].schema - local pk = schema:extract_pk_values(entity) - - assert(schema:validate_primary_key(pk)) - - local delta = { - type = name, - pk = pk, - ws_id = ws_id, - entity = is_delete and ngx_null or entity, - } - - return delta -end - - function _M:entity_delta_writer(entity, name, options, ws_id, is_delete) - local d = gen_delta(entity, name, options, ws_id, is_delete) - local deltas = { d, } - - local res, err = self.strategy:insert_delta(deltas) + local res, err = self.strategy:insert_delta() if not res then self.strategy:cancel_txn() return nil, err @@ -164,39 +142,7 @@ function _M:register_dao_hooks() ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name) - -- set lmdb value to ngx_null then return entity - - local d = gen_delta(entity, name, options, ws_id, true) - local deltas = { d, } - - -- delete other related entities - for i, item in ipairs(cascade_entries or EMPTY) do - local e = item.entity - local name = item.dao.schema.name - - ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to cascade deleting ", name) - - d = gen_delta(e, name, options, e.ws_id, true) - - -- #1 item is initial entity - deltas[i + 1] = d - end - - local res, err = self.strategy:insert_delta(deltas) - if not res then - self.strategy:cancel_txn() - return nil, err - end - - res, err = self.strategy:commit_txn() - if not res then - self.strategy:cancel_txn() - return nil, err - end - - self:notify_all_nodes() - - return entity -- for other hooks + return self:entity_delta_writer(entity, name, options, ws_id) end local dao_hooks = { diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 006a8c4bb439..b6525c3ee04a 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -26,14 +26,9 @@ local fmt = string.format local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR -local ngx_INFO = ngx.INFO local ngx_DEBUG = ngx.DEBUG --- number of versions behind before a full sync is forced -local DEFAULT_FULL_SYNC_THRESHOLD = 512 - - function _M.new(strategy) local self = { strategy = strategy, @@ -43,8 +38,8 @@ function _M.new(strategy) end -local function inc_sync_result(res) - return { default = { deltas = res, wipe = false, }, } +local function empty_sync_result() + return { default = { deltas = {}, wipe = false, }, } end @@ -62,10 +57,6 @@ end function _M:init_cp(manager) local purge_delay = manager.conf.cluster_data_plane_purge_delay - -- number of versions behind before a full sync is forced - local FULL_SYNC_THRESHOLD = manager.conf.cluster_full_sync_threshold or - DEFAULT_FULL_SYNC_THRESHOLD - -- CP -- Method: kong.sync.v2.get_delta -- Params: versions: list of current versions of the database @@ -107,48 +98,12 @@ function _M:init_cp(manager) return nil, err end - -- is the node empty? If so, just do a full sync to bring it up to date faster if default_namespace_version == 0 or - latest_version - default_namespace_version > FULL_SYNC_THRESHOLD - then - -- we need to full sync because holes are found - - ngx_log(ngx_INFO, - "[kong.sync.v2] database is empty or too far behind for node_id: ", node_id, - ", current_version: ", default_namespace_version, - ", forcing a full sync") - + default_namespace_version < latest_version then return full_sync_result() end - -- do we need an incremental sync? - - local res, err = self.strategy:get_delta(default_namespace_version) - if not res then - return nil, err - end - - if isempty(res) then - -- node is already up to date - return inc_sync_result(res) - end - - -- some deltas are returned, are they contiguous? - if res[1].version == default_namespace_version + 1 then - -- doesn't wipe dp lmdb, incremental sync - return inc_sync_result(res) - end - - -- we need to full sync because holes are found - -- in the delta, meaning the oldest version is no longer - -- available - - ngx_log(ngx_INFO, - "[kong.sync.v2] delta for node_id no longer available: ", node_id, - ", current_version: ", default_namespace_version, - ", forcing a full sync") - - return full_sync_result() + return empty_sync_result() end) end diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index 1ef758e2c1d0..7bc0784c6e61 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -2,19 +2,7 @@ local _M = {} local _MT = { __index = _M } -local cjson = require("cjson.safe") -local buffer = require("string.buffer") - - -local string_format = string.format -local cjson_encode = cjson.encode local ngx_null = ngx.null -local ngx_log = ngx.log -local ngx_ERR = ngx.ERR - - -local KEEP_VERSION_COUNT = 100 -local CLEANUP_TIME_DELAY = 3600 -- 1 hour function _M.new(db) @@ -26,32 +14,8 @@ function _M.new(db) end -local PURGE_QUERY = [[ - DELETE FROM clustering_sync_version - WHERE "version" < ( - SELECT MAX("version") - %d - FROM clustering_sync_version - ); -]] - - +-- reserved for future function _M:init_worker() - local function cleanup_handler(premature) - if premature then - return - end - - local res, err = self.connector:query(string_format(PURGE_QUERY, KEEP_VERSION_COUNT)) - if not res then - ngx_log(ngx_ERR, - "[incremental] unable to purge old data from incremental delta table, err: ", - err) - - return - end - end - - assert(ngx.timer.every(CLEANUP_TIME_DELAY, cleanup_handler)) end @@ -61,37 +25,12 @@ local NEW_VERSION_QUERY = [[ new_version integer; BEGIN INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version; - INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s; END $$; ]] --- deltas: { --- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", } --- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", } --- } -function _M:insert_delta(deltas) - local buf = buffer.new() - - local count = #deltas - for i = 1, count do - local d = deltas[i] - - buf:putf("(new_version, %s, %s, %s, %s)", - self.connector:escape_literal(d.type), - self.connector:escape_literal(cjson_encode(d.pk)), - self.connector:escape_literal(d.ws_id or kong.default_workspace), - self.connector:escape_literal(cjson_encode(d.entity))) - - -- sql values should be separated by comma - if i < count then - buf:put(",") - end - end - - local sql = string_format(NEW_VERSION_QUERY, buf:get()) - - return self.connector:query(sql) +function _M:insert_delta() + return self.connector:query(NEW_VERSION_QUERY) end @@ -112,14 +51,6 @@ function _M:get_latest_version() end -function _M:get_delta(version) - local sql = "SELECT * FROM clustering_sync_delta" .. - " WHERE version > " .. self.connector:escape_literal(version) .. - " ORDER BY version ASC" - return self.connector:query(sql) -end - - function _M:begin_txn() return self.connector:query("BEGIN;") end diff --git a/kong/conf_loader/constants.lua b/kong/conf_loader/constants.lua index 2e3a27b31b57..f0fd9c0c0310 100644 --- a/kong/conf_loader/constants.lua +++ b/kong/conf_loader/constants.lua @@ -514,7 +514,6 @@ local CONF_PARSERS = { cluster_dp_labels = { typ = "array" }, cluster_rpc = { typ = "boolean" }, cluster_rpc_sync = { typ = "boolean" }, - cluster_full_sync_threshold = { typ = "number" }, cluster_cjson = { typ = "boolean" }, kic = { typ = "boolean" }, diff --git a/kong/db/migrations/core/024_380_to_390.lua b/kong/db/migrations/core/024_380_to_390.lua index b433500f7edc..39e4dc5af2e3 100644 --- a/kong/db/migrations/core/024_380_to_390.lua +++ b/kong/db/migrations/core/024_380_to_390.lua @@ -6,15 +6,6 @@ return { CREATE TABLE IF NOT EXISTS clustering_sync_version ( "version" SERIAL PRIMARY KEY ); - CREATE TABLE IF NOT EXISTS clustering_sync_delta ( - "version" INT NOT NULL, - "type" TEXT NOT NULL, - "pk" JSON NOT NULL, - "ws_id" UUID NOT NULL, - "entity" JSON, - FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE - ); - CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version); END; $$; ]] diff --git a/kong/db/migrations/core/025_390_to_3100.lua b/kong/db/migrations/core/025_390_to_3100.lua new file mode 100644 index 000000000000..8af90e46c83c --- /dev/null +++ b/kong/db/migrations/core/025_390_to_3100.lua @@ -0,0 +1,12 @@ +return { + postgres = { + up = [[ + DO $$ + BEGIN + DROP TABLE IF EXISTS clustering_sync_delta; + DROP INDEX IF EXISTS clustering_sync_delta_version_idx; + END; + $$; + ]] + } +} diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index 83ef5f95eb3c..68f3863b5f42 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -43,7 +43,6 @@ cluster_use_proxy = off cluster_dp_labels = NONE cluster_rpc = off cluster_rpc_sync = off -cluster_full_sync_threshold = 512 cluster_cjson = off lmdb_environment_path = dbless.lmdb diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index bb941bd4ed30..4877869c4a23 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -732,6 +732,12 @@ describe("CP/DP config sync #" .. strategy .. " rpc_sync=" .. rpc_sync, function end end, 5) + -- TODO: it may cause flakiness + -- wait for rpc sync finishing + if rpc_sync == "on" then + ngx.sleep(0.5) + end + for i = 5, 2, -1 do res = proxy_client:get("/" .. i) assert.res_status(404, res) diff --git a/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua b/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua index bc235385c6b5..14230c89f2f6 100644 --- a/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua +++ b/spec/02-integration/09-hybrid_mode/08-lazy_export_spec.lua @@ -93,7 +93,6 @@ describe("lazy_export with #".. strategy .. " rpc_sync=" .. rpc_sync, function() touch_config() if rpc_sync == "on" then assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) - assert.logfile().has.line("[kong.sync.v2] database is empty or too far behind for node_id", true) else assert.logfile().has.line("[clustering] exporting config", true) diff --git a/spec/02-integration/19-incrmental_sync/01-sync_spec.lua b/spec/02-integration/19-incrmental_sync/01-sync_spec.lua deleted file mode 100644 index a608e6432edb..000000000000 --- a/spec/02-integration/19-incrmental_sync/01-sync_spec.lua +++ /dev/null @@ -1,341 +0,0 @@ -local helpers = require "spec.helpers" -local cjson = require("cjson.safe") - -local function test_url(path, port, code, headers) - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", port) - - local res = proxy_client:send({ - method = "GET", - path = path, - headers = headers, - }) - - local status = res and res.status - proxy_client:close() - if status == code then - return true - end - end, 10) -end - -for _, strategy in helpers.each_strategy() do - -describe("Incremental Sync RPC #" .. strategy, function() - - lazy_setup(function() - helpers.get_db_utils(strategy, { - "clustering_data_planes", - }) -- runs migrations - - assert(helpers.start_kong({ - role = "control_plane", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - database = strategy, - cluster_listen = "127.0.0.1:9005", - nginx_conf = "spec/fixtures/custom_nginx.template", - cluster_rpc = "on", - cluster_rpc_sync = "on", -- rpc sync - })) - - assert(helpers.start_kong({ - role = "data_plane", - database = "off", - prefix = "servroot2", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - cluster_control_plane = "127.0.0.1:9005", - proxy_listen = "0.0.0.0:9002", - nginx_conf = "spec/fixtures/custom_nginx.template", - nginx_worker_processes = 4, -- multiple workers - cluster_rpc = "on", - cluster_rpc_sync = "on", -- rpc sync - worker_state_update_frequency = 1, - })) - end) - - lazy_teardown(function() - helpers.stop_kong("servroot2") - helpers.stop_kong() - end) - - after_each(function() - helpers.clean_logfile("servroot2/logs/error.log") - helpers.clean_logfile() - end) - - describe("sync works", function() - local route_id - - it("create route on CP", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:post("/services", { - body = { name = "service-001", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - res = assert(admin_client:post("/services/service-001/routes", { - body = { paths = { "/001" }, }, - headers = {["Content-Type"] = "application/json"} - })) - local body = assert.res_status(201, res) - local json = cjson.decode(body) - - route_id = json.id - - test_url("/001", 9002, 200) - - assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) - assert.logfile().has.no.line("unable to update clustering data plane status", true) - - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] update entity", true) - - -- dp lua-resty-events should work without privileged_agent - assert.logfile("servroot2/logs/error.log").has.line( - "lua-resty-events enable_privileged_agent is false", true) - end) - - it("update route on CP", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:post("/services", { - body = { name = "service-002", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - res = assert(admin_client:post("/services/service-002/routes", { - body = { paths = { "/002-foo" }, }, - headers = {["Content-Type"] = "application/json"} - })) - local body = assert.res_status(201, res) - local json = cjson.decode(body) - - route_id = json.id - - test_url("/002-foo", 9002, 200) - - res = assert(admin_client:put("/services/service-002/routes/" .. route_id, { - body = { paths = { "/002-bar" }, }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(200, res) - - test_url("/002-bar", 9002, 200) - - assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) - assert.logfile().has.no.line("unable to update clustering data plane status", true) - - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] update entity", true) - end) - - it("delete route on CP", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:post("/services", { - body = { name = "service-003", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - res = assert(admin_client:post("/services/service-003/routes", { - body = { paths = { "/003-foo" }, }, - headers = {["Content-Type"] = "application/json"} - })) - local body = assert.res_status(201, res) - local json = cjson.decode(body) - - route_id = json.id - - test_url("/003-foo", 9002, 200) - - assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) - assert.logfile().has.no.line("unable to update clustering data plane status", true) - - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] update entity", true) - assert.logfile("servroot2/logs/error.log").has.no.line("[kong.sync.v2] delete entity", true) - - res = assert(admin_client:delete("/services/service-003/routes/" .. route_id)) - assert.res_status(204, res) - - test_url("/003-foo", 9002, 404) - - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] delete entity", true) - end) - - it("update route on CP with name", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:post("/services", { - body = { name = "service-004", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - res = assert(admin_client:post("/services/service-004/routes", { - body = { name = "route-004", paths = { "/004-foo" }, }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - test_url("/004-foo", 9002, 200) - - res = assert(admin_client:put("/services/service-004/routes/route-004", { - body = { paths = { "/004-bar" }, }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(200, res) - - test_url("/004-bar", 9002, 200) - - assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) - assert.logfile().has.no.line("unable to update clustering data plane status", true) - - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] update entity", true) - end) - - it("delete route on CP with name", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:post("/services", { - body = { name = "service-005", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - res = assert(admin_client:post("/services/service-005/routes", { - body = { name = "route-005", paths = { "/005-foo" }, }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - test_url("/005-foo", 9002, 200) - - assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) - assert.logfile().has.no.line("unable to update clustering data plane status", true) - - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] update entity", true) - assert.logfile("servroot2/logs/error.log").has.no.line("[kong.sync.v2] delete entity", true) - - res = assert(admin_client:delete("/services/service-005/routes/route-005")) - assert.res_status(204, res) - - test_url("/005-foo", 9002, 404) - - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] delete entity", true) - end) - - it("cascade delete on CP", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() - end) - - -- create service and route - - local res = assert(admin_client:post("/services", { - body = { name = "service-006", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - res = assert(admin_client:post("/services/service-006/routes", { - body = { paths = { "/006-foo" }, }, - headers = {["Content-Type"] = "application/json"} - })) - local body = assert.res_status(201, res) - local json = cjson.decode(body) - - route_id = json.id - - test_url("/006-foo", 9002, 200) - - assert.logfile().has.line("[kong.sync.v2] config push (connected client)", true) - assert.logfile().has.no.line("unable to update clustering data plane status", true) - - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] update entity", true) - - -- create consumer and key-auth - - res = assert(admin_client:post("/consumers", { - body = { username = "foo", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - res = assert(admin_client:post("/consumers/foo/key-auth", { - body = { key = "my-key", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - res = assert(admin_client:post("/plugins", { - body = { name = "key-auth", - config = { key_names = {"apikey"} }, - route = { id = route_id }, - }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - test_url("/006-foo", 9002, 200, {["apikey"] = "my-key"}) - - assert.logfile().has.no.line("[kong.sync.v2] new delta due to cascade deleting", true) - assert.logfile("servroot2/logs/error.log").has.no.line("[kong.sync.v2] delete entity", true) - - -- delete consumer and key-auth - - res = assert(admin_client:delete("/consumers/foo")) - assert.res_status(204, res) - - test_url("/006-foo", 9002, 401, {["apikey"] = "my-key"}) - - assert.logfile().has.line("[kong.sync.v2] new delta due to cascade deleting", true) - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] delete entity", true) - - -- cascade deletion should be the same version - - local ver - local count = 0 - local patt = "delete entity, version: %d+" - local f = io.open("servroot2/logs/error.log", "r") - while true do - local line = f:read("*l") - - if not line then - f:close() - break - end - - local found = line:match(patt) - if found then - ver = ver or found - assert.equal(ver, found) - count = count + 1 - end - end - assert(count > 1) - - end) - end) - -end) - -end -- for _, strategy diff --git a/spec/02-integration/19-incrmental_sync/02-multiple_dp_nodes_spec.lua b/spec/02-integration/19-incrmental_sync/02-multiple_dp_nodes_spec.lua deleted file mode 100644 index fe7f89432a5a..000000000000 --- a/spec/02-integration/19-incrmental_sync/02-multiple_dp_nodes_spec.lua +++ /dev/null @@ -1,113 +0,0 @@ -local helpers = require "spec.helpers" -local cjson = require("cjson.safe") - -local function start_cp(strategy, port) - assert(helpers.start_kong({ - role = "control_plane", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - database = strategy, - cluster_listen = "127.0.0.1:" .. port, - nginx_conf = "spec/fixtures/custom_nginx.template", - cluster_rpc = "on", - cluster_rpc_sync = "on", -- rpc sync - })) -end - -local function start_dp(prefix, port) - assert(helpers.start_kong({ - role = "data_plane", - database = "off", - prefix = prefix, - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - cluster_control_plane = "127.0.0.1:9005", - proxy_listen = "0.0.0.0:" .. port, - nginx_conf = "spec/fixtures/custom_nginx.template", - nginx_worker_processes = 4, -- multiple workers - cluster_rpc = "on", - cluster_rpc_sync = "on", -- rpc sync - worker_state_update_frequency = 1, - })) -end - -local function test_url(path, port, code) - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", port) - - local res = proxy_client:send({ - method = "GET", - path = path, - }) - - local status = res and res.status - proxy_client:close() - if status == code then - return true - end - end, 10) -end - -for _, strategy in helpers.each_strategy() do - -describe("Incremental Sync RPC #" .. strategy, function() - - lazy_setup(function() - helpers.get_db_utils(strategy, { - "clustering_data_planes", - }) -- runs migrations - - start_cp(strategy, 9005) - start_dp("servroot2", 9002) - start_dp("servroot3", 9003) - end) - - lazy_teardown(function() - helpers.stop_kong("servroot2") - helpers.stop_kong("servroot3") - helpers.stop_kong() - end) - - describe("sync works with multiple DP nodes", function() - - it("adding/removing routes", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:post("/services", { - body = { name = "service-001", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - -- add a route - - res = assert(admin_client:post("/services/service-001/routes", { - body = { paths = { "/001" }, }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - local body = assert.res_status(201, res) - local json = cjson.decode(body) - local route_id = json.id - - test_url("/001", 9002, 200) - assert.logfile("servroot2/logs/error.log").has.line("[kong.sync.v2] update entity", true) - - test_url("/001", 9003, 200) - assert.logfile("servroot3/logs/error.log").has.line("[kong.sync.v2] update entity", true) - - -- remove a route - - res = assert(admin_client:delete("/services/service-001/routes/" .. route_id)) - assert.res_status(204, res) - - test_url("/001", 9002, 404) - test_url("/001", 9003, 404) - end) - end) -end) - -end -- for _, strategy diff --git a/spec/05-migration/db/migrations/core/024_380_to_390_spec.lua b/spec/05-migration/db/migrations/core/024_380_to_390_spec.lua index cf0f04513c68..e4b28fbce9a3 100644 --- a/spec/05-migration/db/migrations/core/024_380_to_390_spec.lua +++ b/spec/05-migration/db/migrations/core/024_380_to_390_spec.lua @@ -5,13 +5,4 @@ describe("database migration", function() assert.database_has_relation("clustering_sync_version") assert.table_has_column("clustering_sync_version", "version", "integer") end) - - uh.old_after_up("has created the \"clustering_sync_delta\" table", function() - assert.database_has_relation("clustering_sync_delta") - assert.table_has_column("clustering_sync_delta", "version", "integer") - assert.table_has_column("clustering_sync_delta", "type", "text") - assert.table_has_column("clustering_sync_delta", "pk", "json") - assert.table_has_column("clustering_sync_delta", "ws_id", "uuid") - assert.table_has_column("clustering_sync_delta", "entity", "json") - end) end) diff --git a/spec/05-migration/db/migrations/core/025_390_to_3100_spec.lua b/spec/05-migration/db/migrations/core/025_390_to_3100_spec.lua new file mode 100644 index 000000000000..32c8563aa0f1 --- /dev/null +++ b/spec/05-migration/db/migrations/core/025_390_to_3100_spec.lua @@ -0,0 +1,7 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function() + uh.old_after_up("does not have \"clustering_sync_delta\" table", function() + assert.not_database_has_relation("clustering_sync_delta") + end) +end)