Skip to content

Commit

Permalink
feat(clustering): entity level validation only and stream lmdb transa…
Browse files Browse the repository at this point in the history
…ction

Signed-off-by: Aapo Talvensaari <[email protected]>
  • Loading branch information
bungle committed Sep 21, 2023
1 parent 9da0bf6 commit 461acc0
Show file tree
Hide file tree
Showing 7 changed files with 494 additions and 313 deletions.
1 change: 1 addition & 0 deletions kong-3.5.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ build = {

["kong.clustering"] = "kong/clustering/init.lua",
["kong.clustering.data_plane"] = "kong/clustering/data_plane.lua",
["kong.clustering.data_plane_stream_protocol"] = "kong/clustering/protocol.lua",
["kong.clustering.control_plane"] = "kong/clustering/control_plane.lua",
["kong.clustering.utils"] = "kong/clustering/utils.lua",
["kong.clustering.events"] = "kong/clustering/events.lua",
Expand Down
171 changes: 45 additions & 126 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,21 @@ local _MT = { __index = _M, }

local semaphore = require("ngx.semaphore")
local cjson = require("cjson.safe")
local declarative = require("kong.db.declarative")
local utils = require("kong.tools.utils")
local clustering_utils = require("kong.clustering.utils")
local compat = require("kong.clustering.compat")
local constants = require("kong.constants")
local events = require("kong.clustering.events")


local export = require("kong.db.declarative.export").export
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local get_topologically_sorted_schema_names = require("kong.db.declarative.export").get_topologically_sorted_schema_names
local send_configuration_payload = require("kong.clustering.protocol").send_configuration_payload


local error = error
local assert = assert
local string = string
local setmetatable = setmetatable
local type = type
local pcall = pcall
local pairs = pairs
local ipairs = ipairs
local ngx = ngx
local ngx_log = ngx.log
local timer_at = ngx.timer.at
local cjson_decode = cjson.decode
local cjson_encode = cjson.encode
Expand All @@ -37,46 +32,35 @@ local ngx_update_time = ngx.update_time
local ngx_var = ngx.var
local table_insert = table.insert
local table_remove = table.remove
local sub = string.sub
local isempty = require("table.isempty")
local sleep = ngx.sleep


local plugins_list_to_map = compat.plugins_list_to_map
local update_compatible_payload = compat.update_compatible_payload
local deflate_gzip = utils.deflate_gzip
local connect_dp = clustering_utils.connect_dp


local kong_dict = ngx.shared.kong
local ngx_DEBUG = ngx.DEBUG
local ngx_NOTICE = ngx.NOTICE
local ngx_WARN = ngx.WARN
local ngx_ERR = ngx.ERR
local ngx_OK = ngx.OK
local ngx_ERROR = ngx.ERROR
local ngx_CLOSE = ngx.HTTP_CLOSE


local KONG_DICT = ngx.shared.kong
local PING_WAIT = constants.CLUSTERING_PING_INTERVAL * 1.5
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH
local PONG_TYPE = "PONG"
local RECONFIGURE_TYPE = "RECONFIGURE"
local _log_prefix = "[clustering] "
local LOG_PREFIX = "[clustering] "


local no_connected_clients_logged


local function handle_export_reconfigure_payload(self)
ngx_log(ngx_DEBUG, _log_prefix, "exporting config")

ngx.log(ngx.DEBUG, LOG_PREFIX, "exporting config")
local ok, p_err, err = pcall(self.export_reconfigure_payload, self)
return ok, p_err or err
end


local function is_timeout(err)
return err and sub(err, -7) == "timeout"
return err and err:sub(-7) == "timeout"
end


Expand All @@ -98,15 +82,15 @@ end


function _M:export_reconfigure_payload()
local config, err, plugins_configured = declarative.export_config()
local config, err, plugins_configured = export()
if not config then
return nil, err
end

-- store serialized plugins map for troubleshooting purposes
local shm_key_name = "clustering:cp_plugins_configured:worker_" .. worker_id()
kong_dict:set(shm_key_name, cjson_encode(plugins_configured))
ngx_log(ngx_DEBUG, "plugin configuration map key: " .. shm_key_name .. " configuration: ", kong_dict:get(shm_key_name))
KONG_DICT:set(shm_key_name, cjson_encode(plugins_configured))
ngx.log(ngx.DEBUG, "plugin configuration map key: " .. shm_key_name .. " configuration: ", KONG_DICT:get(shm_key_name))

local _, hashes = calculate_config_hash(config)

Expand All @@ -126,7 +110,7 @@ function _M:push_config()

local ok, err = self:export_reconfigure_payload()
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to export config from database: ", err)
ngx.log(ngx.ERR, LOG_PREFIX, "unable to export config from database: ", err)
return
end

Expand All @@ -139,7 +123,7 @@ function _M:push_config()

ngx_update_time()
local duration = ngx_now() - start
ngx_log(ngx_DEBUG, _log_prefix, "config pushed to ", n, " data-plane nodes in " .. duration .. " seconds")
ngx.log(ngx.DEBUG, LOG_PREFIX, "config pushed to ", n, " data plane nodes in " .. duration .. " seconds")
end


Expand All @@ -153,7 +137,7 @@ function _M:handle_cp_websocket()
local dp_ip = ngx_var.remote_addr
local dp_version = ngx_var.arg_node_version

local wb, log_suffix, ec = connect_dp(dp_id, dp_hostname, dp_ip, dp_version)
local wb, log_suffix, ec = require("kong.clustering.utils").connect_dp(dp_id, dp_hostname, dp_ip, dp_version)
if not wb then
return ngx_exit(ec)
end
Expand Down Expand Up @@ -189,9 +173,9 @@ function _M:handle_cp_websocket()
end

if err then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)
ngx.log(ngx.ERR, LOG_PREFIX, err, log_suffix)
wb:send_close()
return ngx_exit(ngx_CLOSE)
return ngx_exit(ngx.HTTP_CLOSE)
end

