Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(clustering/rpc): rename to cluster_rpc_sync #14025

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kong/conf_loader/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ local CONF_PARSERS = {
cluster_use_proxy = { typ = "boolean" },
cluster_dp_labels = { typ = "array" },
cluster_rpc = { typ = "boolean" },
cluster_incremental_sync = { typ = "boolean" },
cluster_rpc_sync = { typ = "boolean" },
cluster_full_sync_threshold = { typ = "number" },
cluster_cjson = { typ = "boolean" },

Expand Down
6 changes: 3 additions & 3 deletions kong/conf_loader/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1034,10 +1034,10 @@ local function load(path, custom_conf, opts)
end
end

if conf.cluster_incremental_sync and not conf.cluster_rpc then
log.warn("Cluster incremental sync has been forcibly disabled, " ..
if conf.cluster_rpc_sync and not conf.cluster_rpc then
log.warn("Cluster rpc sync has been forcibly disabled, " ..
"please enable cluster RPC.")
conf.cluster_incremental_sync = false
conf.cluster_rpc_sync = false
end

-- parse and validate pluginserver directives
Expand Down
2 changes: 1 addition & 1 deletion kong/global.lua
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ function _GLOBAL.init_worker_events(kong_config)
local enable_privileged_agent = false
if kong_config.dedicated_config_processing and
kong_config.role == "data_plane" and
not kong.sync -- for incremental sync there is no privileged_agent
not kong.sync -- for rpc sync there is no privileged_agent
then
enable_privileged_agent = true
end
Expand Down
10 changes: 5 additions & 5 deletions kong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ function Kong.init()
if config.cluster_rpc then
kong.rpc = require("kong.clustering.rpc.manager").new(config, kong.node.get_id())

if config.cluster_incremental_sync then
if config.cluster_rpc_sync then
kong.sync = require("kong.clustering.services.sync").new(db, is_control_plane(config))
kong.sync:init(kong.rpc)
end
Expand Down Expand Up @@ -885,8 +885,8 @@ function Kong.init_worker()
local is_dp_sync_v1 = is_data_plane(kong.configuration) and not kong.sync
local using_dedicated = kong.configuration.dedicated_config_processing

-- CP needs to support both full and incremental sync
-- full sync is only enabled for DP if incremental sync is disabled
-- CP needs to support both v1 and v2 sync
-- v1 sync is only enabled for DP if v2 sync is disabled
if is_cp or is_dp_sync_v1 then
kong.clustering:init_worker()
end
Expand Down Expand Up @@ -992,15 +992,15 @@ function Kong.init_worker()
plugin_servers.start()
end

-- rpc and incremental sync
-- rpc and sync
if is_http_module then

-- init rpc connection
if kong.rpc then
kong.rpc:init_worker()
end

-- init incremental sync
-- init sync
-- should run after rpc init successfully
if kong.sync then
kong.sync:init_worker()
Expand Down
2 changes: 1 addition & 1 deletion kong/pdk/vault.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ local function new(self)

local not_dbless = conf.database ~= "off" -- postgres
local dp_with_inc_sync = conf.role == "data_plane" and
conf.cluster_incremental_sync
conf.cluster_rpc_sync

return not_dbless or dp_with_inc_sync
end
Expand Down
6 changes: 3 additions & 3 deletions kong/runloop/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ local function new_router(version)
-- like rebuild_router_timer. And it relies on core_cache to detect changes.
--
-- 1. stratey off (dbless)
-- incremental_sync on:
-- rpc_sync on:
-- non init worker: true(kong.core_cache)
-- init worker: false
-- incremental_sync off: false
-- rpc_sync off: false
-- 2. strategy on (non dbless): true(kong.core_cache)
local detect_changes = kong.core_cache and
(db.strategy ~= "off" or (kong.sync and get_phase() ~= "init_worker"))
Expand Down Expand Up @@ -986,7 +986,7 @@ return {

-- start some rebuild timers for
-- 1. traditional mode
-- 2. DP with incremental sync on (dbless mode)
-- 2. DP with rpc sync on (dbless mode)
if strategy ~= "off" or kong.sync then
local worker_state_update_frequency = kong.configuration.worker_state_update_frequency or 1

Expand Down
2 changes: 1 addition & 1 deletion kong/templates/kong_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ cluster_max_payload = 16777216
cluster_use_proxy = off
cluster_dp_labels = NONE
cluster_rpc = off
cluster_incremental_sync = off
cluster_rpc_sync = off
cluster_full_sync_threshold = 512
cluster_cjson = off

Expand Down
2 changes: 1 addition & 1 deletion spec/01-unit/01-db/10-declarative_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ keyauth_credentials:
assert.equals("services|123|fieldname|" .. sha256_hex("test"), key)
end)

-- since incremental sync the param `unique_across_ws` is useless
-- since rpc sync the param `unique_across_ws` is useless
-- this test case is just for compatibility
it("does not omits the workspace id when 'unique_across_ws' is 'true'", function()
local key = unique_field_key("services", "123", "fieldname", "test", true)
Expand Down
4 changes: 2 additions & 2 deletions spec/01-unit/01-db/11-declarative_lmdb_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ describe("#off preserve nulls", function()

local id, item = next(entities.basicauth_credentials)

-- format changed after incremental sync
-- format changed after rpc sync
local cache_key = concat({
"basicauth_credentials|",
item.ws_id,
Expand All @@ -225,7 +225,7 @@ describe("#off preserve nulls", function()
for _, plugin in pairs(entities.plugins) do
if plugin.name == PLUGIN_NAME then

-- format changed after incremental sync
-- format changed after rpc sync
cache_key = concat({
"plugins|",
plugin.ws_id,
Expand Down
10 changes: 5 additions & 5 deletions spec/02-integration/07-sdk/03-cluster_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ fixtures_cp.http_mock.my_server_block = [[
]]

for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do
local rpc, inc_sync = v[1], v[2]
local rpc, rpc_sync = v[1], v[2]

for _, strategy in helpers.each_strategy() do
describe("PDK: kong.cluster for #" .. strategy .. " inc_sync=" .. inc_sync, function()
describe("PDK: kong.cluster for #" .. strategy .. " rpc_sync=" .. rpc_sync, function()
local proxy_client

lazy_setup(function()
Expand All @@ -65,7 +65,7 @@ for _, strategy in helpers.each_strategy() do
cluster_listen = "127.0.0.1:9005",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}, nil, nil, fixtures_cp))

assert(helpers.start_kong({
Expand All @@ -78,7 +78,7 @@ for _, strategy in helpers.each_strategy() do
proxy_listen = "0.0.0.0:9002",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}, nil, nil, fixtures_dp))
end)

Expand Down Expand Up @@ -116,4 +116,4 @@ for _, strategy in helpers.each_strategy() do
end)
end)
end -- for _, strategy
end -- for inc_sync
end -- for rpc_sync
44 changes: 22 additions & 22 deletions spec/02-integration/09-hybrid_mode/01-sync_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ local KEY_AUTH_PLUGIN


for _, v in ipairs({ {"off", "off"}, {"on", "off"}, {"on", "on"}, }) do
local rpc, inc_sync = v[1], v[2]
local rpc, rpc_sync = v[1], v[2]

for _, strategy in helpers.each_strategy() do

describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function()
describe("CP/DP communication #" .. strategy .. " rpc_sync=" .. rpc_sync, function()

lazy_setup(function()
helpers.get_db_utils(strategy) -- runs migrations
Expand All @@ -31,7 +31,7 @@ describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, functi
cluster_listen = "127.0.0.1:9005",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))

assert(helpers.start_kong({
Expand All @@ -44,7 +44,7 @@ describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, functi
proxy_listen = "0.0.0.0:9002",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
worker_state_update_frequency = 1,
}))

Expand Down Expand Up @@ -348,7 +348,7 @@ describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, functi
end)
end)

describe("CP/DP #version check #" .. strategy .. " inc_sync=" .. inc_sync, function()
describe("CP/DP #version check #" .. strategy .. " rpc_sync=" .. rpc_sync, function()
-- for these tests, we do not need a real DP, but rather use the fake DP
-- client so we can mock various values (e.g. node_version)
describe("relaxed compatibility check:", function()
Expand All @@ -368,7 +368,7 @@ describe("CP/DP #version check #" .. strategy .. " inc_sync=" .. inc_sync, funct
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_version_check = "major_minor",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))

for _, plugin in ipairs(helpers.get_plugins_list()) do
Expand Down Expand Up @@ -625,7 +625,7 @@ describe("CP/DP #version check #" .. strategy .. " inc_sync=" .. inc_sync, funct
end)
end)

describe("CP/DP config sync #" .. strategy .. " inc_sync=" .. inc_sync, function()
describe("CP/DP config sync #" .. strategy .. " rpc_sync=" .. rpc_sync, function()
lazy_setup(function()
helpers.get_db_utils(strategy) -- runs migrations

Expand All @@ -637,7 +637,7 @@ describe("CP/DP config sync #" .. strategy .. " inc_sync=" .. inc_sync, function
db_update_frequency = 3,
cluster_listen = "127.0.0.1:9005",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))

assert(helpers.start_kong({
Expand All @@ -648,7 +648,7 @@ describe("CP/DP config sync #" .. strategy .. " inc_sync=" .. inc_sync, function
cluster_cert_key = "spec/fixtures/kong_clustering.key",
cluster_control_plane = "127.0.0.1:9005",
proxy_listen = "0.0.0.0:9002",
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
cluster_rpc = rpc,
worker_state_update_frequency = 1,
}))
Expand Down Expand Up @@ -754,7 +754,7 @@ describe("CP/DP labels #" .. strategy, function()
cluster_listen = "127.0.0.1:9005",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))

assert(helpers.start_kong({
Expand All @@ -768,7 +768,7 @@ describe("CP/DP labels #" .. strategy, function()
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_dp_labels="deployment:mycloud,region:us-east-1",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))
end)

Expand Down Expand Up @@ -796,8 +796,8 @@ describe("CP/DP labels #" .. strategy, function()
assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status)
assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status)
-- TODO: The API output does include labels and certs when the
-- incremental sync is enabled.
if inc_sync == "off" then
-- rpc sync is enabled.
if rpc_sync == "off" then
assert.equal("mycloud", v.labels.deployment)
assert.equal("us-east-1", v.labels.region)
end
Expand All @@ -822,7 +822,7 @@ describe("CP/DP cert details(cluster_mtls = shared) #" .. strategy, function()
cluster_listen = "127.0.0.1:9005",
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))

assert(helpers.start_kong({
Expand All @@ -836,7 +836,7 @@ describe("CP/DP cert details(cluster_mtls = shared) #" .. strategy, function()
nginx_conf = "spec/fixtures/custom_nginx.template",
cluster_dp_labels="deployment:mycloud,region:us-east-1",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))
end)

Expand All @@ -860,8 +860,8 @@ describe("CP/DP cert details(cluster_mtls = shared) #" .. strategy, function()
for _, v in pairs(json.data) do
if v.ip == "127.0.0.1" then
-- TODO: The API output does include labels and certs when the
-- incremental sync is enabled.
if inc_sync == "off" then
-- rpc sync is enabled.
if rpc_sync == "off" then
assert.equal(1888983905, v.cert_details.expiry_timestamp)
end
return true
Expand All @@ -888,7 +888,7 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function()
cluster_mtls = "pki",
cluster_ca_cert = "spec/fixtures/kong_clustering_ca.crt",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))

assert(helpers.start_kong({
Expand All @@ -905,7 +905,7 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function()
cluster_server_name = "kong_clustering",
cluster_ca_cert = "spec/fixtures/kong_clustering.crt",
cluster_rpc = rpc,
cluster_incremental_sync = inc_sync,
cluster_rpc_sync = rpc_sync,
}))
end)

Expand All @@ -929,8 +929,8 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function()
for _, v in pairs(json.data) do
if v.ip == "127.0.0.1" then
-- TODO: The API output does include labels and certs when the
-- incremental sync is enabled.
if inc_sync == "off" then
-- rpc sync is enabled.
if rpc_sync == "off" then
assert.equal(1897136778, v.cert_details.expiry_timestamp)
end
return true
Expand All @@ -942,4 +942,4 @@ describe("CP/DP cert details(cluster_mtls = pki) #" .. strategy, function()
end)

end -- for _, strategy
end -- for inc_sync
end -- for rpc_sync
Loading
Loading