Skip to content

Commit

Permalink
feat(clustering): CP/DP RPC framework & dynamic log level RPC (#12320)
Browse files Browse the repository at this point in the history
This PR implements an extensible mechanism where CP and DP nodes can bidirectionally communicate
with each other and perform RPC calls. Which can be used for building complex interactions
such as config sync, debugging, etc.

This PR also implements a simple RPC service for CP to inspect and change the dynamic log level
of the connected DP nodes. The functionality is exposed via a new Admin API endpoint
`/clustering/data-planes/<node-id>/log-level`.

KAG-623
KAG-3751
  • Loading branch information
dndx authored and ADD-SP committed Apr 28, 2024
1 parent a3d4331 commit 59ba081
Show file tree
Hide file tree
Showing 36 changed files with 2,079 additions and 9 deletions.
3 changes: 2 additions & 1 deletion .requirements
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ LIBEXPAT=2.6.2
LUA_KONG_NGINX_MODULE=a8411f7cf4289049f0bd3e8e40088e7256389ed3 # 0.11.0
LUA_RESTY_LMDB=7d2581cbe30cde18a8482d820c227ca0845c0ded # 1.4.2
LUA_RESTY_EVENTS=8448a92cec36ac04ea522e78f6496ba03c9b1fd8 # 0.2.0
LUA_RESTY_WEBSOCKET=60eafc3d7153bceb16e6327074e0afc3d94b1316 # 0.4.0
LUA_RESTY_WEBSOCKET=966c69c39f03029b9b42ec0f8e55aaed7d6eebc0 # 0.4.0.1
ATC_ROUTER=ffd11db657115769bf94f0c4f915f98300bc26b6 # 1.6.2
SNAPPY=23b3286820105438c5dbb9bc22f1bb85c5812c8a # 1.2.0

# KONG_MANAGER is the admin GUI for the CE gateway
# Please leave this untouched and use KONG_ADMIN in EE version instead
Expand Down
4 changes: 4 additions & 0 deletions build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ kong_directory_genrule(
"//build/ee:bytecode_compile",
# EE new files
"//build/ee:luarocks_license",
"@snappy//:snappy",
] + select({
"@kong//:licensing_flag": [
"@kong-licensing",
Expand Down Expand Up @@ -235,6 +236,9 @@ kong_directory_genrule(
tar -cC ${LUAJIT}/share . | tar -xC ${BUILD_DESTDIR}/openresty/luajit/share
chmod -R "+rw" ${BUILD_DESTDIR}/openresty/luajit
SNAPPY=${WORKSPACE_PATH}/$(dirname $(echo '$(locations @snappy//:snappy)' | awk '{print $1}'))
cp ${SNAPPY}/libsnappy.so ${BUILD_DESTDIR}/kong/lib
LUAROCKS=${WORKSPACE_PATH}/$(dirname '$(location @luarocks//:luarocks_make)')/luarocks_tree
cp -r ${LUAROCKS}/. ${BUILD_DESTDIR}/.
rm ${BUILD_DESTDIR}/bin/lapis ${BUILD_DESTDIR}/bin/luarocks-admin
Expand Down
2 changes: 2 additions & 0 deletions build/openresty/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ load("//build/openresty/msgpack_c:msgpack_c_repositories.bzl", "msgpack_c_reposi
load("//build/openresty/wasmx:wasmx_repositories.bzl", "wasmx_repositories")
load("//build/openresty/wasmx/filters:repositories.bzl", "wasm_filters_repositories")
load("//build/openresty/brotli:brotli_repositories.bzl", "brotli_repositories")
load("//build/openresty/snappy:snappy_repositories.bzl", "snappy_repositories")

# This is a dummy file to export the module's repository.
_NGINX_MODULE_DUMMY_FILE = """
Expand All @@ -31,6 +32,7 @@ def openresty_repositories():
wasmx_repositories()
wasm_filters_repositories()
brotli_repositories()
snappy_repositories()

openresty_version = KONG_VAR["OPENRESTY"]

Expand Down
206 changes: 206 additions & 0 deletions build/openresty/snappy/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# Copyright 2023 Google Inc. All Rights Reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

package(default_visibility = ["//visibility:public"])

licenses(["notice"])

SNAPPY_VERSION = (1, 1, 10)

config_setting(
name = "windows",
constraint_values = ["@platforms//os:windows"],
)

cc_library(
name = "config",
hdrs = ["config.h"],
defines = ["HAVE_CONFIG_H"],
)

cc_library(
name = "snappy-stubs-public",
hdrs = [":snappy-stubs-public.h"],
)

cc_library(
name = "snappy-stubs-internal",
srcs = ["snappy-stubs-internal.cc"],
hdrs = ["snappy-stubs-internal.h"],
deps = [
":config",
":snappy-stubs-public",
],
)

cc_library(
name = "snappy",
srcs = [
"snappy.cc",
"snappy-c.cc",
"snappy-internal.h",
"snappy-sinksource.cc",
],
hdrs = [
"snappy.h",
"snappy-c.h",
"snappy-sinksource.h",
],
copts = select({
":windows": [],
"//conditions:default": [
"-Wno-sign-compare",
],
}),
deps = [
":config",
":snappy-stubs-internal",
":snappy-stubs-public",
],
)

filegroup(
name = "testdata",
srcs = glob(["testdata/*"]),
)

cc_library(
name = "snappy-test",
testonly = True,
srcs = [
"snappy-test.cc",
"snappy_test_data.cc",
],
hdrs = [
"snappy-test.h",
"snappy_test_data.h",
],
deps = [":snappy-stubs-internal"],
)

cc_test(
name = "snappy_benchmark",
srcs = ["snappy_benchmark.cc"],
data = [":testdata"],
deps = [
":snappy",
":snappy-test",
"//third_party/benchmark:benchmark_main",
],
)

cc_test(
name = "snappy_unittest",
srcs = [
"snappy_unittest.cc",
],
data = [":testdata"],
deps = [
":snappy",
":snappy-test",
"//third_party/googletest:gtest_main",
],
)

# Generate a config.h similar to what cmake would produce.
genrule(
name = "config_h",
outs = ["config.h"],
cmd = """cat <<EOF >$@
#define HAVE_STDDEF_H 1
#define HAVE_STDINT_H 1
#ifdef __has_builtin
# if !defined(HAVE_BUILTIN_EXPECT) && __has_builtin(__builtin_expect)
# define HAVE_BUILTIN_EXPECT 1
# endif
# if !defined(HAVE_BUILTIN_CTZ) && __has_builtin(__builtin_ctzll)
# define HAVE_BUILTIN_CTZ 1
# endif
# if !defined(HAVE_BUILTIN_PREFETCH) && __has_builtin(__builtin_prefetech)
# define HAVE_BUILTIN_PREFETCH 1
# endif
#elif defined(__GNUC__) && (__GNUC__ > 3 || __GNUC__ == 3 && __GNUC_MINOR__ >= 4)
# ifndef HAVE_BUILTIN_EXPECT
# define HAVE_BUILTIN_EXPECT 1
# endif
# ifndef HAVE_BUILTIN_CTZ
# define HAVE_BUILTIN_CTZ 1
# endif
# ifndef HAVE_BUILTIN_PREFETCH
# define HAVE_BUILTIN_PREFETCH 1
# endif
#endif
#if defined(_WIN32) && !defined(HAVE_WINDOWS_H)
#define HAVE_WINDOWS_H 1
#endif
#ifdef __has_include
# if !defined(HAVE_BYTESWAP_H) && __has_include(<byteswap.h>)
# define HAVE_BYTESWAP_H 1
# endif
# if !defined(HAVE_UNISTD_H) && __has_include(<unistd.h>)
# define HAVE_UNISTD_H 1
# endif
# if !defined(HAVE_SYS_ENDIAN_H) && __has_include(<sys/endian.h>)
# define HAVE_SYS_ENDIAN_H 1
# endif
# if !defined(HAVE_SYS_MMAN_H) && __has_include(<sys/mman.h>)
# define HAVE_SYS_MMAN_H 1
# endif
# if !defined(HAVE_SYS_UIO_H) && __has_include(<sys/uio.h>)
# define HAVE_SYS_UIO_H 1
# endif
# if !defined(HAVE_SYS_TIME_H) && __has_include(<sys/time.h>)
# define HAVE_SYS_TIME_H 1
# endif
#endif
#ifndef SNAPPY_IS_BIG_ENDIAN
# ifdef __s390x__
# define SNAPPY_IS_BIG_ENDIAN 1
# elif defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
# define SNAPPY_IS_BIG_ENDIAN 1
# endif
#endif
EOF
""",
)

genrule(
name = "snappy_stubs_public_h",
srcs = ["snappy-stubs-public.h.in"],
outs = ["snappy-stubs-public.h"],
# Assume sys/uio.h is available on non-Windows.
# Set the version numbers.
cmd = ("""sed -e 's/$${HAVE_SYS_UIO_H_01}/!_WIN32/g' \
-e 's/$${PROJECT_VERSION_MAJOR}/%d/g' \
-e 's/$${PROJECT_VERSION_MINOR}/%d/g' \
-e 's/$${PROJECT_VERSION_PATCH}/%d/g' \
$< >$@""" % SNAPPY_VERSION),
)
15 changes: 15 additions & 0 deletions build/openresty/snappy/snappy_repositories.bzl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""A module defining the dependency snappy"""

load("@bazel_tools//tools/build_defs/repo:git.bzl", "new_git_repository")
load("@bazel_tools//tools/build_defs/repo:utils.bzl", "maybe")
load("@kong_bindings//:variables.bzl", "KONG_VAR")

def snappy_repositories():
maybe(
new_git_repository,
name = "snappy",
branch = KONG_VAR["SNAPPY"],
remote = "https://github.com/google/snappy",
visibility = ["//visibility:public"], # let this to be referenced by openresty build
build_file = "//build/openresty/snappy:BUILD.bazel",
)
3 changes: 3 additions & 0 deletions changelog/unreleased/kong/cp-dp-rpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: "Remote procedure call (RPC) framework for Hybrid mode deployments."
type: feature
scope: Clustering
6 changes: 6 additions & 0 deletions changelog/unreleased/kong/dynamic-log-level-rpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
message: |
Dynamic log level over Hybrid mode RPC which allows setting DP log level
to a different level for specified duration before reverting back
to the `kong.conf` configured value.
type: feature
scope: Clustering
12 changes: 12 additions & 0 deletions kong-3.7.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ dependencies = {
"lpeg == 1.1.0",
"lua-resty-ljsonschema == 1.1.6-2",
"lua-resty-jq == 0.1.0",
"lua-resty-snappy == 1.0-1",
}
build = {
type = "builtin",
Expand Down Expand Up @@ -106,6 +107,16 @@ build = {
["kong.clustering.config_sync_backup.election"] = "kong/clustering/config_sync_backup/election.lua",
["kong.clustering.config_sync_backup.strategies.s3"] = "kong/clustering/config_sync_backup/strategies/s3.lua",
["kong.clustering.config_sync_backup.strategies.gcs"] = "kong/clustering/config_sync_backup/strategies/gcs.lua",
["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua",

["kong.clustering.rpc.callbacks"] = "kong/clustering/rpc/callbacks.lua",
["kong.clustering.rpc.future"] = "kong/clustering/rpc/future.lua",
["kong.clustering.rpc.json_rpc_v2"] = "kong/clustering/rpc/json_rpc_v2.lua",
["kong.clustering.rpc.manager"] = "kong/clustering/rpc/manager.lua",
["kong.clustering.rpc.queue"] = "kong/clustering/rpc/queue.lua",
["kong.clustering.rpc.socket"] = "kong/clustering/rpc/socket.lua",
["kong.clustering.rpc.utils"] = "kong/clustering/rpc/utils.lua",
["kong.clustering.rpc.concentrator"] = "kong/clustering/rpc/concentrator.lua",

["kong.cluster_events"] = "kong/cluster_events/init.lua",
["kong.cluster_events.strategies.postgres"] = "kong/cluster_events/strategies/postgres.lua",
Expand Down Expand Up @@ -552,6 +563,7 @@ build = {
["kong.db.migrations.core.020_330_to_340"] = "kong/db/migrations/core/020_330_to_340.lua",
["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua",
["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.230_to_260"] = "kong/db/migrations/operations/230_to_260.lua",
Expand Down
46 changes: 46 additions & 0 deletions kong/api/routes/debug.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ local kong = kong
local pcall = pcall
local type = type
local tostring = tostring
local tonumber = tonumber
local string_format = string.format
local math_max = math.max
local math_random = math.random
Expand Down Expand Up @@ -443,4 +444,49 @@ routes[cluster_name] = {
end
}


if kong.rpc then
routes["/clustering/data-planes/:node_id/log-level"] = {
GET = function(self)
local res, err =
kong.rpc:call(self.params.node_id, "kong.debug.log_level.v1.get_log_level")
if not res then
return kong.response.exit(500, { message = err, })
end

return kong.response.exit(200, res)
end,
PUT = function(self)
local new_level = self.params.current_level
local timeout = self.params.timeout and
math.ceil(tonumber(self.params.timeout)) or nil

if not new_level then
return kong.response.exit(400, { message = "Required parameter \"current_level\" is missing.", })
end

local res, err = kong.rpc:call(self.params.node_id,
"kong.debug.log_level.v1.set_log_level",
new_level,
timeout)
if not res then
return kong.response.exit(500, { message = err, })
end

return kong.response.exit(201)
end,
DELETE = function(self)
local res, err = kong.rpc:call(self.params.node_id,
"kong.debug.log_level.v1.set_log_level",
"warn",
0)
if not res then
return kong.response.exit(500, { message = err, })
end

return kong.response.exit(204)
end,
}
end

return routes
8 changes: 8 additions & 0 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ function _M:handle_cp_websocket(cert)
local sync_status = CLUSTERING_SYNC_STATUS.UNKNOWN
local purge_delay = self.conf.cluster_data_plane_purge_delay
local update_sync_status = function()
local rpc_peers

if self.conf.cluster_rpc then
rpc_peers = kong.rpc:get_peers()
end

local ok
ok, err = kong.db.clustering_data_planes:upsert({ id = dp_id }, {
last_seen = last_seen,
Expand All @@ -273,6 +279,8 @@ function _M:handle_cp_websocket(cert)
sync_status = sync_status, -- TODO: import may have been failed though
labels = data.labels,
cert_details = dp_cert_details,
-- only update rpc_capabilities if dp_id is connected
rpc_capabilities = rpc_peers and rpc_peers[dp_id] or {},
}, { ttl = purge_delay })
if not ok then
ngx_log(ngx_ERR, _log_prefix, "unable to update clustering data plane status: ", err, log_suffix)
Expand Down
Loading

0 comments on commit 59ba081

Please sign in to comment.