local dp_plugins_map = plugins_list_to_map(data.plugins)
Expand All @@ -203,14 +187,14 @@ function _M:handle_cp_websocket()
local ok
ok, err = kong.db.clustering_data_planes:upsert({ id = dp_id, }, {
last_seen = last_seen,
config_hash = config_hash ~= "" and config_hash or nil,
config_hash = config_hash ~= "" and config_hash or DECLARATIVE_EMPTY_CONFIG_HASH,
hostname = dp_hostname,
ip = dp_ip,
version = dp_version,
sync_status = sync_status, -- TODO: import may have been failed though
}, { ttl = purge_delay })
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to update clustering data plane status: ", err, log_suffix)
ngx.log(ngx.ERR, LOG_PREFIX, "unable to update clustering data plane status: ", err, log_suffix)
end
end

Expand All @@ -221,13 +205,13 @@ function _M:handle_cp_websocket()
log_suffix = log_suffix,
})
if err then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)
ngx.log(ngx.ERR, LOG_PREFIX, err, log_suffix)
wb:send_close()
update_sync_status()
return ngx_exit(ngx_CLOSE)
return ngx_exit(ngx.HTTP_CLOSE)
end

ngx_log(ngx_DEBUG, _log_prefix, "data plane connected", log_suffix)
ngx.log(ngx.DEBUG, LOG_PREFIX, "data plane connected", log_suffix)

local queue
do
Expand Down Expand Up @@ -262,7 +246,7 @@ function _M:handle_cp_websocket()
queue.post()

else
ngx_log(ngx_ERR, _log_prefix, "unable to send initial configuration to data plane: ", err, log_suffix)
ngx.log(ngx.ERR, LOG_PREFIX, "unable to send initial configuration to data plane: ", err, log_suffix)
end

-- how control plane connection management works:
Expand Down Expand Up @@ -303,7 +287,7 @@ function _M:handle_cp_websocket()
end

if typ == "close" then
ngx_log(ngx_DEBUG, _log_prefix, "received close frame from data plane", log_suffix)
ngx.log(ngx.DEBUG, LOG_PREFIX, "received close frame from data plane", log_suffix)
return
end

Expand All @@ -316,7 +300,7 @@ function _M:handle_cp_websocket()
return nil, "invalid websocket frame received from data plane: " .. typ
end

ngx_log(ngx_DEBUG, _log_prefix, "received ping frame from data plane", log_suffix)
ngx.log(ngx.DEBUG, LOG_PREFIX, "received ping frame from data plane", log_suffix)

config_hash = data
last_seen = ngx_time()
Expand All @@ -331,13 +315,6 @@ function _M:handle_cp_websocket()
end)

local write_thread = ngx.thread.spawn(function()
local function send_binary(tbl)
local _, err = wb:send_binary(cjson_encode(tbl))
if err then
return error("unable to send updated configuration to data plane: " .. err)
end
end

while not exiting() do
local ok, err = queue.wait(5)

Expand Down Expand Up @@ -366,10 +343,10 @@ function _M:handle_cp_websocket()
return nil, "failed to send pong frame to data plane: " .. err
end

ngx_log(ngx_NOTICE, _log_prefix, "failed to send pong frame to data plane: ", err, log_suffix)
ngx.log(ngx.NOTICE, LOG_PREFIX, "failed to send pong frame to data plane: ", err, log_suffix)

else
ngx_log(ngx_DEBUG, _log_prefix, "sent pong frame to data plane", log_suffix)
ngx.log(ngx.DEBUG, LOG_PREFIX, "sent pong frame to data plane", log_suffix)
end

-- pong ok
Expand All @@ -386,101 +363,43 @@ function _M:handle_cp_websocket()
})

