From b7d591da25c9b9e0e6ebd675dc5a8409f2d799a9 Mon Sep 17 00:00:00 2001
From: Datong Sun
Date: Wed, 24 Apr 2024 23:51:05 +0800
Subject: [PATCH] feat(clustering): CP/DP RPC framework & dynamic log level RPC (#12320)

This PR implements an extensible mechanism through which CP and DP nodes can
communicate bidirectionally and perform RPC calls, which can be used for
building complex interactions such as config sync, debugging, etc.

This PR also implements a simple RPC service that allows the CP to inspect
and change the dynamic log level of connected DP nodes. The functionality is
exposed via a new Admin API endpoint
`/clustering/data-planes/:node_id/log-level`.

KAG-623
KAG-3751
---
 .requirements                                 |   3 +-
 build/BUILD.bazel                             |   4 +
 build/openresty/repositories.bzl              |   2 +
 build/openresty/snappy/BUILD.bazel            | 206 ++++++++++
 .../openresty/snappy/snappy_repositories.bzl  |  15 +
 changelog/unreleased/kong/cp-dp-rpc.yml       |   3 +
 .../unreleased/kong/dynamic-log-level-rpc.yml |   6 +
 kong-3.7.0-0.rockspec                         |  12 +
 kong/api/routes/debug.lua                     |  46 +++
 kong/clustering/control_plane.lua             |   8 +
 kong/clustering/rpc/callbacks.lua             |  47 +++
 kong/clustering/rpc/concentrator.lua          | 303 +++++++++++++++
 kong/clustering/rpc/future.lua                |  73 ++++
 kong/clustering/rpc/json_rpc_v2.lua           |  41 ++
 kong/clustering/rpc/manager.lua               | 365 ++++++++++++++++++
 kong/clustering/rpc/queue.lua                 |  73 ++++
 kong/clustering/rpc/socket.lua                | 284 ++++++++++++++
 kong/clustering/rpc/utils.lua                 |  45 +++
 kong/clustering/services/debug.lua            |  70 ++++
 kong/conf_loader/constants.lua                |   1 +
 kong/constants.lua                            |   1 +
 kong/db/migrations/core/023_360_to_370.lua    |  23 ++
 kong/db/migrations/core/init.lua              |   1 +
 .../entities/clustering_data_planes.lua       |   2 +
 kong/db/schema/typedefs.lua                   |   5 +
 kong/db/strategies/postgres/connector.lua     |   1 +
 kong/init.lua                                 |  34 ++
 kong/templates/kong_defaults.lua              |   1 +
 kong/templates/nginx_kong.lua                 |   8 +
 .../fixtures/ubuntu-22.04-amd64.txt           |   7 +
 spec/01-unit/01-db/06-postgres_spec.lua       |  16 +-
 spec/01-unit/04-prefix_handler_spec.lua       |  26 ++
 .../09-hybrid_mode/01-sync_spec.lua           |   1 -
 .../18-hybrid_rpc/01-rpc_spec.lua             |  62 +++
 .../18-hybrid_rpc/02-log-level_spec.lua       | 181 +++++++++
 .../18-hybrid_rpc/03-inert_spec.lua           | 101 +++++
 .../migrations/core/023_360_to_370_spec.lua   |  12 +
 37 files changed, 2079 insertions(+), 10 deletions(-)
 create mode 100644 build/openresty/snappy/BUILD.bazel
 create mode 100644 build/openresty/snappy/snappy_repositories.bzl
 create mode 100644 changelog/unreleased/kong/cp-dp-rpc.yml
 create mode 100644 changelog/unreleased/kong/dynamic-log-level-rpc.yml
 create mode 100644 kong/clustering/rpc/callbacks.lua
 create mode 100644 kong/clustering/rpc/concentrator.lua
 create mode 100644 kong/clustering/rpc/future.lua
 create mode 100644 kong/clustering/rpc/json_rpc_v2.lua
 create mode 100644 kong/clustering/rpc/manager.lua
 create mode 100644 kong/clustering/rpc/queue.lua
 create mode 100644 kong/clustering/rpc/socket.lua
 create mode 100644 kong/clustering/rpc/utils.lua
 create mode 100644 kong/clustering/services/debug.lua
 create mode 100644 kong/db/migrations/core/023_360_to_370.lua
 create mode 100644 spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua
 create mode 100644 spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua
 create mode 100644 spec/02-integration/18-hybrid_rpc/03-inert_spec.lua
 create mode 100644 spec/05-migration/db/migrations/core/023_360_to_370_spec.lua

diff --git a/.requirements b/.requirements
index 0acb3686b48f..abe66cf45f72 100644
--- a/.requirements
+++ b/.requirements
@@ -11,8 +11,9 @@ LIBEXPAT=2.6.2
LUA_KONG_NGINX_MODULE=a8411f7cf4289049f0bd3e8e40088e7256389ed3 # 0.11.0 LUA_RESTY_LMDB=7d2581cbe30cde18a8482d820c227ca0845c0ded # 1.4.2 LUA_RESTY_EVENTS=8448a92cec36ac04ea522e78f6496ba03c9b1fd8 # 0.2.0 -LUA_RESTY_WEBSOCKET=60eafc3d7153bceb16e6327074e0afc3d94b1316 # 0.4.0 +LUA_RESTY_WEBSOCKET=966c69c39f03029b9b42ec0f8e55aaed7d6eebc0 # 0.4.0.1 ATC_ROUTER=ffd11db657115769bf94f0c4f915f98300bc26b6 # 1.6.2 +SNAPPY=23b3286820105438c5dbb9bc22f1bb85c5812c8a # 1.2.0 KONG_MANAGER=nightly NGX_WASM_MODULE=3bd94e61c55415ccfb0f304fa51143a7d630d6ae diff --git a/build/BUILD.bazel b/build/BUILD.bazel index adecda775a80..865db47cdfe4 100644 --- a/build/BUILD.bazel +++ b/build/BUILD.bazel @@ -103,6 +103,7 @@ kong_directory_genrule( "@openresty", "@openresty//:luajit", "@protoc//:all_srcs", + "@snappy//:snappy", ] + select({ "@kong//:skip_webui_flags": [], "//conditions:default": [ @@ -152,6 +153,9 @@ kong_directory_genrule( tar -cC ${LUAJIT}/share . | tar -xC ${BUILD_DESTDIR}/openresty/luajit/share chmod -R "+rw" ${BUILD_DESTDIR}/openresty/luajit + SNAPPY=${WORKSPACE_PATH}/$(dirname $(echo '$(locations @snappy//:snappy)' | awk '{print $1}')) + cp ${SNAPPY}/libsnappy.so ${BUILD_DESTDIR}/kong/lib + LUAROCKS=${WORKSPACE_PATH}/$(dirname '$(location @luarocks//:luarocks_make)')/luarocks_tree cp -r ${LUAROCKS}/. ${BUILD_DESTDIR}/. diff --git a/build/openresty/repositories.bzl b/build/openresty/repositories.bzl index 8c937144cbdf..dbcb9515830e 100644 --- a/build/openresty/repositories.bzl +++ b/build/openresty/repositories.bzl @@ -10,6 +10,7 @@ load("//build/openresty/atc_router:atc_router_repositories.bzl", "atc_router_rep load("//build/openresty/wasmx:wasmx_repositories.bzl", "wasmx_repositories") load("//build/openresty/wasmx/filters:repositories.bzl", "wasm_filters_repositories") load("//build/openresty/brotli:brotli_repositories.bzl", "brotli_repositories") +load("//build/openresty/snappy:snappy_repositories.bzl", "snappy_repositories") # This is a dummy file to export the module's repository. _NGINX_MODULE_DUMMY_FILE = """ @@ -27,6 +28,7 @@ def openresty_repositories(): wasmx_repositories() wasm_filters_repositories() brotli_repositories() + snappy_repositories() openresty_version = KONG_VAR["OPENRESTY"] diff --git a/build/openresty/snappy/BUILD.bazel b/build/openresty/snappy/BUILD.bazel new file mode 100644 index 000000000000..7830623b5e98 --- /dev/null +++ b/build/openresty/snappy/BUILD.bazel @@ -0,0 +1,206 @@ +# Copyright 2023 Google Inc. All Rights Reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +SNAPPY_VERSION = (1, 1, 10) + +config_setting( + name = "windows", + constraint_values = ["@platforms//os:windows"], +) + +cc_library( + name = "config", + hdrs = ["config.h"], + defines = ["HAVE_CONFIG_H"], +) + +cc_library( + name = "snappy-stubs-public", + hdrs = [":snappy-stubs-public.h"], +) + +cc_library( + name = "snappy-stubs-internal", + srcs = ["snappy-stubs-internal.cc"], + hdrs = ["snappy-stubs-internal.h"], + deps = [ + ":config", + ":snappy-stubs-public", + ], +) + +cc_library( + name = "snappy", + srcs = [ + "snappy.cc", + "snappy-c.cc", + "snappy-internal.h", + "snappy-sinksource.cc", + ], + hdrs = [ + "snappy.h", + "snappy-c.h", + "snappy-sinksource.h", + ], + copts = select({ + ":windows": [], + "//conditions:default": [ + "-Wno-sign-compare", + ], + }), + deps = [ + ":config", + ":snappy-stubs-internal", + ":snappy-stubs-public", + ], +) + +filegroup( + name = "testdata", + srcs = glob(["testdata/*"]), +) + +cc_library( + name = "snappy-test", + testonly = True, + srcs = [ + "snappy-test.cc", + "snappy_test_data.cc", + ], + hdrs = [ + "snappy-test.h", + "snappy_test_data.h", + ], + deps = [":snappy-stubs-internal"], +) + +cc_test( + name = "snappy_benchmark", + srcs = ["snappy_benchmark.cc"], + data = [":testdata"], + deps = [ + ":snappy", + ":snappy-test", + "//third_party/benchmark:benchmark_main", + ], +) + +cc_test( + name = "snappy_unittest", + srcs = [ + "snappy_unittest.cc", + ], + data = [":testdata"], + deps = [ + ":snappy", + ":snappy-test", + "//third_party/googletest:gtest_main", + ], +) + +# Generate a config.h similar to what cmake would produce. 
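+# Instead of running cmake's configure checks, the generated header defers
+# feature detection to compile time: the HAVE_* macros below are defined only
+# when the compiler's __has_builtin/__has_include probes succeed.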
+genrule(
+    name = "config_h",
+    outs = ["config.h"],
+    cmd = """cat <<EOF >$@
+#define HAVE_STDDEF_H 1
+#define HAVE_STDINT_H 1
+#ifdef __has_builtin
+#  if !defined(HAVE_BUILTIN_EXPECT) && __has_builtin(__builtin_expect)
+#    define HAVE_BUILTIN_EXPECT 1
+#  endif
+#  if !defined(HAVE_BUILTIN_CTZ) && __has_builtin(__builtin_ctzll)
+#    define HAVE_BUILTIN_CTZ 1
+#  endif
+#  if !defined(HAVE_BUILTIN_PREFETCH) && __has_builtin(__builtin_prefetch)
+#    define HAVE_BUILTIN_PREFETCH 1
+#  endif
+#elif defined(__GNUC__) && (__GNUC__ > 3 || __GNUC__ == 3 && __GNUC_MINOR__ >= 4)
+#  ifndef HAVE_BUILTIN_EXPECT
+#    define HAVE_BUILTIN_EXPECT 1
+#  endif
+#  ifndef HAVE_BUILTIN_CTZ
+#    define HAVE_BUILTIN_CTZ 1
+#  endif
+#  ifndef HAVE_BUILTIN_PREFETCH
+#    define HAVE_BUILTIN_PREFETCH 1
+#  endif
+#endif
+
+#if defined(_WIN32) && !defined(HAVE_WINDOWS_H)
+#define HAVE_WINDOWS_H 1
+#endif
+
+#ifdef __has_include
+#  if !defined(HAVE_BYTESWAP_H) && __has_include(<byteswap.h>)
+#    define HAVE_BYTESWAP_H 1
+#  endif
+#  if !defined(HAVE_UNISTD_H) && __has_include(<unistd.h>)
+#    define HAVE_UNISTD_H 1
+#  endif
+#  if !defined(HAVE_SYS_ENDIAN_H) && __has_include(<sys/endian.h>)
+#    define HAVE_SYS_ENDIAN_H 1
+#  endif
+#  if !defined(HAVE_SYS_MMAN_H) && __has_include(<sys/mman.h>)
+#    define HAVE_SYS_MMAN_H 1
+#  endif
+#  if !defined(HAVE_SYS_UIO_H) && __has_include(<sys/uio.h>)
+#    define HAVE_SYS_UIO_H 1
+#  endif
+#  if !defined(HAVE_SYS_TIME_H) && __has_include(<sys/time.h>)
+#    define HAVE_SYS_TIME_H 1
+#  endif
+#endif
+
+#ifndef SNAPPY_IS_BIG_ENDIAN
+#  ifdef __s390x__
+#    define SNAPPY_IS_BIG_ENDIAN 1
+#  elif defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
+#    define SNAPPY_IS_BIG_ENDIAN 1
+#  endif
+#endif
+EOF
+""",
+)
+
+genrule(
+    name = "snappy_stubs_public_h",
+    srcs = ["snappy-stubs-public.h.in"],
+    outs = ["snappy-stubs-public.h"],
+    # Assume sys/uio.h is available on non-Windows.
+    # Set the version numbers.
+    cmd = ("""sed -e 's/$${HAVE_SYS_UIO_H_01}/!_WIN32/g' \
+        -e 's/$${PROJECT_VERSION_MAJOR}/%d/g' \
+        -e 's/$${PROJECT_VERSION_MINOR}/%d/g' \
+        -e 's/$${PROJECT_VERSION_PATCH}/%d/g' \
+        $< >$@""" % SNAPPY_VERSION),
+)
diff --git a/build/openresty/snappy/snappy_repositories.bzl b/build/openresty/snappy/snappy_repositories.bzl
new file mode 100644
index 000000000000..4dbd7eeebdb8
--- /dev/null
+++ b/build/openresty/snappy/snappy_repositories.bzl
@@ -0,0 +1,15 @@
+"""A module defining the dependency snappy"""
+
+load("@bazel_tools//tools/build_defs/repo:git.bzl", "new_git_repository")
+load("@bazel_tools//tools/build_defs/repo:utils.bzl", "maybe")
+load("@kong_bindings//:variables.bzl", "KONG_VAR")
+
+def snappy_repositories():
+    maybe(
+        new_git_repository,
+        name = "snappy",
+        branch = KONG_VAR["SNAPPY"],
+        remote = "https://github.com/google/snappy",
+        visibility = ["//visibility:public"],  # let this be referenced by the openresty build
+        build_file = "//build/openresty/snappy:BUILD.bazel",
+    )
diff --git a/changelog/unreleased/kong/cp-dp-rpc.yml b/changelog/unreleased/kong/cp-dp-rpc.yml
new file mode 100644
index 000000000000..6dcc77c02e7c
--- /dev/null
+++ b/changelog/unreleased/kong/cp-dp-rpc.yml
@@ -0,0 +1,3 @@
+message: "Remote procedure call (RPC) framework for Hybrid mode deployments."
+type: feature
+scope: Clustering
diff --git a/changelog/unreleased/kong/dynamic-log-level-rpc.yml b/changelog/unreleased/kong/dynamic-log-level-rpc.yml
new file mode 100644
index 000000000000..69096eb0afe1
--- /dev/null
+++ b/changelog/unreleased/kong/dynamic-log-level-rpc.yml
@@ -0,0 +1,6 @@
+message: |
+  Dynamic log level over Hybrid mode RPC, which allows setting the DP log
+  level to a different value for a specified duration before reverting
+  to the value configured in `kong.conf`.
+type: feature
+scope: Clustering
diff --git a/kong-3.7.0-0.rockspec b/kong-3.7.0-0.rockspec
index 8aeb8719bef1..35a94a8afcac 100644
--- a/kong-3.7.0-0.rockspec
+++ b/kong-3.7.0-0.rockspec
@@ -42,6 +42,7 @@ dependencies = {
   "lua-resty-timer-ng == 0.2.7",
   "lpeg == 1.1.0",
   "lua-resty-ljsonschema == 1.1.6-2",
+  "lua-resty-snappy == 1.0-1",
 }
 build = {
   type = "builtin",
@@ -84,6 +85,16 @@ build = {
     ["kong.clustering.compat.checkers"] = "kong/clustering/compat/checkers.lua",
     ["kong.clustering.config_helper"] = "kong/clustering/config_helper.lua",
     ["kong.clustering.tls"] = "kong/clustering/tls.lua",
+    ["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua",
+
+    ["kong.clustering.rpc.callbacks"] = "kong/clustering/rpc/callbacks.lua",
+    ["kong.clustering.rpc.future"] = "kong/clustering/rpc/future.lua",
+    ["kong.clustering.rpc.json_rpc_v2"] = "kong/clustering/rpc/json_rpc_v2.lua",
+    ["kong.clustering.rpc.manager"] = "kong/clustering/rpc/manager.lua",
+    ["kong.clustering.rpc.queue"] = "kong/clustering/rpc/queue.lua",
+    ["kong.clustering.rpc.socket"] = "kong/clustering/rpc/socket.lua",
+    ["kong.clustering.rpc.utils"] = "kong/clustering/rpc/utils.lua",
+    ["kong.clustering.rpc.concentrator"] = "kong/clustering/rpc/concentrator.lua",

     ["kong.cluster_events"] = "kong/cluster_events/init.lua",
     ["kong.cluster_events.strategies.postgres"] = "kong/cluster_events/strategies/postgres.lua",
@@ -291,6 +302,7 @@ build = {
     ["kong.db.migrations.core.020_330_to_340"] = "kong/db/migrations/core/020_330_to_340.lua",
     ["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua",
     ["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
+    ["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
     ["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
     ["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
     ["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua",
diff --git a/kong/api/routes/debug.lua b/kong/api/routes/debug.lua
index dade7628d0c3..b783a9fd1097 100644
--- a/kong/api/routes/debug.lua
+++ b/kong/api/routes/debug.lua
@@ -11,6 +11,7 @@ local kong = kong
 local pcall = pcall
 local type = type
 local tostring = tostring
+local tonumber = tonumber


 local get_log_level = require("resty.kong.log").get_log_level
@@ -127,4 +128,49 @@ routes[cluster_name] = {
   end
 }

+
+if kong.rpc then
+  routes["/clustering/data-planes/:node_id/log-level"] = {
+    GET = function(self)
+      local res, err =
+        kong.rpc:call(self.params.node_id, "kong.debug.log_level.v1.get_log_level")
+      if not res then
+        return kong.response.exit(500, { message = err, })
+      end
+
+      return kong.response.exit(200, res)
+    end,
+    PUT = function(self)
+      local new_level = self.params.current_level
+      local timeout = self.params.timeout and
+                      math.ceil(tonumber(self.params.timeout)) or nil
+
+      if not new_level then
+        return kong.response.exit(400, { message =
"Required parameter \"current_level\" is missing.", }) + end + + local res, err = kong.rpc:call(self.params.node_id, + "kong.debug.log_level.v1.set_log_level", + new_level, + timeout) + if not res then + return kong.response.exit(500, { message = err, }) + end + + return kong.response.exit(201) + end, + DELETE = function(self) + local res, err = kong.rpc:call(self.params.node_id, + "kong.debug.log_level.v1.set_log_level", + "warn", + 0) + if not res then + return kong.response.exit(500, { message = err, }) + end + + return kong.response.exit(204) + end, + } +end + return routes diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index 33d427424e7c..6bdfb24e192a 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -238,6 +238,12 @@ function _M:handle_cp_websocket(cert) local sync_status = CLUSTERING_SYNC_STATUS.UNKNOWN local purge_delay = self.conf.cluster_data_plane_purge_delay local update_sync_status = function() + local rpc_peers + + if self.conf.cluster_rpc then + rpc_peers = kong.rpc:get_peers() + end + local ok ok, err = kong.db.clustering_data_planes:upsert({ id = dp_id }, { last_seen = last_seen, @@ -250,6 +256,8 @@ function _M:handle_cp_websocket(cert) sync_status = sync_status, -- TODO: import may have been failed though labels = data.labels, cert_details = dp_cert_details, + -- only update rpc_capabilities if dp_id is connected + rpc_capabilities = rpc_peers and rpc_peers[dp_id] or {}, }, { ttl = purge_delay }) if not ok then ngx_log(ngx_ERR, _log_prefix, "unable to update clustering data plane status: ", err, log_suffix) diff --git a/kong/clustering/rpc/callbacks.lua b/kong/clustering/rpc/callbacks.lua new file mode 100644 index 000000000000..f4aefcb5b65b --- /dev/null +++ b/kong/clustering/rpc/callbacks.lua @@ -0,0 +1,47 @@ +local _M = {} +local _MT = { __index = _M, } + + +local utils = require("kong.clustering.rpc.utils") + + +local parse_method_name = utils.parse_method_name + + +function _M.new() + local self = { + callbacks = {}, + capabilities = {}, -- updated as register() is called + capabilities_list = {}, -- updated as register() is called + } + + return setmetatable(self, _MT) +end + + +function _M:register(method, func) + if self.callbacks[method] then + error("duplicate registration of " .. method) + end + + local cap, func_or_err = parse_method_name(method) + if not cap then + return nil, "unable to get capabilities: " .. func_or_err + end + + if not self.capabilities[cap] then + self.capabilities[cap] = true + table.insert(self.capabilities_list, cap) + end + self.callbacks[method] = func +end + + +-- returns a list of capabilities of this node, like: +-- ["kong.meta.v1", "kong.debug.v1", ...] 
+function _M:get_capabilities_list()
+  return self.capabilities_list
+end
+
+
+return _M
diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua
new file mode 100644
index 000000000000..a7815d7a6c19
--- /dev/null
+++ b/kong/clustering/rpc/concentrator.lua
@@ -0,0 +1,303 @@
+local _M = {}
+local _MT = { __index = _M, }
+
+
+local uuid = require("resty.jit-uuid")
+local queue = require("kong.clustering.rpc.queue")
+local cjson = require("cjson")
+local jsonrpc = require("kong.clustering.rpc.json_rpc_v2")
+local rpc_utils = require("kong.clustering.rpc.utils")
+
+
+local setmetatable = setmetatable
+local tostring = tostring
+local pcall = pcall
+local assert = assert
+local string_format = string.format
+local cjson_decode = cjson.decode
+local cjson_encode = cjson.encode
+local exiting = ngx.worker.exiting
+local is_timeout = rpc_utils.is_timeout
+local ngx_log = ngx.log
+local ngx_ERR = ngx.ERR
+local ngx_WARN = ngx.WARN
+local ngx_DEBUG = ngx.DEBUG
+
+
+local RESP_CHANNEL_PREFIX = "rpc:resp:" -- format: rpc:resp:<worker_uuid>
+local REQ_CHANNEL_PREFIX = "rpc:req:" -- format: rpc:req:<node_id>
+
+
+local RPC_REQUEST_ENQUEUE_SQL = [[
+BEGIN;
+  INSERT INTO clustering_rpc_requests (
+    "node_id",
+    "reply_to",
+    "ttl",
+    "payload"
+  ) VALUES (
+    %s,
+    %s,
+    CURRENT_TIMESTAMP(3) AT TIME ZONE 'UTC' + INTERVAL '%d second',
+    %s
+  );
+  SELECT pg_notify(%s, NULL);
+COMMIT;
+]]
+
+
+local RPC_REQUEST_DEQUEUE_SQL = [[
+BEGIN;
+  DELETE FROM
+    clustering_rpc_requests
+  USING (
+    SELECT * FROM clustering_rpc_requests WHERE node_id = %s FOR UPDATE SKIP LOCKED
+  ) q
+  WHERE q.id = clustering_rpc_requests.id RETURNING clustering_rpc_requests.*;
+COMMIT;
+]]
+
+
+function _M.new(manager, db)
+  local self = {
+    manager = manager,
+    db = db,
+    interest = {}, -- id: callback pair
+    sub_unsub = queue.new(4096), -- pub/sub event queue, executed on the read thread
+    sequence = 0,
+  }
+
+  return setmetatable(self, _MT)
+end
+
+
+function _M:_get_next_id()
+  local res = self.sequence
+  self.sequence = res + 1
+
+  return res
+end
+
+
+local function enqueue_notifications(notifications, notifications_queue)
+  assert(notifications_queue)
+
+  if notifications then
+    for _, n in ipairs(notifications) do
+      assert(notifications_queue:push(n))
+    end
+  end
+end
+
+
+function _M:_event_loop(lconn)
+  local notifications_queue = queue.new(4096)
+  local rpc_resp_channel_name = RESP_CHANNEL_PREFIX .. self.worker_id
+
+  -- we always subscribe to our worker's receiving channel first
+  local res, err = lconn:query('LISTEN "' .. rpc_resp_channel_name .. '";')
+  if not res then
+    return nil, "unable to subscribe to concentrator response channel: " .. err
+  end
+
+  while not exiting() do
+    while true do
+      local n, err = notifications_queue:pop(0)
+      if not n then
+        if err then
+          return nil, "unable to pop from notifications queue: " ..
err
+        end
+
+        break
+      end
+
+      assert(n.operation == "notification")
+
+      if n.channel == rpc_resp_channel_name then
+        -- a response for a previous RPC call we asked for
+        local payload = cjson_decode(n.payload)
+        assert(payload.jsonrpc == "2.0")
+
+        -- response
+        local cb = self.interest[payload.id]
+        self.interest[payload.id] = nil -- edge trigger only once
+
+        if cb then
+          local res, err = cb(payload)
+          if not res then
+            ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ",
+                    payload.id, ", err: ", err)
+          end
+
+        else
+          ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload.id, ", dropping it")
+        end
+
+      else
+        -- other CP inside the cluster asked us to forward a call
+        assert(n.channel:sub(1, #REQ_CHANNEL_PREFIX) == REQ_CHANNEL_PREFIX,
+               "unexpected concentrator request channel name: " .. n.channel)
+
+        local target_id = n.channel:sub(#REQ_CHANNEL_PREFIX + 1)
+        local sql = string_format(RPC_REQUEST_DEQUEUE_SQL, self.db.connector:escape_literal(target_id))
+        local calls, err = self.db.connector:query(sql)
+        if not calls then
+          return nil, "concentrator request dequeue query failed: " .. err
+        end
+
+        assert(calls[1] == true)
+        ngx_log(ngx_DEBUG, "concentrator got ", calls[2].affected_rows,
+                " calls from database for node ", target_id)
+        for _, call in ipairs(calls[2]) do
+          local payload = assert(call.payload)
+          local reply_to = assert(call.reply_to,
+                                  "unknown requester for RPC")
+
+          local res, err = self.manager:_local_call(target_id, payload.method,
+                                                    payload.params)
+          if res then
+            -- call success
+            res, err = self:_enqueue_rpc_response(reply_to, {
+              jsonrpc = "2.0",
+              id = payload.id,
+              result = res,
+            })
+            if not res then
+              ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
+            end
+
+          else
+            -- call failure
+            res, err = self:_enqueue_rpc_response(reply_to, {
+              jsonrpc = "2.0",
+              id = payload.id,
+              error = {
+                code = jsonrpc.SERVER_ERROR,
+                message = tostring(err),
+              }
+            })
+            if not res then
+              ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
+            end
+          end
+        end
+      end
+    end
+
+    local res, err = lconn:wait_for_notification()
+    if not res then
+      if not is_timeout(err) then
+        return nil, "wait_for_notification error: " .. err
+      end
+
+      repeat
+        local sql, err = self.sub_unsub:pop(0)
+        if err then
+          return nil, err
+        end
+
+        local _, notifications
+        res, err, _, notifications = lconn:query(sql or "SELECT 1;") -- keepalive
+        if not res then
+          return nil, "query to Postgres failed: " ..
err
+        end
+
+        enqueue_notifications(notifications, notifications_queue)
+      until not sql
+
+    else
+      notifications_queue:push(res)
+    end
+  end
+end
+
+
+function _M:start(delay)
+  if not self.worker_id then
+    -- this cannot be generated inside `:new()` as ngx.worker.id()
+    -- does not yet exist there and can only be generated inside
+    -- the init_worker phase
+    self.worker_id = uuid.generate_v5(kong.node.get_id(),
+                                      tostring(ngx.worker.id()))
+  end
+
+  assert(ngx.timer.at(delay or 0, function(premature)
+    if premature then
+      return
+    end
+
+    local lconn = self.db.connector:connect("write")
+    lconn:settimeout(1000)
+    self.db.connector:store_connection(nil, "write")
+
+    local _, res_or_perr, err = pcall(self._event_loop, self, lconn)
+    -- _event_loop never returns true
+    local delay = math.random(5, 10)
+
+    ngx_log(ngx_ERR, "[rpc] concentrator event loop error: ",
+            res_or_perr or err, ", reconnecting in ",
+            math.floor(delay), " seconds")
+
+    local res, err = lconn:disconnect()
+    if not res then
+      ngx_log(ngx_ERR, "[rpc] unable to close postgres connection: ", err)
+    end
+
+    self:start(delay)
+  end))
+end
+
+
+-- enqueue an RPC request to the DP node with ID node_id
+function _M:_enqueue_rpc_request(node_id, payload)
+  local sql = string_format(RPC_REQUEST_ENQUEUE_SQL,
+                            self.db.connector:escape_literal(node_id),
+                            self.db.connector:escape_literal(self.worker_id),
+                            5,
+                            self.db.connector:escape_literal(cjson_encode(payload)),
+                            self.db.connector:escape_literal(REQ_CHANNEL_PREFIX .. node_id))
+  return self.db.connector:query(sql)
+end
+
+
+-- enqueue an RPC response to the CP worker with ID worker_id
+function _M:_enqueue_rpc_response(worker_id, payload)
+  local sql = string_format("SELECT pg_notify(%s, %s);",
+                            self.db.connector:escape_literal(RESP_CHANNEL_PREFIX .. worker_id),
+                            self.db.connector:escape_literal(cjson_encode(payload)))
+  return self.db.connector:query(sql)
+end
+
+
+-- subscribe to RPC calls destined for the node with ID node_id
+function _M:_enqueue_subscribe(node_id)
+  return self.sub_unsub:push('LISTEN "' .. REQ_CHANNEL_PREFIX .. node_id .. '";')
+end
+
+
+-- unsubscribe from RPC calls destined for the node with ID node_id
+function _M:_enqueue_unsubscribe(node_id)
+  return self.sub_unsub:push('UNLISTEN "' .. REQ_CHANNEL_PREFIX .. node_id ..
'";') +end + + +-- asynchronously start executing a RPC, node_id is +-- needed for this implementation, because all nodes +-- over concentrator shares the same "socket" object +-- This way the manager code wouldn't tell the difference +-- between calls made over WebSocket or concentrator +function _M:call(node_id, method, params, callback) + local id = self:_get_next_id() + + self.interest[id] = callback + + return self:_enqueue_rpc_request(node_id, { + jsonrpc = "2.0", + method = method, + params = params, + id = id, + }) +end + + +return _M diff --git a/kong/clustering/rpc/future.lua b/kong/clustering/rpc/future.lua new file mode 100644 index 000000000000..230d8bf09983 --- /dev/null +++ b/kong/clustering/rpc/future.lua @@ -0,0 +1,73 @@ +local _M = {} +local _MT = { __index = _M, } + + +local semaphore = require("ngx.semaphore") + + +local STATE_NEW = 1 +local STATE_IN_PROGRESS = 2 +local STATE_SUCCEED = 3 +local STATE_ERRORED = 4 + + +function _M.new(node_id, socket, method, params) + local self = { + method = method, + params = params, + sema = semaphore.new(), + socket = socket, + node_id = node_id, + id = nil, + result = nil, + error = nil, + state = STATE_NEW, -- STATE_* + } + + return setmetatable(self, _MT) +end + + +-- start executing the future +function _M:start() + assert(self.state == STATE_NEW) + self.state = STATE_IN_PROGRESS + + local callback = function(resp) + assert(resp.jsonrpc == "2.0") + + if resp.result then + -- succeeded + self.result = resp.result + self.state = STATE_SUCCEED + + else + -- errored + self.error = resp.error + self.state = STATE_ERRORED + end + + self.sema:post() + + return true + end + + return self.socket:call(self.node_id, + self.method, + self.params, callback) +end + + +function _M:wait(timeout) + assert(self.state == STATE_IN_PROGRESS) + + local res, err = self.sema:wait(timeout) + if not res then + return res, err + end + + return self.state == STATE_SUCCEED +end + + +return _M diff --git a/kong/clustering/rpc/json_rpc_v2.lua b/kong/clustering/rpc/json_rpc_v2.lua new file mode 100644 index 000000000000..c5ece5d538e9 --- /dev/null +++ b/kong/clustering/rpc/json_rpc_v2.lua @@ -0,0 +1,41 @@ +local assert = assert +local tostring = tostring + + +local _M = { + PARSE_ERROR = -32700, + INVALID_REQUEST = -32600, + METHOD_NOT_FOUND = -32601, + INVALID_PARAMS = -32602, + INTERNAL_ERROR = -32603, + SERVER_ERROR = -32000, +} + + +local ERROR_MSG = { + [_M.PARSE_ERROR] = "Parse error", + [_M.INVALID_REQUEST] = "Invalid Request", + [_M.METHOD_NOT_FOUND] = "Method not found", + [_M.INVALID_PARAMS] = "Invalid params", + [_M.INTERNAL_ERROR] = "Internal error", + [_M.SERVER_ERROR] = "Server error", +} + + +function _M.new_error(id, code, msg) + if not msg then + msg = assert(ERROR_MSG[code], "unknown code: " .. 
tostring(code))
+  end
+
+  return {
+    jsonrpc = "2.0",
+    id = id,
+    error = {
+      code = code,
+      message = msg,
+    }
+  }
+end
+
+
+return _M
diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua
new file mode 100644
index 000000000000..5104fdab7235
--- /dev/null
+++ b/kong/clustering/rpc/manager.lua
@@ -0,0 +1,365 @@
+local _M = {}
+local _MT = { __index = _M, }
+
+
+local server = require("resty.websocket.server")
+local client = require("resty.websocket.client")
+local socket = require("kong.clustering.rpc.socket")
+local concentrator = require("kong.clustering.rpc.concentrator")
+local future = require("kong.clustering.rpc.future")
+local utils = require("kong.clustering.rpc.utils")
+local callbacks = require("kong.clustering.rpc.callbacks")
+local clustering_tls = require("kong.clustering.tls")
+local constants = require("kong.constants")
+local table_isempty = require("table.isempty")
+local pl_tablex = require("pl.tablex")
+local cjson = require("cjson.safe")
+
+
+local ngx_var = ngx.var
+local ngx_ERR = ngx.ERR
+local ngx_log = ngx.log
+local ngx_exit = ngx.exit
+local ngx_time = ngx.time
+local exiting = ngx.worker.exiting
+local pl_tablex_makeset = pl_tablex.makeset
+local cjson_encode = cjson.encode
+local cjson_decode = cjson.decode
+local validate_client_cert = clustering_tls.validate_client_cert
+local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL
+
+
+local WS_OPTS = {
+  timeout = constants.CLUSTERING_TIMEOUT,
+  max_payload_len = kong.configuration.cluster_max_payload,
+}
+local KONG_VERSION = kong.version
+
+
+-- create a new RPC manager; node_id is this node's own node_id
+function _M.new(conf, node_id)
+  local self = {
+    -- clients[node_id]: { socket1 => true, socket2 => true, ... }
+    clients = {},
+    client_capabilities = {},
+    node_id = node_id,
+    conf = conf,
+    cluster_cert = assert(clustering_tls.get_cluster_cert(conf)),
+    cluster_cert_key = assert(clustering_tls.get_cluster_cert_key(conf)),
+    callbacks = callbacks.new(),
+  }
+
+  self.concentrator = concentrator.new(self, kong.db)
+
+  return setmetatable(self, _MT)
+end
+
+
+function _M:_add_socket(socket, capabilities_list)
+  local sockets = self.clients[socket.node_id]
+  if not sockets then
+    assert(self.concentrator:_enqueue_subscribe(socket.node_id))
+    sockets = setmetatable({}, { __mode = "k", })
+    self.clients[socket.node_id] = sockets
+  end
+
+  self.client_capabilities[socket.node_id] = {
+    set = pl_tablex_makeset(capabilities_list),
+    list = capabilities_list,
+  }
+
+  assert(not sockets[socket])
+
+  sockets[socket] = true
+end
+
+
+function _M:_remove_socket(socket)
+  local sockets = assert(self.clients[socket.node_id])
+
+  assert(sockets[socket])
+
+  sockets[socket] = nil
+
+  if table_isempty(sockets) then
+    self.clients[socket.node_id] = nil
+    self.client_capabilities[socket.node_id] = nil
+    assert(self.concentrator:_enqueue_unsubscribe(socket.node_id))
+  end
+end
+
+
+-- Helper that finds a node by node_id and checks whether a
+-- capability is supported.
+-- Returns: "local" if the node is connected locally,
+-- or "concentrator" if it is reachable via the concentrator.
+-- In case of error, returns nil, err instead
+function _M:_find_node_and_check_capability(node_id, cap)
+  if self.client_capabilities[node_id] then
+    if not self.client_capabilities[node_id].set[cap] then
+      return nil, "requested capability does not exist, capability: " ..
+                  cap .. ", node_id: " .. node_id
+    end
+
+    return "local"
+  end
+
+  -- does the concentrator know more about this client?
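+  -- (the node may be connected to a different CP in the cluster: its
+  -- clustering_data_planes row is refreshed on every ping, so a recent
+  -- last_seen together with an advertised capability means the call can
+  -- be forwarded through the database-backed concentrator)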
+  local res, err = kong.db.clustering_data_planes:select({ id = node_id })
+  if err then
+    return nil, "unable to query concentrator: " .. err
+  end
+
+  if not res or ngx_time() - res.last_seen > CLUSTERING_PING_INTERVAL * 2 then
+    return nil, "node is not connected, node_id: " .. node_id
+  end
+
+  for _, c in ipairs(res.rpc_capabilities) do
+    if c == cap then
+      return "concentrator"
+    end
+  end
+
+  return nil, "requested capability does not exist, capability: " ..
+              cap .. ", node_id: " .. node_id
+end
+
+
+-- low-level helper used internally by :call() and the concentrator;
+-- this one does not consider forwarding via the concentrator
+-- when the node is not connected locally
+function _M:_local_call(node_id, method, params)
+  if not self.client_capabilities[node_id] then
+    return nil, "node is not connected, node_id: " .. node_id
+  end
+
+  local cap = utils.parse_method_name(method)
+  if not self.client_capabilities[node_id].set[cap] then
+    return nil, "requested capability does not exist, capability: " ..
+                cap .. ", node_id: " .. node_id
+  end
+
+  local s = next(self.clients[node_id]) -- TODO: better LB?
+
+  local fut = future.new(node_id, s, method, params)
+  assert(fut:start())
+
+  local ok, err = fut:wait(5)
+  if err then
+    return nil, err
+  end
+
+  if ok then
+    return fut.result
+  end
+
+  return nil, fut.error.message
+end
+
+
+-- public interface; try the call on node_id locally first,
+-- and if the node is not connected locally, try the concentrator next
+function _M:call(node_id, method, ...)
+  local cap = utils.parse_method_name(method)
+
+  local res, err = self:_find_node_and_check_capability(node_id, cap)
+  if not res then
+    return nil, err
+  end
+
+  local params = {...}
+
+  if res == "local" then
+    res, err = self:_local_call(node_id, method, params)
+    if not res then
+      return nil, err
+    end
+
+    return res
+  end
+
+  assert(res == "concentrator")
+
+  -- try the concentrator
+  local fut = future.new(node_id, self.concentrator, method, params)
+  assert(fut:start())
+
+  local ok, err = fut:wait(5)
+  if err then
+    return nil, err
+  end
+
+  if ok then
+    return fut.result
+  end
+
+  return nil, fut.error.message
+end
+
+
+-- handle incoming client connections
+function _M:handle_websocket()
+  local kong_version = ngx_var.http_x_kong_version
+  local node_id = ngx_var.http_x_kong_node_id
+  local rpc_protocol = ngx_var.http_sec_websocket_protocol
+  local content_encoding = ngx_var.http_content_encoding
+  local rpc_capabilities = ngx_var.http_x_kong_rpc_capabilities
+
+  if not kong_version then
+    ngx_log(ngx_ERR, "[rpc] client did not provide version number")
+    return ngx_exit(ngx.HTTP_CLOSE)
+  end
+
+  if not node_id then
+    ngx_log(ngx_ERR, "[rpc] client did not provide node ID")
+    return ngx_exit(ngx.HTTP_CLOSE)
+  end
+
+  if content_encoding ~= "x-snappy-framed" then
+    ngx_log(ngx_ERR, "[rpc] client does not use Snappy compressed frames")
+    return ngx_exit(ngx.HTTP_CLOSE)
+  end
+
+  if rpc_protocol ~= "kong.rpc.v1" then
+    ngx_log(ngx_ERR, "[rpc] unknown RPC protocol: " ..
+                     tostring(rpc_protocol) ..
+ ", doesn't know how to communicate with client") + return ngx_exit(ngx.HTTP_CLOSE) + end + + if not rpc_capabilities then + ngx_log(ngx_ERR, "[rpc] client did not provide capability list") + return ngx_exit(ngx.HTTP_CLOSE) + end + + rpc_capabilities = cjson_decode(rpc_capabilities) + if not rpc_capabilities then + ngx_log(ngx_ERR, "[rpc] failed to decode client capability list") + return ngx_exit(ngx.HTTP_CLOSE) + end + + local cert, err = validate_client_cert(self.conf, self.cluster_cert, ngx_var.ssl_client_raw_cert) + if not cert then + ngx_log(ngx_ERR, "[rpc] client's certificate failed validation: ", err) + return ngx_exit(ngx.HTTP_CLOSE) + end + + ngx.header["X-Kong-RPC-Capabilities"] = cjson_encode(self.callbacks:get_capabilities_list()) + + local wb, err = server:new(WS_OPTS) + if not wb then + ngx_log(ngx_ERR, "[rpc] unable to establish WebSocket connection with client: ", err) + return ngx_exit(ngx.HTTP_CLOSE) + end + + local s = socket.new(self, wb, node_id) + self:_add_socket(s, rpc_capabilities) + + s:start() + local res, err = s:join() + self:_remove_socket(s) + + if not res then + ngx_log(ngx_ERR, "[rpc] RPC connection broken: ", err, " node_id: ", node_id) + return ngx_exit(ngx.ERROR) + end + + return ngx_exit(ngx.OK) +end + + +function _M:connect(premature, node_id, host, path, cert, key) + if premature then + return + end + + local uri = "wss://" .. host .. path + + local opts = { + ssl_verify = true, + client_cert = cert, + client_priv_key = key, + protocols = "kong.rpc.v1", + headers = { + "X-Kong-Version: " .. KONG_VERSION, + "X-Kong-Node-Id: " .. self.node_id, + "X-Kong-Hostname: " .. kong.node.get_hostname(), + "X-Kong-RPC-Capabilities: " .. cjson_encode(self.callbacks:get_capabilities_list()), + "Content-Encoding: x-snappy-framed" + }, + } + + if self.conf.cluster_mtls == "shared" then + opts.server_name = "kong_clustering" + + else + -- server_name will be set to the host if it is not explicitly defined here + if self.conf.cluster_server_name ~= "" then + opts.server_name = self.conf.cluster_server_name + end + end + + local reconnection_delay = math.random(5, 10) + + local c = assert(client:new(WS_OPTS)) + + local ok, err = c:connect(uri, opts) + if not ok then + ngx_log(ngx_ERR, "[rpc] unable to connect to peer: ", err) + goto err + end + + do + local resp_headers = c:get_resp_headers() + -- FIXME: resp_headers should not be case sensitive + if not resp_headers or not resp_headers["x_kong_rpc_capabilities"] then + ngx_log(ngx_ERR, "[rpc] peer did not provide capability list, node_id: ", node_id) + c:send_close() -- can't do much if this fails + goto err + end + + local capabilities = resp_headers["x_kong_rpc_capabilities"] + capabilities = cjson_decode(capabilities) + if not capabilities then + ngx_log(ngx_ERR, "[rpc] unable to decode peer capability list, node_id: ", node_id, + " list: ", capabilities) + c:send_close() -- can't do much if this fails + goto err + end + + local s = socket.new(self, c, node_id) + s:start() + self:_add_socket(s, capabilities) + + ok, err = s:join() -- main event loop + + self:_remove_socket(s) + + if not ok then + ngx_log(ngx_ERR, "[rpc] connection to node_id: ", node_id, " broken, err: ", + err, ", reconnecting in ", reconnection_delay, " seconds") + end + end + + ::err:: + + if not exiting() then + ngx.timer.at(reconnection_delay, function(premature) + self:connect(premature, node_id, host, path, cert, key) + end) + end +end + + +function _M:get_peers() + local res = {} + + for node_id, cap in pairs(self.client_capabilities) do + 
res[node_id] = cap.list
+  end
+
+  return res
+end
+
+
+return _M
diff --git a/kong/clustering/rpc/queue.lua b/kong/clustering/rpc/queue.lua
new file mode 100644
index 000000000000..7b07705be8f5
--- /dev/null
+++ b/kong/clustering/rpc/queue.lua
@@ -0,0 +1,73 @@
+local semaphore = require("ngx.semaphore")
+local table_new = require("table.new")
+local rpc_utils = require("kong.clustering.rpc.utils")
+
+
+local assert = assert
+local setmetatable = setmetatable
+local math_min = math.min
+local is_timeout = rpc_utils.is_timeout
+
+
+local _M = {}
+local _MT = { __index = _M, }
+
+
+local DEFAULT_QUEUE_LEN = 128
+
+
+function _M.new(max_len)
+  local self = {
+    semaphore = assert(semaphore.new()),
+    max = max_len,
+
+    elts = table_new(math_min(max_len, DEFAULT_QUEUE_LEN), 0),
+    first = 0,
+    last = -1,
+  }
+
+  return setmetatable(self, _MT)
+end
+
+
+function _M:push(item)
+  local last = self.last
+
+  if last - self.first + 1 >= self.max then
+    return nil, "queue overflow"
+  end
+
+  last = last + 1
+  self.last = last
+  self.elts[last] = item
+
+  self.semaphore:post()
+
+  return true
+end
+
+
+function _M:pop(timeout)
+  local ok, err = self.semaphore:wait(timeout)
+  if not ok then
+    if is_timeout(err) then
+      return nil
+    end
+
+    return nil, err
+  end
+
+  local first = self.first
+
+  -- the queue cannot be empty because the semaphore wait succeeded
+  assert(first <= self.last)
+
+  local item = self.elts[first]
+  self.elts[first] = nil
+  self.first = first + 1
+
+  return item
+end
+
+
+return _M
diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua
new file mode 100644
index 000000000000..243a44522fc2
--- /dev/null
+++ b/kong/clustering/rpc/socket.lua
@@ -0,0 +1,284 @@
+-- socket represents an open WebSocket connection;
+-- unlike the WebSocket object, it can be accessed via different requests
+-- with the help of semaphores
+
+
+local _M = {}
+local _MT = { __index = _M, }
+
+
+local utils = require("kong.clustering.rpc.utils")
+local queue = require("kong.clustering.rpc.queue")
+local jsonrpc = require("kong.clustering.rpc.json_rpc_v2")
+local constants = require("kong.constants")
+
+
+local assert = assert
+local string_format = string.format
+local kong = kong
+local is_timeout = utils.is_timeout
+local compress_payload = utils.compress_payload
+local decompress_payload = utils.decompress_payload
+local exiting = ngx.worker.exiting
+local ngx_time = ngx.time
+local ngx_log = ngx.log
+local new_error = jsonrpc.new_error
+
+
+local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL
+local PING_WAIT = CLUSTERING_PING_INTERVAL * 1.5
+local PING_TYPE = "PING"
+local PONG_TYPE = "PONG"
+local ngx_WARN = ngx.WARN
+local ngx_DEBUG = ngx.DEBUG
+
+
+-- create a new socket wrapper; wb is the WebSocket object to use.
+-- timeout and max_payload_len must already be set by the caller when
+-- creating the `wb` object
+function _M.new(manager, wb, node_id)
+  local self = {
+    wb = wb,
+    interest = {}, -- id: callback pair
+    outgoing = queue.new(4096), -- write queue
+    manager = manager,
+    node_id = node_id,
+    sequence = 0,
+  }
+
+  return setmetatable(self, _MT)
+end
+
+
+function _M:_get_next_id()
+  local res = self.sequence
+  self.sequence = res + 1
+
+  return res
+end
+
+
+function _M._dispatch(premature, self, cb, payload)
+  if premature then
+    return
+  end
+
+  local res, err = cb(self.node_id, unpack(payload.params))
+  if not res then
+    ngx_log(ngx_WARN, "[rpc] RPC callback failed: ", err)
+
+    res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR,
+                                            tostring(err)))
+    if
not res then + ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", err) + end + + return + end + + -- success + res, err = self.outgoing:push({ + jsonrpc = "2.0", + id = payload.id, + result = res, + }) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to push RPC call result: ", err) + end +end + + +-- start reader and writer thread and event loop +function _M:start() + self.read_thread = ngx.thread.spawn(function() + local last_seen = ngx_time() + + while not exiting() do + local data, typ, err = self.wb:recv_frame() + + if err then + if not is_timeout(err) then + return nil, err + end + + local waited = ngx_time() - last_seen + if waited > PING_WAIT then + return nil, "did not receive ping frame from other end within " .. + PING_WAIT .. " seconds" + end + + if waited > CLUSTERING_PING_INTERVAL then + local res, err = self.outgoing:push(PING_TYPE) + if not res then + return nil, "unable to send ping: " .. err + end + end + + -- timeout + goto continue + end + + last_seen = ngx_time() + + if typ == "ping" then + local res, err = self.outgoing:push(PONG_TYPE) + if not res then + return nil, "unable to handle ping: " .. err + end + + goto continue + end + + if typ == "pong" then + ngx_log(ngx_DEBUG, "[rpc] got PONG frame") + + goto continue + end + + if typ == "close" then + return true + end + + assert(typ == "binary") + + local payload = decompress_payload(data) + assert(payload.jsonrpc == "2.0") + + if payload.method then + -- invoke + + local dispatch_cb = self.manager.callbacks.callbacks[payload.method] + if not dispatch_cb then + local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) + if not res then + return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err + end + + goto continue + end + + -- call dispatch + local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", + self.node_id, payload.id, payload.method), + 0, _M._dispatch, self, dispatch_cb, payload) + if not res then + local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR)) + if not reso then + return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro + end + + return nil, "unable to dispatch JSON-RPC callback: " .. err + end + + else + -- response + local interest_cb = self.interest[payload.id] + self.interest[payload.id] = nil -- edge trigger only once + + if not interest_cb then + ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload.id, ", dropping it") + + goto continue + end + + local res, err = interest_cb(payload) + if not res then + ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ", + payload.id, ", err: ", err) + end + end + + ::continue:: + end + end) + + self.write_thread = ngx.thread.spawn(function() + while not exiting() do + local payload, err = self.outgoing:pop(5) + if err then + return nil, err + end + + if payload then + if payload == PING_TYPE then + local _, err = self.wb:send_ping() + if err then + return nil, "failed to send PING frame to peer: " .. err + + else + ngx_log(ngx_DEBUG, "[rpc] sent PING frame to peer") + end + + elseif payload == PONG_TYPE then + local _, err = self.wb:send_pong() + if err then + return nil, "failed to send PONG frame to peer: " .. 
err
+
+          else
+            ngx_log(ngx_DEBUG, "[rpc] sent PONG frame to peer")
+          end
+
+        else
+          assert(type(payload) == "table")
+
+          local bytes, err = self.wb:send_binary(compress_payload(payload))
+          if not bytes then
+            return nil, err
+          end
+        end
+      end
+    end
+  end)
+end
+
+
+function _M:join()
+  local ok, err, perr = ngx.thread.wait(self.write_thread, self.read_thread)
+  self:stop()
+
+  if not ok then
+    return nil, err
+  end
+
+  if perr then
+    return nil, perr
+  end
+
+  return true
+end
+
+
+function _M:stop()
+  ngx.thread.kill(self.write_thread)
+  ngx.thread.kill(self.read_thread)
+
+  if self.wb.close then
+    self.wb:close()
+
+  else
+    self.wb:send_close()
+  end
+end
+
+
+-- asynchronously start executing an RPC; _node_id is not
+-- needed for this implementation, but it is important
+-- for the concentrator socket, so we include it just to keep
+-- the signature consistent
+function _M:call(node_id, method, params, callback)
+  assert(node_id == self.node_id)
+
+  local id = self:_get_next_id()
+
+  self.interest[id] = callback
+
+  return self.outgoing:push({
+    jsonrpc = "2.0",
+    method = method,
+    params = params,
+    id = id,
+  })
+end
+
+
+return _M
diff --git a/kong/clustering/rpc/utils.lua b/kong/clustering/rpc/utils.lua
new file mode 100644
index 000000000000..544d2892932f
--- /dev/null
+++ b/kong/clustering/rpc/utils.lua
@@ -0,0 +1,45 @@
+local _M = {}
+local pl_stringx = require("pl.stringx")
+local cjson = require("cjson")
+local snappy = require("resty.snappy")
+
+
+local string_sub = string.sub
+local assert = assert
+local cjson_encode = cjson.encode
+local cjson_decode = cjson.decode
+local rfind = pl_stringx.rfind
+local snappy_compress = snappy.compress
+local snappy_uncompress = snappy.uncompress
+
+
+function _M.parse_method_name(method)
+  local pos = rfind(method, ".")
+  if not pos then
+    return nil, "not a valid method name"
+  end
+
+  return method:sub(1, pos - 1), method:sub(pos + 1)
+end
+
+
+function _M.is_timeout(err)
+  return err and (err == "timeout" or string_sub(err, -7) == "timeout")
+end
+
+
+function _M.compress_payload(payload)
+  local json = cjson_encode(payload)
+  local data = assert(snappy_compress(json))
+  return data
+end
+
+
+function _M.decompress_payload(compressed)
+  local json = assert(snappy_uncompress(compressed))
+  local data = cjson_decode(json)
+  return data
+end
+
+
+return _M
diff --git a/kong/clustering/services/debug.lua b/kong/clustering/services/debug.lua
new file mode 100644
index 000000000000..387b19e62a1f
--- /dev/null
+++ b/kong/clustering/services/debug.lua
@@ -0,0 +1,70 @@
+local _M = {}
+
+
+local resty_log = require("resty.kong.log")
+local constants = require("kong.constants")
+
+
+local tostring = tostring
+
+
+local function rpc_set_log_level(_node_id, new_log_level, timeout)
+  if not constants.LOG_LEVELS[new_log_level] then
+    return nil, "unknown log level: " ..
tostring(new_log_level) + end + + if type(new_log_level) == "string" then + new_log_level = constants.LOG_LEVELS[new_log_level] + end + + local timeout = math.ceil(timeout or constants.DYN_LOG_LEVEL_DEFAULT_TIMEOUT) + + local _, _, original_level = resty_log.get_log_level() + if new_log_level == original_level then + timeout = 0 + end + + -- this function should not fail, if it throws exception, let RPC framework handle it + resty_log.set_log_level(new_log_level, timeout) + + local data = { + log_level = new_log_level, + timeout = timeout, + } + -- broadcast to all workers in a node + local ok, err = kong.worker_events.post("debug", "log_level", data) + if not ok then + return nil, err + end + + -- store in shm so that newly spawned workers can update their log levels + ok, err = ngx.shared.kong:set(constants.DYN_LOG_LEVEL_KEY, new_log_level, timeout) + if not ok then + return nil, err + end + + ok, err = ngx.shared.kong:set(constants.DYN_LOG_LEVEL_TIMEOUT_AT_KEY, ngx.time() + timeout, timeout) + if not ok then + return nil, err + end + + return true +end + + +local function rpc_get_log_level(_node_id) + local current_level, timeout, original_level = resty_log.get_log_level() + return { current_level = constants.LOG_LEVELS[current_level], + timeout = timeout, + original_level = constants.LOG_LEVELS[original_level], + } +end + + +function _M.init(manager) + manager.callbacks:register("kong.debug.log_level.v1.get_log_level", rpc_get_log_level) + manager.callbacks:register("kong.debug.log_level.v1.set_log_level", rpc_set_log_level) +end + + +return _M diff --git a/kong/conf_loader/constants.lua b/kong/conf_loader/constants.lua index 94f402451bfb..cda8a9a9ccdb 100644 --- a/kong/conf_loader/constants.lua +++ b/kong/conf_loader/constants.lua @@ -497,6 +497,7 @@ local CONF_PARSERS = { cluster_max_payload = { typ = "number" }, cluster_use_proxy = { typ = "boolean" }, cluster_dp_labels = { typ = "array" }, + cluster_rpc = { typ = "boolean" }, kic = { typ = "boolean" }, pluginserver_names = { typ = "array" }, diff --git a/kong/constants.lua b/kong/constants.lua index 2941ffae3f6f..0050ab1fee4f 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -256,6 +256,7 @@ local constants = { DYN_LOG_LEVEL_KEY = "kong:dyn_log_level", DYN_LOG_LEVEL_TIMEOUT_AT_KEY = "kong:dyn_log_level_timeout_at", + DYN_LOG_LEVEL_DEFAULT_TIMEOUT = 60, ADMIN_GUI_KCONFIG_CACHE_KEY = "admin:gui:kconfig", diff --git a/kong/db/migrations/core/023_360_to_370.lua b/kong/db/migrations/core/023_360_to_370.lua new file mode 100644 index 000000000000..f769ca0b7bcd --- /dev/null +++ b/kong/db/migrations/core/023_360_to_370.lua @@ -0,0 +1,23 @@ +return { + postgres = { + up = [[ + CREATE TABLE IF NOT EXISTS "clustering_rpc_requests" ( + "id" BIGSERIAL PRIMARY KEY, + "node_id" UUID NOT NULL, + "reply_to" UUID NOT NULL, + "ttl" TIMESTAMP WITH TIME ZONE NOT NULL, + "payload" JSON NOT NULL + ); + + CREATE INDEX IF NOT EXISTS "clustering_rpc_requests_node_id_idx" ON "clustering_rpc_requests" ("node_id"); + + DO $$ + BEGIN + ALTER TABLE IF EXISTS ONLY "clustering_data_planes" ADD "rpc_capabilities" TEXT[]; + EXCEPTION WHEN DUPLICATE_COLUMN THEN + -- Do nothing, accept existing state + END; + $$; + ]] + } +} diff --git a/kong/db/migrations/core/init.lua b/kong/db/migrations/core/init.lua index b19a271ce7aa..2f18b1cb5f76 100644 --- a/kong/db/migrations/core/init.lua +++ b/kong/db/migrations/core/init.lua @@ -20,4 +20,5 @@ return { "020_330_to_340", "021_340_to_350", "022_350_to_360", + "023_360_to_370", } diff --git 
a/kong/db/schema/entities/clustering_data_planes.lua b/kong/db/schema/entities/clustering_data_planes.lua index fb1f43db0990..09aee82548af 100644 --- a/kong/db/schema/entities/clustering_data_planes.lua +++ b/kong/db/schema/entities/clustering_data_planes.lua @@ -46,5 +46,7 @@ return { description = "Certificate details of the DPs.", }, }, + { rpc_capabilities = { type = "set", description = "An array of RPC capabilities this node supports.", + elements = typedefs.capability, } }, }, } diff --git a/kong/db/schema/typedefs.lua b/kong/db/schema/typedefs.lua index 66bfc0a5d724..0b0de71d9ea1 100644 --- a/kong/db/schema/typedefs.lua +++ b/kong/db/schema/typedefs.lua @@ -456,6 +456,11 @@ typedefs.tags = Schema.define { description = "A set of strings representing tags." } +typedefs.capability = Schema.define { + type = "string", + description = "A string representing an RPC capability." +} + local http_protocols = {} for p, s in pairs(constants.PROTOCOLS_WITH_SUBSYSTEM) do if s == "http" then diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 102259dc5beb..80fd6251fd43 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -146,6 +146,7 @@ do local res, err = utils_toposort(table_names, get_table_name_neighbors) if res then + insert(res, 1, "clustering_rpc_requests") insert(res, 1, "cluster_events") end diff --git a/kong/init.lua b/kong/init.lua index 2c837dd0e52b..248ae521c593 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -687,6 +687,14 @@ function Kong.init() if is_http_module and (is_data_plane(config) or is_control_plane(config)) then kong.clustering = require("kong.clustering").new(config) + + if config.cluster_rpc then + kong.rpc = require("kong.clustering.rpc.manager").new(config, kong.node.get_id()) + + if is_data_plane(config) then + require("kong.clustering.services.debug").init(kong.rpc) + end + end end assert(db.vaults:load_vault_schemas(config.loaded_vaults)) @@ -961,6 +969,23 @@ function Kong.init_worker() if kong.clustering then kong.clustering:init_worker() + + local cluster_tls = require("kong.clustering.tls") + + if kong.rpc and is_http_module then + if is_data_plane(kong.configuration) then + ngx.timer.at(0, function(premature) + kong.rpc:connect(premature, + "control_plane", kong.configuration.cluster_control_plane, + "/v2/outlet", + cluster_tls.get_cluster_cert(kong.configuration).cdata, + cluster_tls.get_cluster_cert_key(kong.configuration)) + end) + + else -- control_plane + kong.rpc.concentrator:start() + end + end end ok, err = wasm.init_worker() @@ -1934,6 +1959,15 @@ function Kong.stream_api() end +function Kong.serve_cluster_rpc_listener(options) + log_init_worker_errors() + + ngx.ctx.KONG_PHASE = PHASES.cluster_listener + + return kong.rpc:handle_websocket() +end + + do local events = require "kong.runloop.events" Kong.stream_config_listener = events.stream_reconfigure_listener diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index 86b22f5760cd..62d117290a9b 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -41,6 +41,7 @@ cluster_ocsp = off cluster_max_payload = 16777216 cluster_use_proxy = off cluster_dp_labels = NONE +cluster_rpc = on lmdb_environment_path = dbless.lmdb lmdb_map_size = 2048m diff --git a/kong/templates/nginx_kong.lua b/kong/templates/nginx_kong.lua index 9e4127c489bc..5053c26764ce 100644 --- a/kong/templates/nginx_kong.lua +++ b/kong/templates/nginx_kong.lua @@ -566,6 
+566,14 @@ server { Kong.serve_cluster_listener() } } + +> if cluster_rpc then + location = /v2/outlet { + content_by_lua_block { + Kong.serve_cluster_rpc_listener() + } + } +> end -- cluster_rpc is enabled } > end -- role == "control_plane" diff --git a/scripts/explain_manifest/fixtures/ubuntu-22.04-amd64.txt b/scripts/explain_manifest/fixtures/ubuntu-22.04-amd64.txt index 443c3426f7f8..cb1dca234d03 100644 --- a/scripts/explain_manifest/fixtures/ubuntu-22.04-amd64.txt +++ b/scripts/explain_manifest/fixtures/ubuntu-22.04-amd64.txt @@ -54,6 +54,13 @@ Needed : - libc.so.6 +- Path : /usr/local/kong/lib/libsnappy.so + Needed : + - libstdc++.so.6 + - libm.so.6 + - libgcc_s.so.1 + - libc.so.6 + - Path : /usr/local/kong/lib/libssl.so.3 Needed : - libstdc++.so.6 diff --git a/spec/01-unit/01-db/06-postgres_spec.lua b/spec/01-unit/01-db/06-postgres_spec.lua index c1a8ec127642..04382b144821 100644 --- a/spec/01-unit/01-db/06-postgres_spec.lua +++ b/spec/01-unit/01-db/06-postgres_spec.lua @@ -269,7 +269,7 @@ describe("kong.db [#postgres] connector", function() local ts = connector._get_topologically_sorted_table_names it("prepends cluster_events no matter what", function() - assert.same({"cluster_events"}, ts({})) + assert.same({"cluster_events", "clustering_rpc_requests"}, ts({})) end) it("sorts an array of unrelated schemas alphabetically by name", function() @@ -277,7 +277,7 @@ describe("kong.db [#postgres] connector", function() local b = schema_new({ name = "b", ttl = true, fields = {} }) local c = schema_new({ name = "c", ttl = true, fields = {} }) - assert.same({"cluster_events", "a", "b", "c"}, ts({ c, a, b })) + assert.same({"cluster_events", "clustering_rpc_requests", "a", "b", "c"}, ts({ c, a, b })) end) it("ignores non-ttl schemas", function() @@ -285,7 +285,7 @@ describe("kong.db [#postgres] connector", function() local b = schema_new({ name = "b", fields = {} }) local c = schema_new({ name = "c", ttl = true, fields = {} }) - assert.same({"cluster_events", "a", "c"}, ts({ c, a, b })) + assert.same({"cluster_events", "clustering_rpc_requests", "a", "c"}, ts({ c, a, b })) end) it("it puts destinations first", function() @@ -306,14 +306,14 @@ describe("kong.db [#postgres] connector", function() } }) - assert.same({"cluster_events", "a", "c", "b"}, ts({ a, b, c })) + assert.same({"cluster_events", "clustering_rpc_requests", "a", "c", "b"}, ts({ a, b, c })) end) it("puts core entities first, even when no relations", function() local a = schema_new({ name = "a", ttl = true, fields = {} }) local routes = schema_new({ name = "routes", ttl = true, fields = {} }) - assert.same({"cluster_events", "routes", "a"}, ts({ a, routes })) + assert.same({"cluster_events", "clustering_rpc_requests", "routes", "a"}, ts({ a, routes })) end) it("puts workspaces before core and others, when no relations", function() @@ -321,7 +321,7 @@ describe("kong.db [#postgres] connector", function() local workspaces = schema_new({ name = "workspaces", ttl = true, fields = {} }) local routes = schema_new({ name = "routes", ttl = true, fields = {} }) - assert.same({"cluster_events", "workspaces", "routes", "a"}, ts({ a, routes, workspaces })) + assert.same({"cluster_events", "clustering_rpc_requests", "workspaces", "routes", "a"}, ts({ a, routes, workspaces })) end) it("puts workspaces first, core entities second, and other entities afterwards, even with relations", function() @@ -343,7 +343,7 @@ describe("kong.db [#postgres] connector", function() } }) local workspaces = schema_new({ name = "workspaces", ttl = true, fields = 
-      assert.same({ "cluster_events", "workspaces", "services", "routes", "a", "b" },
+      assert.same({ "cluster_events", "clustering_rpc_requests", "workspaces", "services", "routes", "a", "b" },
                   ts({ services, b, a, workspaces, routes }))
     end)
 
@@ -358,7 +358,7 @@ describe("kong.db [#postgres] connector", function()
           { a = { type = "foreign", reference = "a" } } -- we somehow forced workspaces to depend on a
         }
       })
-      assert.same({ "cluster_events", "a", "workspaces", "services" }, ts({ services, a, workspaces }))
+      assert.same({ "cluster_events", "clustering_rpc_requests", "a", "workspaces", "services" }, ts({ services, a, workspaces }))
     end)
 
     it("returns an error if cycles are found", function()
diff --git a/spec/01-unit/04-prefix_handler_spec.lua b/spec/01-unit/04-prefix_handler_spec.lua
index eb0dfd76c7a9..9c3724a98d1f 100644
--- a/spec/01-unit/04-prefix_handler_spec.lua
+++ b/spec/01-unit/04-prefix_handler_spec.lua
@@ -308,6 +308,32 @@ describe("NGINX conf compiler", function()
       assert.not_matches("ssl_certificate_by_lua_block", kong_nginx_conf)
       assert.not_matches("ssl_dhparam", kong_nginx_conf)
     end)
+
+    it("renders RPC server", function()
+      local conf = assert(conf_loader(helpers.test_conf_path, {
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        cluster_listen = "127.0.0.1:9005",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+      local kong_nginx_conf = prefix_handler.compile_kong_conf(conf)
+      assert.matches("location = /v2/outlet {", kong_nginx_conf)
+    end)
+
+    it("does not render RPC server when inert", function()
+      local conf = assert(conf_loader(helpers.test_conf_path, {
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        cluster_listen = "127.0.0.1:9005",
+        cluster_rpc = "off",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+      local kong_nginx_conf = prefix_handler.compile_kong_conf(conf)
+      assert.not_matches("location = /v2/outlet {", kong_nginx_conf)
+    end)
+
     describe("handles client_ssl", function()
       it("on", function()
         local conf = assert(conf_loader(helpers.test_conf_path, {
diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua
index a27d02faf785..59eebe4b887d 100644
--- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua
+++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua
@@ -69,7 +69,6 @@ describe("CP/DP communication #" .. strategy, function()
         assert.near(14 * 86400, v.ttl, 3)
         assert.matches("^(%d+%.%d+)%.%d+", v.version)
         assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status)
-        assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status)
         return true
       end
     end
diff --git a/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua
new file mode 100644
index 000000000000..1f0ce4bbb919
--- /dev/null
+++ b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua
@@ -0,0 +1,62 @@
+local helpers = require "spec.helpers"
+local cjson = require("cjson.safe")
+
+for _, strategy in helpers.each_strategy() do
+  describe("Hybrid Mode RPC #" .. strategy, function()
+
+    lazy_setup(function()
+      helpers.get_db_utils(strategy, {
+        "clustering_data_planes",
+      }) -- runs migrations
+
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        database = strategy,
+        cluster_listen = "127.0.0.1:9005",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+
+      assert(helpers.start_kong({
+        role = "data_plane",
+        database = "off",
+        prefix = "servroot2",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        cluster_control_plane = "127.0.0.1:9005",
+        proxy_listen = "0.0.0.0:9002",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+    end)
+
+    lazy_teardown(function()
+      helpers.stop_kong("servroot2")
+      helpers.stop_kong()
+    end)
+
+    describe("status API", function()
+      it("shows DP RPC capability status", function()
+        helpers.wait_until(function()
+          local admin_client = helpers.admin_client()
+          finally(function()
+            admin_client:close()
+          end)
+
+          local res = assert(admin_client:get("/clustering/data-planes"))
+          local body = assert.res_status(200, res)
+          local json = cjson.decode(body)
+
+          for _, v in pairs(json.data) do
+            if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then
+              table.sort(v.rpc_capabilities)
+              assert.near(14 * 86400, v.ttl, 3)
+              assert.same({ "kong.debug.log_level.v1", }, v.rpc_capabilities)
+              return true
+            end
+          end
+        end, 10)
+      end)
+    end)
+  end)
+end
diff --git a/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua b/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua
new file mode 100644
index 000000000000..fcebad0695fc
--- /dev/null
+++ b/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua
@@ -0,0 +1,181 @@
+local helpers = require "spec.helpers"
+local cjson = require("cjson.safe")
+
+
+local function obtain_dp_node_id()
+  local dp_node_id
+
+  helpers.wait_until(function()
+    local admin_client = helpers.admin_client()
+    finally(function()
+      admin_client:close()
+    end)
+
+    local res = assert(admin_client:get("/clustering/data-planes"))
+    local body = assert.res_status(200, res)
+    local json = cjson.decode(body)
+
+    for _, v in pairs(json.data) do
+      if v.ip == "127.0.0.1" and ngx.time() - v.last_seen < 3 then
+        dp_node_id = v.id
+        return true
+      end
+    end
+  end, 10)
+
+  return dp_node_id
+end
+
+
+for _, strategy in helpers.each_strategy() do
+  describe("Hybrid Mode RPC #" .. strategy, function()
+
+    lazy_setup(function()
+      helpers.get_db_utils(strategy, {
+        "clustering_data_planes",
+      }) -- runs migrations
+
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        database = strategy,
+        cluster_listen = "127.0.0.1:9005",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+
+      assert(helpers.start_kong({
+        role = "data_plane",
+        database = "off",
+        prefix = "servroot2",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        cluster_control_plane = "127.0.0.1:9005",
+        proxy_listen = "0.0.0.0:9002",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+    end)
+
+    lazy_teardown(function()
+      helpers.stop_kong("servroot2")
+      helpers.stop_kong()
+    end)
+
+    describe("Dynamic log level over RPC", function()
+      it("can get the current log level", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        local admin_client = helpers.admin_client()
+        finally(function()
+          admin_client:close()
+        end)
+
+        local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level"))
+        local body = assert.res_status(200, res)
+        local json = cjson.decode(body)
+        assert.equal(0, json.timeout)
+        assert.equal("debug", json.current_level)
+        assert.equal("debug", json.original_level)
+      end)
+
+      it("can set the current log level", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        local admin_client = helpers.admin_client()
+        finally(function()
+          admin_client:close()
+        end)
+
+        local res = assert(admin_client:put("/clustering/data-planes/" .. dp_node_id .. "/log-level",
+          {
+            headers = {
+              ["Content-Type"] = "application/json",
+            },
+            body = {
+              current_level = "info",
+              timeout = 10,
+            },
+          }))
+        assert.res_status(201, res)
+
+        local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level"))
+        local body = assert.res_status(200, res)
+        local json = cjson.decode(body)
+        assert.near(10, json.timeout, 3)
+        assert.equal("info", json.current_level)
+        assert.equal("debug", json.original_level)
+      end)
+
+      it("setting current log level to original_level turns off feature", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        local admin_client = helpers.admin_client()
+        finally(function()
+          admin_client:close()
+        end)
+
+        local res = assert(admin_client:put("/clustering/data-planes/" .. dp_node_id .. "/log-level",
+          {
+            headers = {
+              ["Content-Type"] = "application/json",
+            },
+            body = {
+              current_level = "info",
+              timeout = 10,
+            },
+          }))
+        assert.res_status(201, res)
+
+        local res = assert(admin_client:put("/clustering/data-planes/" .. dp_node_id .. "/log-level",
+          {
+            headers = {
+              ["Content-Type"] = "application/json",
+            },
+            body = {
+              current_level = "debug",
+              timeout = 10,
+            },
+          }))
+        assert.res_status(201, res)
+
+        local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level"))
+        local body = assert.res_status(200, res)
+        local json = cjson.decode(body)
+        assert.equal(0, json.timeout)
+        assert.equal("debug", json.current_level)
+        assert.equal("debug", json.original_level)
+      end)
+
+      it("DELETE turns off feature", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        local admin_client = helpers.admin_client()
+        finally(function()
+          admin_client:close()
+        end)
+
+        local res = assert(admin_client:put("/clustering/data-planes/" .. dp_node_id .. "/log-level",
"/log-level", + { + headers = { + ["Content-Type"] = "application/json", + }, + body = { + current_level = "info", + timeout = 10, + }, + })) + assert.res_status(201, res) + + local res = assert(admin_client:delete("/clustering/data-planes/" .. dp_node_id .. "/log-level")) + assert.res_status(204, res) + + local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) + assert.equal(0, json.timeout) + assert.equal("debug", json.current_level) + assert.equal("debug", json.original_level) + end) + end) + end) +end diff --git a/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua b/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua new file mode 100644 index 000000000000..4a6d73cf659c --- /dev/null +++ b/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua @@ -0,0 +1,101 @@ +local helpers = require "spec.helpers" +local cjson = require("cjson.safe") + + +local function obtain_dp_node_id() + local dp_node_id + + helpers.wait_until(function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:get("/clustering/data-planes")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) + + for _, v in pairs(json.data) do + if v.ip == "127.0.0.1" and ngx.time() - v.last_seen < 3 then + dp_node_id = v.id + return true + end + end + end, 10) + + return dp_node_id +end + + +for _, strategy in helpers.each_strategy() do + describe("Hybrid Mode RPC inert #" .. strategy, function() + + lazy_setup(function() + helpers.get_db_utils(strategy, { + "clustering_data_planes", + }) -- runs migrations + + assert(helpers.start_kong({ + role = "control_plane", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + database = strategy, + cluster_listen = "127.0.0.1:9005", + cluster_rpc = "off", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + + assert(helpers.start_kong({ + role = "data_plane", + database = "off", + prefix = "servroot2", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + cluster_rpc = "off", + proxy_listen = "0.0.0.0:9002", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + end) + + lazy_teardown(function() + helpers.stop_kong("servroot2") + helpers.stop_kong() + end) + + describe("RPC inert", function() + it("rpc_capability list should be empty", function() + helpers.wait_until(function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:get("/clustering/data-planes")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) + + for _, v in pairs(json.data) do + if v.ip == "127.0.0.1" then + assert.near(14 * 86400, v.ttl, 3) + assert.equal(0, #v.rpc_capabilities) + return true + end + end + end, 10) + end) + + it("can not get the current log level", function() + local dp_node_id = obtain_dp_node_id() + + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. 
"/log-level")) + assert.res_status(404, res) + end) + end) + end) +end diff --git a/spec/05-migration/db/migrations/core/023_360_to_370_spec.lua b/spec/05-migration/db/migrations/core/023_360_to_370_spec.lua new file mode 100644 index 000000000000..d9c52c42ec49 --- /dev/null +++ b/spec/05-migration/db/migrations/core/023_360_to_370_spec.lua @@ -0,0 +1,12 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function() + uh.old_after_up("has created the \"clustering_rpc_requests\" table", function() + assert.database_has_relation("clustering_rpc_requests") + assert.table_has_column("clustering_rpc_requests", "id", "bigint") + assert.table_has_column("clustering_rpc_requests", "node_id", "uuid") + assert.table_has_column("clustering_rpc_requests", "reply_to", "uuid") + assert.table_has_column("clustering_rpc_requests", "ttl", "timestamp with time zone") + assert.table_has_column("clustering_rpc_requests", "payload", "json") + end) +end)