if not ok then
ngx_log(ngx_WARN, _log_prefix, "unable to send updated configuration to data plane: ", err, log_suffix)
ngx.log(ngx.WARN, LOG_PREFIX, "unable to send updated configuration to data plane: ", err, log_suffix)
if sync_status ~= previous_sync_status then
update_sync_status()
end

goto continue
end

local reconfigure_payload = update_compatible_payload(self.reconfigure_payload, dp_version, log_suffix)
local timetamp = reconfigure_payload.timestamp
local config = reconfigure_payload.config
local hashes = reconfigure_payload.hashes
local hash = hashes.config

send_binary({
type = "reconfigure:start",
hash = hash,
meta = {
_transform = config._transform,
_format_version = config._format_version,
}
})

local schema_names = get_topologically_sorted_schema_names()

local batch = kong.table.new(0, 1000)
for _, name in ipairs(schema_names) do
local entities = config[name]
if entities and not isempty(entities) then
local count = #entities
if count <= 1000 then
send_binary({
type = "entities",
hash = hash,
name = name,
data = assert(deflate_gzip(assert(cjson_encode(entities)))),
})

else

local i = 0
for j, entity in ipairs(entities) do
i = i + 1
batch[i] = entity
if i == 1000 or j == count then
send_binary({
type = "entities",
hash = hash,
name = name,
data = assert(deflate_gzip(assert(cjson_encode(batch)))),
})

kong.table.clear(batch)
i = 0
end
end
end
end
end

send_binary({
type = "reconfigure:end",
hash = hash,
hashes = hashes,
timestamp = timetamp,
})
local payload = assert(update_compatible_payload(self.reconfigure_payload, dp_version, log_suffix))
send_configuration_payload(wb, payload)

::continue::
end
end)

local ok, err, perr = ngx.thread.wait(write_thread, read_thread)

self.clients[wb] = nil

ngx.thread.kill(write_thread)
ngx.thread.kill(read_thread)

wb:send_close()

--TODO: should we update disconnect data plane status?
--sync_status = CLUSTERING_SYNC_STATUS.UNKNOWN
--update_sync_status()

if not ok then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)
return ngx_exit(ngx_ERROR)
local err_msg = ok and err or perr
if err_msg then
if err_msg:sub(-8) == ": closed" then
ngx.log(ngx.INFO, LOG_PREFIX, "connection to data plane closed", log_suffix)
else
ngx.log(ngx.ERR, LOG_PREFIX, err_msg, log_suffix)
end
return ngx_exit(ngx.ERROR)
end

if perr then
ngx_log(ngx_ERR, _log_prefix, perr, log_suffix)
return ngx_exit(ngx_ERROR)
end

return ngx_exit(ngx_OK)
return ngx_exit(ngx.OK)
end


Expand All @@ -491,7 +410,7 @@ local function push_config_loop(premature, self, push_config_semaphore, delay)

local ok, err = handle_export_reconfigure_payload(self)
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to export initial config from database: ", err)
ngx.log(ngx.ERR, LOG_PREFIX, "unable to export initial config from database: ", err)
end

while not exiting() do
Expand All @@ -502,15 +421,15 @@ local function push_config_loop(premature, self, push_config_semaphore, delay)

if not ok then
if err ~= "timeout" then
ngx_log(ngx_ERR, _log_prefix, "semaphore wait error: ", err)
ngx.log(ngx.ERR, LOG_PREFIX, "semaphore wait error: ", err)
end

goto continue
end

if isempty(self.clients) then
if not no_connected_clients_logged then
ngx_log(ngx_DEBUG, _log_prefix, "skipping config push (no connected clients)")
ngx.log(ngx.DEBUG, LOG_PREFIX, "skipping config push (no connected clients)")
no_connected_clients_logged = true
end
sleep(1)
Expand All @@ -526,7 +445,7 @@ local function push_config_loop(premature, self, push_config_semaphore, delay)

ok, err = pcall(self.push_config, self)
if not ok then
ngx_log(ngx_ERR, _log_prefix, "export and pushing config failed: ", err)
ngx.log(ngx.ERR, LOG_PREFIX, "export and pushing config failed: ", err)
goto continue
end

Expand Down Expand Up @@ -574,7 +493,7 @@ function _M:init_worker(basic_info)

-- When "clustering", "push_config" worker event is received by a worker,
-- it loads and pushes the config to its the connected data planes
events.clustering_push_config(function(_)
require("kong.clustering.events").clustering_push_config(function(_)
if push_config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
Expand Down
Loading

0 comments on commit 461acc0

Please sign in to comment.