From 5b3e1dfb605e5a8ab039631ca31a77555a945883 Mon Sep 17 00:00:00 2001 From: Guilherme Salazar Date: Wed, 7 Feb 2024 14:15:16 -0300 Subject: [PATCH] refactor(pluginservers): general refactor Context ------- The overall goal of this commit is to refactor the external plugins implementation, with the following goals in mind: - Make plugin server code more approachable to unfamiliar engineers and easier to evolve with confidence - Harden configuration; ensure configuration defects are caught before Kong is started - Extend testing coverage This is related to ongoing work on the Go PDK, with similar goals in mind. Summary ------- This commit implements the following overall changes to plugin server code: - Move configuration related code into conf loader, so that configuration loading and validation happens at startup time, rather than lazily, when plugin data is loaded or pluginservers are started. Add tests for current behavior. - Move process-management code - for starting up plugin servers as well as querying external plugins info - into the `process.lua` module. - Introduce a `kong.runloop.plugin_servers.rpc` module that encapsulates RPC initialization and protocol-specific implementations. This further simplifies the main plugin server main module. - Factor exposed API and phase handlers bridging code into a new `plugin` module, which encapsulates an external plugin representation, including the expected fields for any Kong plugin, plus external plugin-specific bits, such as the RPC instance. Part of this external plugin-specific part is the instance life cycle management. With this structure, the `kong.runloop.plugin_servers` main module contains only general external plugin code, including a list of loaded external plugins, and associated start/stop functions for plugin servers. Testing ------- This commit also implements the following improvements to tests: - Restructure fixtures to accommodate new external plugin servers -- namely, targeting for now in the existing Python and Javascript - Add new test cases for external plugins: * External plugin configuration: add test cases for current behavior; in particular: - Fail if no `query_cmd` is provided; - Warn if no `start_cmd` is provided - this is by design, as external plugins servers can be managed outside of Kong * Plugin server start / stop - for both Go and Python plugins * External plugin info querying for both Go and Python plugins * External plugin execution - for both Go and Python plugins Internal flow ------------- `.plugin_servers.init:` loads all external plugins, by calling .plugin_servers.process and `.plugin_servers.plugin` `.plugin_servers.process`: queries external plugins info with the command specified in `_query_cmd` proeprties `.plugin_servers.plugin`: with info obtained as described above, `.plugin:new` returns a kong-compatible representation of an external plugin, with phase handlers, PRIORITY, and wrappers to the PDK. Calls `.plugin_servers.rpc` to create an RPC through which Kong communicates with the plugin process `.plugin_servers.rpc`: based on info contained in the plugin (protocol field), creates the correct RPC for the given external plugin `.plugin_servers.rpc.pb_rpc`: protobuf rpc implementation - used by Golang `.plugin_servers.rpc.mp.rpc`: messagepack rpc implementation - used by JS and Python `.plugin_servers.init`: calls `.plugin_servers.process` to start external plugin servers `.plugin_servers.process`: optionally starts all external plugin servers (if a `_start_cmd` is found) uses the resty pipe API to manage the external plugin process --- .github/workflows/build_and_test.yml | 1 + .gitignore | 1 + MODULE.bazel.lock | 64 +++ kong-3.9.0-0.rockspec | 7 +- kong.conf.default | 9 +- kong/conf_loader/init.lua | 32 ++ kong/init.lua | 4 +- kong/runloop/plugin_servers/init.lua | 438 +++--------------- kong/runloop/plugin_servers/plugin.lua | 346 ++++++++++++++ kong/runloop/plugin_servers/process.lua | 208 ++++----- kong/runloop/plugin_servers/rpc/init.lua | 22 + .../plugin_servers/{ => rpc}/mp_rpc.lua | 69 +-- .../plugin_servers/{ => rpc}/pb_rpc.lua | 74 ++- kong/runloop/plugin_servers/rpc/util.lua | 19 + spec/01-unit/03-conf_loader_spec.lua | 66 +++ .../01-process-management_spec.lua | 160 +++++++ .../10-external-plugins/02-execution_spec.lua | 76 +++ .../99-reports_spec.lua} | 4 +- .../{ => external_plugins}/go/go-hello.go | 0 .../fixtures/{ => external_plugins}/go/go.mod | 0 .../fixtures/{ => external_plugins}/go/go.sum | 0 spec/fixtures/external_plugins/js/js-hello.js | 33 ++ spec/fixtures/external_plugins/py/py-hello.py | 37 ++ .../external_plugins/py/requirements.txt | 1 + spec/helpers.lua | 4 +- spec/internal/cmd.lua | 6 +- spec/internal/constants.lua | 2 +- 27 files changed, 1091 insertions(+), 592 deletions(-) create mode 100644 MODULE.bazel.lock create mode 100644 kong/runloop/plugin_servers/plugin.lua create mode 100644 kong/runloop/plugin_servers/rpc/init.lua rename kong/runloop/plugin_servers/{ => rpc}/mp_rpc.lua (84%) rename kong/runloop/plugin_servers/{ => rpc}/pb_rpc.lua (88%) create mode 100644 kong/runloop/plugin_servers/rpc/util.lua create mode 100644 spec/02-integration/10-external-plugins/01-process-management_spec.lua create mode 100644 spec/02-integration/10-external-plugins/02-execution_spec.lua rename spec/02-integration/{10-go_plugins/01-reports_spec.lua => 10-external-plugins/99-reports_spec.lua} (95%) rename spec/fixtures/{ => external_plugins}/go/go-hello.go (100%) rename spec/fixtures/{ => external_plugins}/go/go.mod (100%) rename spec/fixtures/{ => external_plugins}/go/go.sum (100%) create mode 100644 spec/fixtures/external_plugins/js/js-hello.js create mode 100755 spec/fixtures/external_plugins/py/py-hello.py create mode 100644 spec/fixtures/external_plugins/py/requirements.txt diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 0e1c858b5f74c..c8a87a9264e36 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -319,6 +319,7 @@ jobs: - name: Build & install dependencies run: | make dev + pip install kong-pdk - name: Download test rerun information uses: actions/download-artifact@v3 diff --git a/.gitignore b/.gitignore index 5651c0f40c44d..460b863886429 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ bin/h2client *.wasm spec/fixtures/proxy_wasm_filters/build spec/fixtures/proxy_wasm_filters/target +spec/fixtures/external_plugins/go/go-hello diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock new file mode 100644 index 0000000000000..b9b80d4d0360b --- /dev/null +++ b/MODULE.bazel.lock @@ -0,0 +1,64 @@ +{ + "lockFileVersion": 11, + "registryFileHashes": { + "https://bcr.bazel.build/bazel_registry.json": "8a28e4aff06ee60aed2a8c281907fb8bcbf3b753c91fb5a5c57da3215d5b3497", + "https://bcr.bazel.build/modules/abseil-cpp/20210324.2/MODULE.bazel": "7cd0312e064fde87c8d1cd79ba06c876bd23630c83466e9500321be55c96ace2", + "https://bcr.bazel.build/modules/abseil-cpp/20211102.0/MODULE.bazel": "70390338f7a5106231d20620712f7cccb659cd0e9d073d1991c038eb9fc57589", + "https://bcr.bazel.build/modules/abseil-cpp/20211102.0/source.json": "7e3a9adf473e9af076ae485ed649d5641ad50ec5c11718103f34de03170d94ad", + "https://bcr.bazel.build/modules/apple_support/1.5.0/MODULE.bazel": "50341a62efbc483e8a2a6aec30994a58749bd7b885e18dd96aa8c33031e558ef", + "https://bcr.bazel.build/modules/apple_support/1.5.0/source.json": "eb98a7627c0bc486b57f598ad8da50f6625d974c8f723e9ea71bd39f709c9862", + "https://bcr.bazel.build/modules/bazel_features/1.11.0/MODULE.bazel": "f9382337dd5a474c3b7d334c2f83e50b6eaedc284253334cf823044a26de03e8", + "https://bcr.bazel.build/modules/bazel_features/1.11.0/source.json": "c9320aa53cd1c441d24bd6b716da087ad7e4ff0d9742a9884587596edfe53015", + "https://bcr.bazel.build/modules/bazel_skylib/1.0.3/MODULE.bazel": "bcb0fd896384802d1ad283b4e4eb4d718eebd8cb820b0a2c3a347fb971afd9d8", + "https://bcr.bazel.build/modules/bazel_skylib/1.2.1/MODULE.bazel": "f35baf9da0efe45fa3da1696ae906eea3d615ad41e2e3def4aeb4e8bc0ef9a7a", + "https://bcr.bazel.build/modules/bazel_skylib/1.3.0/MODULE.bazel": "20228b92868bf5cfc41bda7afc8a8ba2a543201851de39d990ec957b513579c5", + "https://bcr.bazel.build/modules/bazel_skylib/1.6.1/MODULE.bazel": "8fdee2dbaace6c252131c00e1de4b165dc65af02ea278476187765e1a617b917", + "https://bcr.bazel.build/modules/bazel_skylib/1.6.1/source.json": "082ed5f9837901fada8c68c2f3ddc958bb22b6d654f71dd73f3df30d45d4b749", + "https://bcr.bazel.build/modules/buildozer/7.1.2/MODULE.bazel": "2e8dd40ede9c454042645fd8d8d0cd1527966aa5c919de86661e62953cd73d84", + "https://bcr.bazel.build/modules/buildozer/7.1.2/source.json": "c9028a501d2db85793a6996205c8de120944f50a0d570438fcae0457a5f9d1f8", + "https://bcr.bazel.build/modules/googletest/1.11.0/MODULE.bazel": "3a83f095183f66345ca86aa13c58b59f9f94a2f81999c093d4eeaa2d262d12f4", + "https://bcr.bazel.build/modules/googletest/1.11.0/source.json": "c73d9ef4268c91bd0c1cd88f1f9dfa08e814b1dbe89b5f594a9f08ba0244d206", + "https://bcr.bazel.build/modules/platforms/0.0.4/MODULE.bazel": "9b328e31ee156f53f3c416a64f8491f7eb731742655a47c9eec4703a71644aee", + "https://bcr.bazel.build/modules/platforms/0.0.5/MODULE.bazel": "5733b54ea419d5eaf7997054bb55f6a1d0b5ff8aedf0176fef9eea44f3acda37", + "https://bcr.bazel.build/modules/platforms/0.0.6/MODULE.bazel": "ad6eeef431dc52aefd2d77ed20a4b353f8ebf0f4ecdd26a807d2da5aa8cd0615", + "https://bcr.bazel.build/modules/platforms/0.0.7/MODULE.bazel": "72fd4a0ede9ee5c021f6a8dd92b503e089f46c227ba2813ff183b71616034814", + "https://bcr.bazel.build/modules/platforms/0.0.9/MODULE.bazel": "4a87a60c927b56ddd67db50c89acaa62f4ce2a1d2149ccb63ffd871d5ce29ebc", + "https://bcr.bazel.build/modules/platforms/0.0.9/source.json": "cd74d854bf16a9e002fb2ca7b1a421f4403cda29f824a765acd3a8c56f8d43e6", + "https://bcr.bazel.build/modules/protobuf/21.7/MODULE.bazel": "a5a29bb89544f9b97edce05642fac225a808b5b7be74038ea3640fae2f8e66a7", + "https://bcr.bazel.build/modules/protobuf/21.7/source.json": "bbe500720421e582ff2d18b0802464205138c06056f443184de39fbb8187b09b", + "https://bcr.bazel.build/modules/protobuf/3.19.0/MODULE.bazel": "6b5fbb433f760a99a22b18b6850ed5784ef0e9928a72668b66e4d7ccd47db9b0", + "https://bcr.bazel.build/modules/protobuf/3.19.6/MODULE.bazel": "9233edc5e1f2ee276a60de3eaa47ac4132302ef9643238f23128fea53ea12858", + "https://bcr.bazel.build/modules/rules_cc/0.0.1/MODULE.bazel": "cb2aa0747f84c6c3a78dad4e2049c154f08ab9d166b1273835a8174940365647", + "https://bcr.bazel.build/modules/rules_cc/0.0.2/MODULE.bazel": "6915987c90970493ab97393024c156ea8fb9f3bea953b2f3ec05c34f19b5695c", + "https://bcr.bazel.build/modules/rules_cc/0.0.8/MODULE.bazel": "964c85c82cfeb6f3855e6a07054fdb159aced38e99a5eecf7bce9d53990afa3e", + "https://bcr.bazel.build/modules/rules_cc/0.0.9/MODULE.bazel": "836e76439f354b89afe6a911a7adf59a6b2518fafb174483ad78a2a2fde7b1c5", + "https://bcr.bazel.build/modules/rules_cc/0.0.9/source.json": "1f1ba6fea244b616de4a554a0f4983c91a9301640c8fe0dd1d410254115c8430", + "https://bcr.bazel.build/modules/rules_java/4.0.0/MODULE.bazel": "5a78a7ae82cd1a33cef56dc578c7d2a46ed0dca12643ee45edbb8417899e6f74", + "https://bcr.bazel.build/modules/rules_java/7.6.5/MODULE.bazel": "481164be5e02e4cab6e77a36927683263be56b7e36fef918b458d7a8a1ebadb1", + "https://bcr.bazel.build/modules/rules_java/7.6.5/source.json": "a805b889531d1690e3c72a7a7e47a870d00323186a9904b36af83aa3d053ee8d", + "https://bcr.bazel.build/modules/rules_jvm_external/4.4.2/MODULE.bazel": "a56b85e418c83eb1839819f0b515c431010160383306d13ec21959ac412d2fe7", + "https://bcr.bazel.build/modules/rules_jvm_external/4.4.2/source.json": "a075731e1b46bc8425098512d038d416e966ab19684a10a34f4741295642fc35", + "https://bcr.bazel.build/modules/rules_license/0.0.3/MODULE.bazel": "627e9ab0247f7d1e05736b59dbb1b6871373de5ad31c3011880b4133cafd4bd0", + "https://bcr.bazel.build/modules/rules_license/0.0.7/MODULE.bazel": "088fbeb0b6a419005b89cf93fe62d9517c0a2b8bb56af3244af65ecfe37e7d5d", + "https://bcr.bazel.build/modules/rules_license/0.0.7/source.json": "355cc5737a0f294e560d52b1b7a6492d4fff2caf0bef1a315df5a298fca2d34a", + "https://bcr.bazel.build/modules/rules_pkg/0.7.0/MODULE.bazel": "df99f03fc7934a4737122518bb87e667e62d780b610910f0447665a7e2be62dc", + "https://bcr.bazel.build/modules/rules_pkg/0.7.0/source.json": "c2557066e0c0342223ba592510ad3d812d4963b9024831f7f66fd0584dd8c66c", + "https://bcr.bazel.build/modules/rules_proto/4.0.0/MODULE.bazel": "a7a7b6ce9bee418c1a760b3d84f83a299ad6952f9903c67f19e4edd964894e06", + "https://bcr.bazel.build/modules/rules_proto/5.3.0-21.7/MODULE.bazel": "e8dff86b0971688790ae75528fe1813f71809b5afd57facb44dad9e8eca631b7", + "https://bcr.bazel.build/modules/rules_proto/5.3.0-21.7/source.json": "d57902c052424dfda0e71646cb12668d39c4620ee0544294d9d941e7d12bc3a9", + "https://bcr.bazel.build/modules/rules_python/0.10.2/MODULE.bazel": "cc82bc96f2997baa545ab3ce73f196d040ffb8756fd2d66125a530031cd90e5f", + "https://bcr.bazel.build/modules/rules_python/0.22.1/MODULE.bazel": "26114f0c0b5e93018c0c066d6673f1a2c3737c7e90af95eff30cfee38d0bbac7", + "https://bcr.bazel.build/modules/rules_python/0.22.1/source.json": "57226905e783bae7c37c2dd662be078728e48fa28ee4324a7eabcafb5a43d014", + "https://bcr.bazel.build/modules/rules_python/0.4.0/MODULE.bazel": "9208ee05fd48bf09ac60ed269791cf17fb343db56c8226a720fbb1cdf467166c", + "https://bcr.bazel.build/modules/stardoc/0.5.1/MODULE.bazel": "1a05d92974d0c122f5ccf09291442580317cdd859f07a8655f1db9a60374f9f8", + "https://bcr.bazel.build/modules/stardoc/0.5.1/source.json": "a96f95e02123320aa015b956f29c00cb818fa891ef823d55148e1a362caacf29", + "https://bcr.bazel.build/modules/upb/0.0.0-20220923-a547704/MODULE.bazel": "7298990c00040a0e2f121f6c32544bab27d4452f80d9ce51349b1a28f3005c43", + "https://bcr.bazel.build/modules/upb/0.0.0-20220923-a547704/source.json": "f1ef7d3f9e0e26d4b23d1c39b5f5de71f584dd7d1b4ef83d9bbba6ec7a6a6459", + "https://bcr.bazel.build/modules/zlib/1.2.11/MODULE.bazel": "07b389abc85fdbca459b69e2ec656ae5622873af3f845e1c9d80fe179f3effa0", + "https://bcr.bazel.build/modules/zlib/1.2.12/MODULE.bazel": "3b1a8834ada2a883674be8cbd36ede1b6ec481477ada359cd2d3ddc562340b27", + "https://bcr.bazel.build/modules/zlib/1.3.1.bcr.3/MODULE.bazel": "af322bc08976524477c79d1e45e241b6efbeb918c497e8840b8ab116802dda79", + "https://bcr.bazel.build/modules/zlib/1.3.1.bcr.3/source.json": "2be409ac3c7601245958cd4fcdff4288be79ed23bd690b4b951f500d54ee6e7d" + }, + "selectedYankedVersions": {}, + "moduleExtensions": {} +} diff --git a/kong-3.9.0-0.rockspec b/kong-3.9.0-0.rockspec index 6bf6989b33345..fdd5e4970ac0b 100644 --- a/kong-3.9.0-0.rockspec +++ b/kong-3.9.0-0.rockspec @@ -225,8 +225,11 @@ build = { ["kong.runloop.balancer.upstreams"] = "kong/runloop/balancer/upstreams.lua", ["kong.runloop.plugin_servers"] = "kong/runloop/plugin_servers/init.lua", ["kong.runloop.plugin_servers.process"] = "kong/runloop/plugin_servers/process.lua", - ["kong.runloop.plugin_servers.mp_rpc"] = "kong/runloop/plugin_servers/mp_rpc.lua", - ["kong.runloop.plugin_servers.pb_rpc"] = "kong/runloop/plugin_servers/pb_rpc.lua", + ["kong.runloop.plugin_servers.plugin"] = "kong/runloop/plugin_servers/plugin.lua", + ["kong.runloop.plugin_servers.rpc"] = "kong/runloop/plugin_servers/rpc/init.lua", + ["kong.runloop.plugin_servers.rpc.util"] = "kong/runloop/plugin_servers/rpc/util.lua", + ["kong.runloop.plugin_servers.rpc.mp_rpc"] = "kong/runloop/plugin_servers/rpc/mp_rpc.lua", + ["kong.runloop.plugin_servers.rpc.pb_rpc"] = "kong/runloop/plugin_servers/rpc/pb_rpc.lua", ["kong.runloop.wasm"] = "kong/runloop/wasm.lua", ["kong.runloop.wasm.properties"] = "kong/runloop/wasm/properties.lua", diff --git a/kong.conf.default b/kong.conf.default index 447efb3c0a67d..2b45d2c8c2e66 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -163,14 +163,17 @@ #pluginserver_XXX_socket = /.socket # Path to the unix socket # used by the pluginserver. + #pluginserver_XXX_start_cmd = /usr/local/bin/ # Full command (including # any needed arguments) to - # start the pluginserver + # start the + # pluginserver. + #pluginserver_XXX_query_cmd = /usr/local/bin/query_ # Full command to "query" the # pluginserver. Should # produce a JSON with the - # dump info of all plugins it - # manages + # dump info of the plugin it + # manages. #port_maps = # With this configuration parameter, you can # let Kong Gateway know the port from diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index f1deaf9ef21d3..d3218a1d7820f 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -955,6 +955,38 @@ local function load(path, custom_conf, opts) conf.cluster_rpc = "off" end + -- parse and validate pluginserver directives + if conf.pluginserver_names then + local pluginservers = {} + for i, name in ipairs(conf.pluginserver_names) do + name = name:lower() + local env_prefix = "pluginserver_" .. name:gsub("-", "_") + local socket = conf[env_prefix .. "_socket"] or (conf.prefix .. "/" .. name .. ".socket") + local start_command = conf[env_prefix .. "_start_cmd"] or exists("/usr/local/bin/" .. name) + local query_command = conf[env_prefix .. "_query_cmd"] or exists("/usr/local/bin/" .. name .. " -dump") + + -- if start_command is unset, we assume the pluginserver process is + -- managed externally + if not start_command then + log.warn("start_command undefined for pluginserver " .. name .. "; assuming external process management") + end + + -- query_command is required + if not query_command then + return nil, "query_command undefined for pluginserver " .. name + end + + pluginservers[i] = { + name = name, + socket = socket, + start_command = start_command, + query_command = query_command, + } + end + + conf.pluginservers = setmetatable(pluginservers, conf_constants._NOP_TOSTRING_MT) + end + -- initialize the dns client, so the globally patched tcp.connect method -- will work from here onwards. assert(require("kong.tools.dns")(conf)) diff --git a/kong/init.lua b/kong/init.lua index 70abad8b59c07..3886d75f7af81 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -971,7 +971,7 @@ function Kong.init_worker() end if is_not_control_plane then - plugin_servers.start() + assert(plugin_servers.start()) end if kong.clustering then @@ -1008,7 +1008,7 @@ end function Kong.exit_worker() if process.type() ~= "privileged agent" and not is_control_plane(kong.configuration) then - plugin_servers.stop() + assert(plugin_servers.stop()) end end diff --git a/kong/runloop/plugin_servers/init.lua b/kong/runloop/plugin_servers/init.lua index 316bb11012cb5..700591a2c4038 100644 --- a/kong/runloop/plugin_servers/init.lua +++ b/kong/runloop/plugin_servers/init.lua @@ -1,372 +1,45 @@ - local proc_mgmt = require "kong.runloop.plugin_servers.process" -local cjson = require "cjson.safe" -local clone = require "table.clone" -local ngx_ssl = require "ngx.ssl" -local SIGTERM = 15 +local plugin = require "kong.runloop.plugin_servers.plugin" -local type = type local pairs = pairs -local ipairs = ipairs -local tonumber = tonumber - -local ngx = ngx local kong = kong -local ngx_var = ngx.var -local ngx_sleep = ngx.sleep -local worker_id = ngx.worker.id - -local coroutine_running = coroutine.running -local get_plugin_info = proc_mgmt.get_plugin_info -local get_ctx_table = require("resty.core.ctx").get_ctx_table - -local cjson_encode = cjson.encode -local native_timer_at = _G.native_timer_at or ngx.timer.at - -local req_start_time -local req_get_headers -local resp_get_headers - -if ngx.config.subsystem == "http" then - req_start_time = ngx.req.start_time - req_get_headers = ngx.req.get_headers - resp_get_headers = ngx.resp.get_headers - -else - local NOOP = function() end - - req_start_time = NOOP - req_get_headers = NOOP - resp_get_headers = NOOP -end - -local SLEEP_STEP = 0.1 -local WAIT_TIME = 10 -local MAX_WAIT_STEPS = WAIT_TIME / SLEEP_STEP - ---- keep request data a bit longer, into the log timer -local save_for_later = {} - ---- handle notifications from pluginservers -local rpc_notifications = {} - ---- currently running plugin instances -local running_instances = {} - -local function get_saved() - return save_for_later[coroutine_running()] -end - -local exposed_api = { - kong = kong, - - ["kong.log.serialize"] = function() - local saved = get_saved() - return cjson_encode(saved and saved.serialize_data or kong.log.serialize()) - end, - - ["kong.nginx.get_var"] = function(v) - return ngx_var[v] - end, - - ["kong.nginx.get_tls1_version_str"] = ngx_ssl.get_tls1_version_str, - - ["kong.nginx.get_ctx"] = function(k) - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - return ngx_ctx[k] - end, - - ["kong.nginx.set_ctx"] = function(k, v) - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - ngx_ctx[k] = v - end, - - ["kong.ctx.shared.get"] = function(k) - local saved = get_saved() - local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared - return ctx_shared[k] - end, - - ["kong.ctx.shared.set"] = function(k, v) - local saved = get_saved() - local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared - ctx_shared[k] = v - end, - - ["kong.request.get_headers"] = function(max) - local saved = get_saved() - return saved and saved.request_headers or kong.request.get_headers(max) - end, - - ["kong.request.get_header"] = function(name) - local saved = get_saved() - if not saved then - return kong.request.get_header(name) - end - - local header_value = saved.request_headers[name] - if type(header_value) == "table" then - header_value = header_value[1] - end - - return header_value - end, - - ["kong.request.get_uri_captures"] = function() - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - return kong.request.get_uri_captures(ngx_ctx) - end, - - ["kong.response.get_status"] = function() - local saved = get_saved() - return saved and saved.response_status or kong.response.get_status() - end, - - ["kong.response.get_headers"] = function(max) - local saved = get_saved() - return saved and saved.response_headers or kong.response.get_headers(max) - end, - - ["kong.response.get_header"] = function(name) - local saved = get_saved() - if not saved then - return kong.response.get_header(name) - end - - local header_value = saved.response_headers and saved.response_headers[name] - if type(header_value) == "table" then - header_value = header_value[1] - end - - return header_value - end, - - ["kong.response.get_source"] = function() - local saved = get_saved() - return kong.response.get_source(saved and saved.ngx_ctx or nil) - end, - - ["kong.nginx.req_start_time"] = function() - local saved = get_saved() - return saved and saved.req_start_time or req_start_time() - end, -} - - -local get_instance_id -local reset_instance -local reset_instances_for_plugin - -local protocol_implementations = { - ["MsgPack:1"] = "kong.runloop.plugin_servers.mp_rpc", - ["ProtoBuf:1"] = "kong.runloop.plugin_servers.pb_rpc", -} - -local function get_server_rpc(server_def) - if not server_def.rpc then - - local rpc_modname = protocol_implementations[server_def.protocol] - if not rpc_modname then - kong.log.err("Unknown protocol implementation: ", server_def.protocol) - return nil, "Unknown protocol implementation" - end - - local rpc = require (rpc_modname) - rpc.get_instance_id = rpc.get_instance_id or get_instance_id - rpc.reset_instance = rpc.reset_instance or reset_instance - rpc.save_for_later = rpc.save_for_later or save_for_later - rpc.exposed_api = rpc.exposed_api or exposed_api - - server_def.rpc = rpc.new(server_def.socket, rpc_notifications) - end - - return server_def.rpc -end - - ---- get_instance_id: gets an ID to reference a plugin instance running in a ---- pluginserver each configuration in the database is handled by a different ---- instance. Biggest complexity here is due to the remote (and thus non-atomic ---- and fallible) operation of starting the instance at the server. -function get_instance_id(plugin_name, conf) - local key = type(conf) == "table" and kong.plugin.get_id() or plugin_name - local instance_info = running_instances[key] +-- module cache of loaded external plugins +-- XXX historically, this list of plugins has not been invalidated; +-- however, as plugin servers can be managed externally, users may also +-- change and restart the plugin server, potentially with new configurations +-- this needs to be improved -- docs and code hardening +local loaded_plugins - local wait_count = 0 - while instance_info and not instance_info.id do - -- some other thread is already starting an instance - -- prevent busy-waiting - ngx_sleep(SLEEP_STEP) - - -- to prevent a potential dead loop when someone failed to release the ID - wait_count = wait_count + 1 - if wait_count > MAX_WAIT_STEPS then - running_instances[key] = nil - return nil, "Could not claim instance_id for " .. plugin_name .. " (key: " .. key .. ")" - end - instance_info = running_instances[key] - end - - if instance_info - and instance_info.id - and instance_info.seq == conf.__seq__ - and instance_info.conf and instance_info.conf.__plugin_id == key - then - -- exact match, return it - return instance_info.id - end - - local old_instance_id = instance_info and instance_info.id - if not instance_info then - -- we're the first, put something to claim - instance_info = { - conf = conf, - seq = conf.__seq__, - } - running_instances[key] = instance_info - else - - -- there already was something, make it evident that we're changing it - instance_info.id = nil - end - - local plugin_info = get_plugin_info(plugin_name) - local server_rpc = get_server_rpc(plugin_info.server_def) - - local new_instance_info, err = server_rpc:call_start_instance(plugin_name, conf) - if new_instance_info == nil then - kong.log.err("starting instance: ", err) - -- remove claim, some other thread might succeed - running_instances[key] = nil - error(err) - end - - instance_info.id = new_instance_info.id - instance_info.plugin_name = plugin_name - instance_info.conf = new_instance_info.conf - instance_info.seq = new_instance_info.seq - instance_info.Config = new_instance_info.Config - instance_info.rpc = new_instance_info.rpc - - if old_instance_id then - -- there was a previous instance with same key, close it - server_rpc:call_close_instance(old_instance_id) - -- don't care if there's an error, maybe other thread closed it first. +local function load_external_plugins() + if loaded_plugins then + return true end - return instance_info.id -end - -function reset_instances_for_plugin(plugin_name) - for k, instance in pairs(running_instances) do - if instance.plugin_name == plugin_name then - running_instances[k] = nil - end - end -end + loaded_plugins = {} ---- reset_instance: removes an instance from the table. -function reset_instance(plugin_name, conf) - -- - -- the same plugin (which acts as a plugin server) is shared among - -- instances of the plugin; for example, the same plugin can be applied - -- to many routes - -- `reset_instance` is called when (but not only) the plugin server died; - -- in such case, all associated instances must be removed, not only the current - -- - reset_instances_for_plugin(plugin_name) + local kong_config = kong.configuration - local ok, err = kong.worker_events.post("plugin_server", "reset_instances", { plugin_name = plugin_name }) - if not ok then - kong.log.err("failed to post plugin_server reset_instances event: ", err) + local plugins_info, err = proc_mgmt.load_external_plugins_info(kong_config) + if not plugins_info then + return nil, "failed loading external plugins: " .. err end -end - ---- serverPid notification sent by the pluginserver. if it changes, ---- all instances tied to this RPC socket should be restarted. -function rpc_notifications:serverPid(n) - n = tonumber(n) - if self.pluginserver_pid and n ~= self.pluginserver_pid then - for key, instance in pairs(running_instances) do - if instance.rpc == self then - running_instances[key] = nil - end - end + for plugin_name, plugin_info in pairs(plugins_info) do + local plugin = plugin.new(plugin_info) + loaded_plugins[plugin_name] = plugin end - self.pluginserver_pid = n + return loaded_plugins end - - - - ---- Phase closures -local function build_phases(plugin) - if not plugin then - return - end - - local server_rpc = get_server_rpc(plugin.server_def) - - for _, phase in ipairs(plugin.phases) do - if phase == "log" then - plugin[phase] = function(self, conf) - native_timer_at(0, function(premature, saved) - if premature then - return - end - get_ctx_table(saved.ngx_ctx) - local co = coroutine_running() - save_for_later[co] = saved - server_rpc:handle_event(self.name, conf, phase) - save_for_later[co] = nil - end, { - plugin_name = self.name, - serialize_data = kong.log.serialize(), - ngx_ctx = clone(ngx.ctx), - ctx_shared = kong.ctx.shared, - request_headers = req_get_headers(), - response_headers = resp_get_headers(), - response_status = ngx.status, - req_start_time = req_start_time(), - }) - end - - else - plugin[phase] = function(self, conf) - server_rpc:handle_event(self.name, conf, phase) - end - end - end - - return plugin -end - - - ---- module table -local plugin_servers = {} - - -local loaded_plugins = {} - local function get_plugin(plugin_name) - kong = kong or _G.kong -- some CLI cmds set the global after loading the module. - if not loaded_plugins[plugin_name] then - local plugin = get_plugin_info(plugin_name) - loaded_plugins[plugin_name] = build_phases(plugin) - end + assert(load_external_plugins()) return loaded_plugins[plugin_name] end -function plugin_servers.load_plugin(plugin_name) +local function load_plugin(plugin_name) local plugin = get_plugin(plugin_name) if plugin and plugin.PRIORITY then return true, plugin @@ -375,7 +48,7 @@ function plugin_servers.load_plugin(plugin_name) return false, "no plugin found" end -function plugin_servers.load_schema(plugin_name) +local function load_schema(plugin_name) local plugin = get_plugin(plugin_name) if plugin and plugin.PRIORITY then return true, plugin.schema @@ -384,36 +57,53 @@ function plugin_servers.load_schema(plugin_name) return false, "no plugin found" end - -function plugin_servers.start() - if worker_id() ~= 0 then - return - end - - local pluginserver_timer = proc_mgmt.pluginserver_timer - - for _, server_def in ipairs(proc_mgmt.get_server_defs()) do - if server_def.start_command then - native_timer_at(0, pluginserver_timer, server_def) - end - end - +local function start() -- in case plugin server restarts, all workers need to update their defs kong.worker_events.register(function (data) - reset_instances_for_plugin(data.plugin_name) + plugin.reset_instances_for_plugin(data.plugin_name) end, "plugin_server", "reset_instances") + + assert(proc_mgmt.start_pluginservers()) + + return true end -function plugin_servers.stop() - if worker_id() ~= 0 then - return - end +local function stop() + assert(proc_mgmt.stop_pluginservers()) - for _, server_def in ipairs(proc_mgmt.get_server_defs()) do - if server_def.proc then - server_def.proc:kill(SIGTERM) - end - end + return true end -return plugin_servers + +-- +-- This modules sole responsibility is to +-- manage external plugins: starting/stopping plugins servers, +-- and return plugins info (such as schema and their loaded representations) +-- +-- The general initialization flow is: +-- - kong.init: calls start and stop to start/stop external plugins servers +-- - kong.db.schema.plugin_loader: calls load_schema to get an external plugin schema +-- - kong.db.dao.plugins: calls load_plugin to get the expected representation of a plugin +-- (phase handlers, priority, etc) +-- +-- Internal flow: +-- .plugin_servers.init: loads all external plugins, by calling .plugin_servers.process and .plugin_servers.plugin +-- .plugin_servers.process: queries external plugins info with the command specified in _query_cmd proeprties +-- .plugin_servers.plugin: with info obtained as described above, .plugin:new returns a kong-compatible representation +-- of an external plugin, with phase handlers, PRIORITY, and wrappers to the PDK. Calls +-- .plugin_servers.rpc to create an RPC through which Kong communicates with the plugin process +-- .plugin_servers.rpc: based on info contained in the plugin (protocol field), creates the correct RPC for the +-- given external plugin +-- .plugin_servers.rpc.pb_rpc: protobuf rpc implementation - used by Golang +-- .plugin_servers.rpc.mp.rpc: messagepack rpc implementation - used by JS and Python +-- .plugin_servers.init: calls .plugin_servers.process to start external plugin servers +-- .plugin_servers.process: optionally starts all external plugin servers (if a _start_cmd is found) +-- uses the resty pipe API to manage the external plugin process +-- + +return { + start = start, + stop = stop, + load_schema = load_schema, + load_plugin = load_plugin, +} \ No newline at end of file diff --git a/kong/runloop/plugin_servers/plugin.lua b/kong/runloop/plugin_servers/plugin.lua new file mode 100644 index 0000000000000..f8f9465feca80 --- /dev/null +++ b/kong/runloop/plugin_servers/plugin.lua @@ -0,0 +1,346 @@ +local cjson = require "cjson.safe" +local ngx_ssl = require "ngx.ssl" +local clone = require "table.clone" +local rpc = require "kong.runloop.plugin_servers.rpc" + +local type = type +local ngx_sleep = ngx.sleep +local ngx_var = ngx.var +local cjson_encode = cjson.encode +local ipairs = ipairs +local coroutine_running = coroutine.running +local get_ctx_table = require("resty.core.ctx").get_ctx_table +local native_timer_at = _G.native_timer_at or ngx.timer.at + +--- keep request data a bit longer, into the log timer +local save_for_later = {} + +--- currently running plugin instances +local running_instances = {} + +local req_start_time +local req_get_headers +local resp_get_headers + +if ngx.config.subsystem == "http" then + req_start_time = ngx.req.start_time + req_get_headers = ngx.req.get_headers + resp_get_headers = ngx.resp.get_headers + +else + local NOOP = function() end + + req_start_time = NOOP + req_get_headers = NOOP + resp_get_headers = NOOP +end + +local function get_saved() + return save_for_later[coroutine_running()] +end + +local exposed_api = { + kong = kong, + + get_saved_for_later = get_saved, + + ["kong.log.serialize"] = function() + local saved = get_saved() + return cjson_encode(saved and saved.serialize_data or kong.log.serialize()) + end, + + ["kong.nginx.get_var"] = function(v) + return ngx_var[v] + end, + + ["kong.nginx.get_tls1_version_str"] = ngx_ssl.get_tls1_version_str, + + ["kong.nginx.get_ctx"] = function(k) + local saved = get_saved() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + return ngx_ctx[k] + end, + + ["kong.nginx.set_ctx"] = function(k, v) + local saved = get_saved() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + ngx_ctx[k] = v + end, + + ["kong.ctx.shared.get"] = function(k) + local saved = get_saved() + local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared + return ctx_shared[k] + end, + + ["kong.ctx.shared.set"] = function(k, v) + local saved = get_saved() + local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared + ctx_shared[k] = v + end, + + ["kong.request.get_headers"] = function(max) + local saved = get_saved() + return saved and saved.request_headers or kong.request.get_headers(max) + end, + + ["kong.request.get_header"] = function(name) + local saved = get_saved() + if not saved then + return kong.request.get_header(name) + end + + local header_value = saved.request_headers[name] + if type(header_value) == "table" then + header_value = header_value[1] + end + + return header_value + end, + + ["kong.request.get_uri_captures"] = function() + local saved = get_saved() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + return kong.request.get_uri_captures(ngx_ctx) + end, + + ["kong.response.get_status"] = function() + local saved = get_saved() + return saved and saved.response_status or kong.response.get_status() + end, + + ["kong.response.get_headers"] = function(max) + local saved = get_saved() + return saved and saved.response_headers or kong.response.get_headers(max) + end, + + ["kong.response.get_header"] = function(name) + local saved = get_saved() + if not saved then + return kong.response.get_header(name) + end + + local header_value = saved.response_headers and saved.response_headers[name] + if type(header_value) == "table" then + header_value = header_value[1] + end + + return header_value + end, + + ["kong.response.get_source"] = function() + local saved = get_saved() + return kong.response.get_source(saved and saved.ngx_ctx or nil) + end, + + ["kong.nginx.req_start_time"] = function() + local saved = get_saved() + return saved and saved.req_start_time or req_start_time() + end, +} + + +--- Phase closures +local function build_phases(plugin) + if not plugin then + return + end + + for _, phase in ipairs(plugin.phases) do + if phase == "log" then + plugin[phase] = function(self, conf) + native_timer_at(0, function(premature, saved) + if premature then + return + end + get_ctx_table(saved.ngx_ctx) + local co = coroutine_running() + save_for_later[co] = saved + plugin.rpc:handle_event(conf, phase) + save_for_later[co] = nil + end, { + plugin_name = self.name, + serialize_data = kong.log.serialize(), + ngx_ctx = clone(ngx.ctx), + ctx_shared = kong.ctx.shared, + request_headers = req_get_headers(), + response_headers = resp_get_headers(), + response_status = ngx.status, + req_start_time = req_start_time(), + }) + end + + else + plugin[phase] = function(self, conf) + plugin.rpc:handle_event(conf, phase) + end + end + end + + return plugin +end + +--- handle notifications from pluginservers +local rpc_notifications = {} + +--- serverPid notification sent by the pluginserver. if it changes, +--- all instances tied to this RPC socket should be restarted. +function rpc_notifications:serverPid(n) + n = tonumber(n) + if self.pluginserver_pid and n ~= self.pluginserver_pid then + for key, instance in pairs(running_instances) do + if instance.rpc == self then + running_instances[key] = nil + end + end + end + + self.pluginserver_pid = n +end + +local function reset_instances_for_plugin(plugin_name) + for k, instance in pairs(running_instances) do + if instance.plugin_name == plugin_name then + running_instances[k] = nil + end + end +end + +--- reset_instance: removes an instance from the table. +local function reset_instance(plugin_name, conf) + -- + -- the same plugin (which acts as a plugin server) is shared among + -- instances of the plugin; for example, the same plugin can be applied + -- to many routes + -- `reset_instance` is called when (but not only) the plugin server died; + -- in such case, all associated instances must be removed, not only the current + -- + reset_instances_for_plugin(plugin_name) + + local ok, err = kong.worker_events.post("plugin_server", "reset_instances", { plugin_name = plugin_name }) + if not ok then + kong.log.err("failed to post plugin_server reset_instances event: ", err) + end +end + +local get_instance_id + +do + local SLEEP_STEP = 0.1 + local WAIT_TIME = 10 + local MAX_WAIT_STEPS = WAIT_TIME / SLEEP_STEP + + --- get_instance_id: gets an ID to reference a plugin instance running in the + --- pluginserver; each configuration of a plugin is handled by a different + --- instance. Biggest complexity here is due to the remote (and thus non-atomic + --- and fallible) operation of starting the instance at the server. + function get_instance_id(plugin, conf) + local plugin_name = plugin.name + + local key = kong.plugin.get_id() + local instance_info = running_instances[key] + + local wait_count = 0 + while instance_info and not instance_info.id do + -- some other thread is already starting an instance + -- prevent busy-waiting + ngx_sleep(SLEEP_STEP) + + -- to prevent a potential dead loop when someone failed to release the ID + wait_count = wait_count + 1 + if wait_count > MAX_WAIT_STEPS then + running_instances[key] = nil + return nil, "Could not claim instance_id for " .. plugin_name .. " (key: " .. key .. ")" + end + instance_info = running_instances[key] + end + + if instance_info + and instance_info.id + and instance_info.seq == conf.__seq__ + and instance_info.conf and instance_info.conf.__plugin_id == key + then + -- exact match, return it + return instance_info.id + end + + local old_instance_id = instance_info and instance_info.id + if not instance_info then + -- we're the first, put something to claim + instance_info = { + conf = conf, + seq = conf.__seq__, + } + running_instances[key] = instance_info + else + + -- there already was something, make it evident that we're changing it + instance_info.id = nil + end + + local new_instance_info, err = plugin.rpc:call_start_instance(plugin_name, conf) + if new_instance_info == nil then + kong.log.err("starting instance: ", err) + -- remove claim, some other thread might succeed + running_instances[key] = nil + error(err) + end + + instance_info.id = new_instance_info.id + instance_info.plugin_name = plugin_name + instance_info.conf = new_instance_info.conf + instance_info.seq = new_instance_info.seq + instance_info.Config = new_instance_info.Config + instance_info.rpc = new_instance_info.rpc + + if old_instance_id then + -- there was a previous instance with same key, close it + plugin.rpc:call_close_instance(old_instance_id) + -- don't care if there's an error, maybe other thread closed it first. + end + + return instance_info.id + end +end + +-- +-- instance callbacks manage the state of a plugin instance +-- - get_instance_id (which also starts and instance) +-- - reset_instance, which removes an instance from the local cache +-- +local instance_callbacks = { + reset_instance = reset_instance, + get_instance_id = get_instance_id, +} + +local function new(plugin_info) + -- + -- plugin_info + -- * name + -- * priority + -- * version + -- * schema + -- * phases + -- * server_def + -- + + local self = build_phases(plugin_info) + self.instance_callbacks = instance_callbacks + self.exposed_api = exposed_api + self.rpc_notifications = rpc_notifications + + local plugin_rpc, err = rpc.new(self) + if not rpc then + return nil, err + end + + self.rpc = plugin_rpc + + return self +end + + +return { + new = new, + reset_instances_for_plugin = reset_instances_for_plugin, +} diff --git a/kong/runloop/plugin_servers/process.lua b/kong/runloop/plugin_servers/process.lua index 79c5ee44c7b1c..bde565d8c5cf5 100644 --- a/kong/runloop/plugin_servers/process.lua +++ b/kong/runloop/plugin_servers/process.lua @@ -1,81 +1,19 @@ local cjson = require "cjson.safe" -local pl_path = require "pl.path" local raw_log = require "ngx.errlog".raw_log -local is_not_http_subsystem = ngx.config.subsystem ~= "http" - +local worker_id = ngx.worker.id +local native_timer_at = _G.native_timer_at or ngx.timer.at local _, ngx_pipe = pcall(require, "ngx.pipe") - local kong = kong local ngx_INFO = ngx.INFO local cjson_decode = cjson.decode +local SIGTERM = 15 -local proc_mgmt = {} - -local _servers -local _plugin_infos - ---[[ - -Configuration - -We require three settings to communicate with each pluginserver. To make it -fit in the config structure, use a dynamic namespace and generous defaults. - -- pluginserver_names: a list of names, one for each pluginserver. - -- pluginserver_XXX_socket: unix socket to communicate with the pluginserver. -- pluginserver_XXX_start_cmd: command line to strat the pluginserver. -- pluginserver_XXX_query_cmd: command line to query the pluginserver. - -Note: the `_start_cmd` and `_query_cmd` are set to the defaults only if -they exist on the filesystem. If omitted and the default doesn't exist, -they're disabled. - -A disabled `_start_cmd` (unset and the default doesn't exist in the filesystem) -means this process isn't managed by Kong. It's expected that the socket -still works, supposedly handled by an externally-managed process. - -A disable `_query_cmd` means it won't be queried and so the corresponding -socket wouldn't be used, even if the process is managed (if the `_start_cmd` -is valid). Currently this has no use, but it could eventually be added via -other means, perhaps dynamically. - ---]] - -local function ifexists(path) - if pl_path.exists(path) then - return path - end -end +local _M = {} -local function get_server_defs() - local config = kong.configuration - - if not _servers then - _servers = {} - - for i, name in ipairs(config.pluginserver_names) do - name = name:lower() - kong.log.debug("search config for pluginserver named: ", name) - local env_prefix = "pluginserver_" .. name:gsub("-", "_") - _servers[i] = { - name = name, - socket = config[env_prefix .. "_socket"] or "/usr/local/kong/" .. name .. ".socket", - start_command = config[env_prefix .. "_start_cmd"] or ifexists("/usr/local/bin/"..name), - query_command = config[env_prefix .. "_query_cmd"] or ifexists("/usr/local/bin/query_"..name), - } - end - end - - return _servers -end - -proc_mgmt.get_server_defs = get_server_defs --[[ - Plugin info requests Disclaimer: The best way to do it is to have "ListPlugins()" and "GetInfo(plugin)" @@ -103,69 +41,66 @@ defining the name, priority, version, schema and phases of one plugin. This array should describe all plugins currently available through this server, no matter if actually enabled in Kong's configuration or not. - --]] - -local function register_plugin_info(server_def, plugin_info) - if _plugin_infos[plugin_info.Name] then - kong.log.err(string.format("Duplicate plugin name [%s] by %s and %s", - plugin_info.Name, _plugin_infos[plugin_info.Name].server_def.name, server_def.name)) - return - end - - _plugin_infos[plugin_info.Name] = { - server_def = server_def, - --rpc = server_def.rpc, - name = plugin_info.Name, - PRIORITY = plugin_info.Priority, - VERSION = plugin_info.Version, - schema = plugin_info.Schema, - phases = plugin_info.Phases, - } -end - -local function ask_info(server_def) +local function query_external_plugin_info(server_def) if not server_def.query_command then - kong.log.info(string.format("No info query for %s", server_def.name)) - return + return nil, "no info query for " .. server_def.name end local fd, err = io.popen(server_def.query_command) if not fd then - local msg = string.format("loading plugins info from [%s]:\n", server_def.name) - kong.log.err(msg, err) - return + return nil, string.format("error loading plugins info from [%s]: %s", server_def.name, err) end local infos_dump = fd:read("*a") fd:close() - local dump = cjson_decode(infos_dump) + local dump, err = cjson_decode(infos_dump) + if err then + return nil, "failed decoding plugin info: " .. err + end + if type(dump) ~= "table" then - error(string.format("Not a plugin info table: \n%s\n%s", - server_def.query_command, infos_dump)) - return + return nil, string.format("not a plugin info table: \n%s\n%s", server_def.query_command, infos_dump) end server_def.protocol = dump.Protocol or "MsgPack:1" - local infos = dump.Plugins or dump - - for _, plugin_info in ipairs(infos) do - register_plugin_info(server_def, plugin_info) - end + local info = (dump.Plugins or dump)[1] -- XXX can a pluginserver (in the embedded plugin server world + -- have more than one plugin? only a single + -- configuration is initialized currently, so this + -- seems to be legacy code) + + -- in remote times, a plugin server could serve more than one plugin + -- nowadays (2.8+), external plugins use an "embedded pluginserver" model, where + -- each plugin acts as an independent plugin server + return { + server_def = server_def, + name = info.Name, + PRIORITY = info.Priority, + VERSION = info.Version, + schema = info.Schema, + phases = info.Phases, + } end -function proc_mgmt.get_plugin_info(plugin_name) - if not _plugin_infos then - kong = kong or _G.kong -- some CLI cmds set the global after loading the module. - _plugin_infos = {} - for _, server_def in ipairs(get_server_defs()) do - ask_info(server_def) +function _M.load_external_plugins_info(kong_conf) + local available_external_plugins = {} + + kong.log.notice("[pluginserver] loading external plugins info") + + for _, pluginserver in ipairs(kong_conf.pluginservers) do + local plugin_info, err = query_external_plugin_info(pluginserver) + if not plugin_info then + return nil, err end + + available_external_plugins[plugin_info.name] = plugin_info end - return _plugin_infos[plugin_name] + kong.log.notice("[pluginserver] loaded #", #kong_conf.pluginservers, " external plugins info") + + return available_external_plugins end @@ -179,7 +114,6 @@ event and respawns the server. If the `_start_cmd` is unset (and the default doesn't exist in the filesystem) it's assumed the process is managed externally. - --]] local function grab_logs(proc, name) @@ -198,12 +132,13 @@ local function grab_logs(proc, name) end end -function proc_mgmt.pluginserver_timer(premature, server_def) + +local function pluginserver_timer(premature, server_def) if premature then return end - if is_not_http_subsystem then + if ngx.config.subsystem ~= "http" then return end @@ -214,30 +149,73 @@ function proc_mgmt.pluginserver_timer(premature, server_def) ngx.sleep(next_spawn - ngx.now()) end - kong.log.notice("Starting " .. server_def.name or "") + kong.log.notice("[pluginserver] starting pluginserver process for ", server_def.name or "") server_def.proc = assert(ngx_pipe.spawn("exec " .. server_def.start_command, { merge_stderr = true, })) next_spawn = ngx.now() + 1 server_def.proc:set_timeouts(nil, nil, nil, 0) -- block until something actually happens + kong.log.notice("[pluginserver] started, pid ", server_def.proc:pid()) while true do grab_logs(server_def.proc, server_def.name) local ok, reason, status = server_def.proc:wait() + + -- exited with a non 0 status if ok == false and reason == "exit" and status == 127 then kong.log.err(string.format( - "external pluginserver %q start command %q exited with \"command not found\"", + "[pluginserver] external pluginserver %q start command %q exited with \"command not found\"", server_def.name, server_def.start_command)) break + + -- waited on an exited thread elseif ok ~= nil or reason == "exited" or ngx.worker.exiting() then kong.log.notice("external pluginserver '", server_def.name, "' terminated: ", tostring(reason), " ", tostring(status)) break end + + -- XXX what happens if the process stops with a 0 status code? + end + end + + kong.log.notice("[pluginserver] exiting: pluginserver '", server_def.name, "' not respawned.") +end + + +function _M.start_pluginservers() + local kong_config = kong.configuration + + -- only worker 0 manages plugin server processes + if worker_id() == 0 then -- TODO move to privileged worker? + local pluginserver_timer = pluginserver_timer + + for _, server_def in ipairs(kong_config.pluginservers) do + if server_def.start_command then -- if not defined, we assume it's managed externally + native_timer_at(0, pluginserver_timer, server_def) + end end end - kong.log.notice("Exiting: pluginserver '", server_def.name, "' not respawned.") + + return true end +function _M.stop_pluginservers() + local kong_config = kong.configuration + + -- only worker 0 manages plugin server processes + if worker_id() == 0 then -- TODO move to privileged worker? + for _, server_def in ipairs(kong_config.pluginservers) do + if server_def.proc then + local ok, err = server_def.proc:kill(SIGTERM) + if not ok then + kong.log.error("[pluginserver] failed to stop pluginserver '", server_def.name, ": ", err) + end + kong.log.notice("[pluginserver] successfully stopped pluginserver '", server_def.name, "', pid ", server_def.proc:pid()) + end + end + end + return true +end -return proc_mgmt +return _M diff --git a/kong/runloop/plugin_servers/rpc/init.lua b/kong/runloop/plugin_servers/rpc/init.lua new file mode 100644 index 0000000000000..c30b47b8370a1 --- /dev/null +++ b/kong/runloop/plugin_servers/rpc/init.lua @@ -0,0 +1,22 @@ +local protocol_implementations = { + ["MsgPack:1"] = "kong.runloop.plugin_servers.rpc.mp_rpc", + ["ProtoBuf:1"] = "kong.runloop.plugin_servers.rpc.pb_rpc", +} + +local function new(plugin) + local rpc_modname = protocol_implementations[plugin.server_def.protocol] + if not rpc_modname then + return nil, "unknown protocol implementation: " .. (plugin.server_def.protocol or "nil") + end + + kong.log.notice("[pluginserver] loading protocol ", plugin.server_def.protocol, " for plugin ", plugin.name) + + local rpc_mod = require (rpc_modname) + local rpc = rpc_mod.new(plugin) + + return rpc +end + +return { + new = new, +} diff --git a/kong/runloop/plugin_servers/mp_rpc.lua b/kong/runloop/plugin_servers/rpc/mp_rpc.lua similarity index 84% rename from kong/runloop/plugin_servers/mp_rpc.lua rename to kong/runloop/plugin_servers/rpc/mp_rpc.lua index 0895c44b600fd..ef06faa558301 100644 --- a/kong/runloop/plugin_servers/mp_rpc.lua +++ b/kong/runloop/plugin_servers/rpc/mp_rpc.lua @@ -1,5 +1,6 @@ local kong_global = require "kong.global" local cjson = require "cjson.safe" +local rpc_util = require "kong.runloop.plugin_servers.rpc.util" local _ local msgpack do @@ -27,19 +28,6 @@ local str_find = string.find local Rpc = {} Rpc.__index = Rpc -Rpc.notifications_callbacks = {} - -function Rpc.new(socket_path, notifications) - kong.log.debug("mp_rpc.new: ", socket_path) - return setmetatable({ - socket_path = socket_path, - msg_id = 0, - notifications_callbacks = notifications, - }, Rpc) -end - - - -- add MessagePack empty array/map msgpack.packers['function'] = function (buffer, f) @@ -106,48 +94,30 @@ Kong API exposed to external plugins --]] --- global method search and cache -local function index_table(table, field) - if table[field] then - return table[field] - end - - local res = table - for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do - if res[segment[0]] then - res = res[segment[0]] - else - return nil - end - end - return res -end - - local get_field do local method_cache = {} - function get_field(method) + function get_field(pdk, method) if method_cache[method] then return method_cache[method] else - method_cache[method] = index_table(Rpc.exposed_api, method) + method_cache[method] = rpc_util.index_table(pdk, method) return method_cache[method] end end end -local function call_pdk_method(cmd, args) - local method = get_field(cmd) +local function call_pdk_method(pdk, cmd, args) + local method = get_field(pdk, cmd) if not method then kong.log.err("could not find pdk method: ", cmd) return end - local saved = Rpc.save_for_later[coroutine.running()] + local saved = pdk.get_saved_for_later() if saved and saved.plugin_name then kong_global.set_namespaced_log(kong, saved.plugin_name) end @@ -203,7 +173,7 @@ function Rpc:call(method, ...) self.msg_id = self.msg_id + 1 local msg_id = self.msg_id - local c, err = ngx.socket.connect("unix:" .. self.socket_path) + local c, err = ngx.socket.connect("unix:" .. self.plugin.server_def.socket) if not c then kong.log.err("trying to connect: ", err) return nil, err @@ -278,7 +248,7 @@ end function Rpc:notification(label, args) - local f = self.notifications_callbacks[label] + local f = self.plugin.rpc_notifications[label] if f then f(self, args) end @@ -311,6 +281,7 @@ local function bridge_loop(instance_rpc, instance_id, phase) end local pdk_res, pdk_err = call_pdk_method( + instance_rpc.plugin.exposed_api, step_in.Data.Method, step_in.Data.Args) @@ -327,8 +298,10 @@ local function bridge_loop(instance_rpc, instance_id, phase) end -function Rpc:handle_event(plugin_name, conf, phase) - local instance_id, err = self.get_instance_id(plugin_name, conf) +function Rpc:handle_event(conf, phase) + local plugin_name = self.plugin.name + + local instance_id, err = self.plugin.instance_callbacks.get_instance_id(self.plugin, conf) if not err then _, err = bridge_loop(self, instance_id, phase) end @@ -337,16 +310,24 @@ function Rpc:handle_event(plugin_name, conf, phase) local err_lowered = err:lower() if str_find(err_lowered, "no plugin instance") then - self.reset_instance(plugin_name, conf) + self.plugin.instance_callbacks.reset_instance(plugin_name, conf) kong.log.warn(err) - return self:handle_event(plugin_name, conf, phase) + return self:handle_event(conf, phase) end kong.log.err(err) end end +local function new(plugin) + local self = setmetatable({ + msg_id = 0, + plugin = plugin, + }, Rpc) + return self +end - -return Rpc +return { + new = new, +} diff --git a/kong/runloop/plugin_servers/pb_rpc.lua b/kong/runloop/plugin_servers/rpc/pb_rpc.lua similarity index 88% rename from kong/runloop/plugin_servers/pb_rpc.lua rename to kong/runloop/plugin_servers/rpc/pb_rpc.lua index d05b40ecb2fd3..33b2b9d56d796 100644 --- a/kong/runloop/plugin_servers/pb_rpc.lua +++ b/kong/runloop/plugin_servers/rpc/pb_rpc.lua @@ -3,6 +3,7 @@ local cjson = require "cjson.safe" local grpc_tools = require "kong.tools.grpc" local pb = require "pb" local lpack = require "lua_pack" +local rpc_util = require "kong.runloop.plugin_servers.rpc.util" local ngx = ngx local kong = kong @@ -13,11 +14,9 @@ local st_unpack = lpack.unpack local str_find = string.find local proto_fname = "kong/pluginsocket.proto" - local Rpc = {} Rpc.__index = Rpc - local pb_unwrap do local structpb_value, structpb_list, structpb_struct @@ -176,24 +175,7 @@ do } end - -local function index_table(table, field) - if table[field] then - return table[field] - end - - local res = table - for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do - if res[segment[0]] then - res = res[segment[0]] - else - return nil - end - end - return res -end - -local function load_service() +local function load_service(pdk) local p = grpc_tools.new() local protoc_instance = p.protoc_instance @@ -212,7 +194,7 @@ local function load_service() service[lower_name] = { method_name = method_name, - method = index_table(Rpc.exposed_api, lower_name), + method = rpc_util.index_table(pdk, lower_name), input_type = m.input_type, output_type = m.output_type, } @@ -233,13 +215,13 @@ local function identity_function(x) end -local function call_pdk(method_name, arg) +local function call_pdk(pdk, method_name, arg) local method = rpc_service[method_name] if not method then return nil, ("method %q not found"):format(method_name) end - local saved = Rpc.save_for_later[coroutine.running()] + local saved = pdk.get_saved_for_later() if saved and saved.plugin_name then kong_global.set_namespaced_log(kong, saved.plugin_name) end @@ -283,25 +265,10 @@ local function write_frame(c, msg) assert (c:send(msg)) end -function Rpc.new(socket_path, notifications) - - if not rpc_service then - rpc_service = load_service() - end - - --kong.log.debug("pb_rpc.new: ", socket_path) - return setmetatable({ - socket_path = socket_path, - msg_id = 0, - notifications_callbacks = notifications, - }, Rpc) -end - - function Rpc:call(method, data, do_bridge_loop) self.msg_id = self.msg_id + 1 local msg_id = self.msg_id - local c, err = ngx.socket.connect("unix:" .. self.socket_path) + local c, err = ngx.socket.connect("unix:" .. self.plugin.server_def.socket) if not c then kong.log.err("trying to connect: ", err) return nil, err @@ -336,7 +303,7 @@ function Rpc:call(method, data, do_bridge_loop) end local reply - reply, err = call_pdk(method_name, args) + reply, err = call_pdk(self.plugin.exposed_api, method_name, args) if not reply then return nil, err end @@ -371,7 +338,7 @@ function Rpc:call_start_instance(plugin_name, conf) return nil, err end - kong.log.debug("started plugin server: seq ", conf.__seq__, ", worker ", ngx.worker.id() or -1, ", instance id ", + kong.log.debug("started plugin server: seq ", conf.__seq__, ", worker ", ngx.worker.id(), ", instance id ", status.instance_status.instance_id) return { @@ -390,9 +357,10 @@ function Rpc:call_close_instance(instance_id) end +function Rpc:handle_event(conf, phase) + local plugin_name = self.plugin.name -function Rpc:handle_event(plugin_name, conf, phase) - local instance_id, err = self.get_instance_id(plugin_name, conf) + local instance_id, err = self.plugin.instance_callbacks.get_instance_id(self.plugin, conf) local res if not err then res, err = self:call("cmd_handle_event", { @@ -406,9 +374,9 @@ function Rpc:handle_event(plugin_name, conf, phase) if str_find(err_lowered, "no plugin instance", nil, true) or str_find(err_lowered, "closed", nil, true) then - self.reset_instance(plugin_name, conf) + self.plugin.instance_callbacks.reset_instance(plugin_name, conf) kong.log.warn(err) - return self:handle_event(plugin_name, conf, phase) + return self:handle_event(conf, phase) else kong.log.err("pluginserver error: ", err or "unknown error") @@ -417,5 +385,19 @@ function Rpc:handle_event(plugin_name, conf, phase) end end +local function new(plugin) + if not rpc_service then + rpc_service = load_service(plugin.exposed_api) + end + + local self = setmetatable({ + msg_id = 0, + plugin = plugin, + }, Rpc) + + return self +end -return Rpc +return { + new = new, +} diff --git a/kong/runloop/plugin_servers/rpc/util.lua b/kong/runloop/plugin_servers/rpc/util.lua new file mode 100644 index 0000000000000..c868e1e23a30a --- /dev/null +++ b/kong/runloop/plugin_servers/rpc/util.lua @@ -0,0 +1,19 @@ +local function index_table(table, field) + if table[field] then + return table[field] + end + + local res = table + for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do + if res[segment[0]] then + res = res[segment[0]] + else + return nil + end + end + return res + end + +return { + index_table = index_table, +} diff --git a/spec/01-unit/03-conf_loader_spec.lua b/spec/01-unit/03-conf_loader_spec.lua index 9827fcef10e27..7403a50d4ef5c 100644 --- a/spec/01-unit/03-conf_loader_spec.lua +++ b/spec/01-unit/03-conf_loader_spec.lua @@ -2686,4 +2686,70 @@ describe("Configuration loader", function() end) end) + describe("pluginserver config", function() + describe("fails if", function() + it("no query_command and not found in default location", function() + local _, err = conf_loader(nil, { + pluginserver_names = "gopher", + -- query_command = {}, + }) + assert.is_not_nil(err) + assert.matches("query_command undefined for pluginserver gopher", err) + end) + + -- TODO need one more test case: + -- if no query_command is given, `/usr/local/bin/` is checked + -- we cannot write there in tests - figure out a way to write the test + end) + describe("warns if", function() + it("no start_command (meaning process is externally maintained)", function() + local spy_log = spy.on(log, "warn") + + finally(function() + log.warn:revert() + assert:unregister("matcher", "str_match") + end) + + assert:register("matcher", "str_match", function (_state, arguments) + local expected = arguments[1] + return function(value) + return string.match(value, expected) ~= nil + end + end) + + local _, err = conf_loader(nil, { + pluginserver_names = "gopher", + pluginserver_gopher_query_cmd = "gopher -dump", + }) + assert.is_nil(err) + assert.spy(spy_log).was_called(1) + assert.spy(spy_log).was_called_with("start_command undefined for pluginserver gopher; assuming external process management") + end) + end) + it("fills in default settings", function() + local conf, err = conf_loader(nil, { + pluginserver_names = "gopher", + pluginserver_gopher_query_cmd = "gopher -dump", + pluginserver_gopher_start_cmd = "gopher -p $KONG_PREFIX", + }) + assert.is_nil(err) + assert.same("gopher", conf.pluginservers[1].name) + assert.same("gopher -dump", conf.pluginservers[1].query_command) + assert.same("gopher -p $KONG_PREFIX", conf.pluginservers[1].start_command) + assert.same(conf.prefix .. "/gopher.socket", conf.pluginservers[1].socket) + end) + it("accepts custom settings", function() + local conf, err = conf_loader(nil, { + pluginserver_names = "gopher", + pluginserver_gopher_query_cmd = "gopher -dump", + pluginserver_gopher_start_cmd = "gopher -p $KONG_PREFIX", + pluginserver_gopher_socket = "/foo/bar/gopher.socket", + }) + assert.is_nil(err) + assert.same("gopher", conf.pluginservers[1].name) + assert.same("gopher -dump", conf.pluginservers[1].query_command) + assert.same("gopher -p $KONG_PREFIX", conf.pluginservers[1].start_command) + assert.same("/foo/bar/gopher.socket", conf.pluginservers[1].socket) + end) + end) end) diff --git a/spec/02-integration/10-external-plugins/01-process-management_spec.lua b/spec/02-integration/10-external-plugins/01-process-management_spec.lua new file mode 100644 index 0000000000000..04c983773b7df --- /dev/null +++ b/spec/02-integration/10-external-plugins/01-process-management_spec.lua @@ -0,0 +1,160 @@ +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + describe("manages a pluginserver #" .. strategy, function() + lazy_setup(function() + assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + })) + end) + + describe("process management", function() + it("starts/stops an external plugin server [golang]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,go-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test', pid [0-9]+]]) + end) + + it("starts/stops an external plugin server [python]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,py-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test', pid [0-9]+]]) + end) + + it("starts/stops an external plugin server [golang, python]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,go-hello,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test-go', pid [0-9]+]]) + assert.logfile().has.line([[successfully stopped pluginserver 'test-py', pid [0-9]+]]) + end) + end) + + it("queries plugin info [golang]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,go-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + helpers.build_go_plugins(helpers.external_plugins_path .. "/go") + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["go-hello"]) + + local info = plugin_infos["go-hello"] + assert.equal(1, info.PRIORITY) + assert.equal("0.1", info.VERSION) + assert.equal("go-hello", info.name) + assert.same({ "access", "response", "log" }, info.phases) + assert.same("ProtoBuf:1", info.server_def.protocol) + end) + + it("queries plugin info [python]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,py-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["py-hello"]) + + local info = plugin_infos["py-hello"] + assert.equal(100, info.PRIORITY) + assert.equal("0.1.0", info.VERSION) + assert.equal("py-hello", info.name) + assert.same({ "access" }, info.phases) + assert.same("MsgPack:1", info.server_def.protocol) + end) + + it("queries plugin info [golang, python]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["go-hello"]) + assert.not_nil(plugin_infos["py-hello"]) + + local go_info = plugin_infos["go-hello"] + assert.equal(1, go_info.PRIORITY) + assert.equal("0.1", go_info.VERSION) + assert.equal("go-hello", go_info.name) + assert.same({ "access", "response", "log" }, go_info.phases) + assert.same("ProtoBuf:1", go_info.server_def.protocol) + + local py_info = plugin_infos["py-hello"] + assert.equal(100, py_info.PRIORITY) + assert.equal("0.1.0", py_info.VERSION) + assert.equal("py-hello", py_info.name) + assert.same({ "access" }, py_info.phases) + assert.same("MsgPack:1", py_info.server_def.protocol) + end) + end) +end diff --git a/spec/02-integration/10-external-plugins/02-execution_spec.lua b/spec/02-integration/10-external-plugins/02-execution_spec.lua new file mode 100644 index 0000000000000..72e6e20982dee --- /dev/null +++ b/spec/02-integration/10-external-plugins/02-execution_spec.lua @@ -0,0 +1,76 @@ +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + describe("anonymous reports for go plugins #" .. strategy, function() + lazy_setup(function() + local bp = assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + })) + + assert(bp.services:insert {}) + assert(bp.routes:insert({ + protocols = { "http" }, + paths = { "/" } + })) + + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + database = strategy, + plugins = "bundled,reports-api,go-hello,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump -kong-prefix " .. kong_prefix, + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + + local admin_client = helpers.admin_client() + + local res = admin_client:post("/plugins", { + headers = { + ["Content-Type"] = "application/json" + }, + body = { + name = "go-hello", + config = { + message = "Kong!" + } + } + }) + assert.res_status(201, res) + + res = admin_client:post("/plugins", { + headers = { + ["Content-Type"] = "application/json" + }, + body = { + name = "py-hello", + config = { + message = "Kong!" + } + } + }) + assert.res_status(201, res) + end) + + lazy_teardown(function() + helpers.stop_kong() + end) + + it("executes external plugins [golang, python]", function() + local proxy_client = assert(helpers.proxy_client()) + local res = proxy_client:get("/") + assert.res_status(200, res) + local h = assert.response(res).has.header("x-hello-from-go") + assert.matches("Go says Kong! to", h) + h = assert.response(res).has.header("x-hello-from-python") + assert.matches("Python says Kong! to", h) + end) + end) +end diff --git a/spec/02-integration/10-go_plugins/01-reports_spec.lua b/spec/02-integration/10-external-plugins/99-reports_spec.lua similarity index 95% rename from spec/02-integration/10-go_plugins/01-reports_spec.lua rename to spec/02-integration/10-external-plugins/99-reports_spec.lua index 6e6d1a3215367..e92c0f51a67d2 100644 --- a/spec/02-integration/10-go_plugins/01-reports_spec.lua +++ b/spec/02-integration/10-external-plugins/99-reports_spec.lua @@ -54,8 +54,8 @@ for _, strategy in helpers.each_strategy() do plugins = "bundled,reports-api,go-hello", pluginserver_names = "test", pluginserver_test_socket = kong_prefix .. "/go-hello.socket", - pluginserver_test_query_cmd = "./spec/fixtures/go/go-hello -dump -kong-prefix " .. kong_prefix, - pluginserver_test_start_cmd = "./spec/fixtures/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump -kong-prefix " .. kong_prefix, + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, anonymous_reports = true, })) diff --git a/spec/fixtures/go/go-hello.go b/spec/fixtures/external_plugins/go/go-hello.go similarity index 100% rename from spec/fixtures/go/go-hello.go rename to spec/fixtures/external_plugins/go/go-hello.go diff --git a/spec/fixtures/go/go.mod b/spec/fixtures/external_plugins/go/go.mod similarity index 100% rename from spec/fixtures/go/go.mod rename to spec/fixtures/external_plugins/go/go.mod diff --git a/spec/fixtures/go/go.sum b/spec/fixtures/external_plugins/go/go.sum similarity index 100% rename from spec/fixtures/go/go.sum rename to spec/fixtures/external_plugins/go/go.sum diff --git a/spec/fixtures/external_plugins/js/js-hello.js b/spec/fixtures/external_plugins/js/js-hello.js new file mode 100644 index 0000000000000..b1141644bd626 --- /dev/null +++ b/spec/fixtures/external_plugins/js/js-hello.js @@ -0,0 +1,33 @@ +'use strict'; + +// This is an example plugin that add a header to the response + +class KongPlugin { + constructor(config) { + this.config = config + } + + async access(kong) { + let host = await kong.request.getHeader("host") + if (host === undefined) { + return await kong.log.err("unable to get header for request") + } + + let message = this.config.message || "hello" + + // the following can be "parallel"ed + await Promise.all([ + kong.response.setHeader("x-hello-from-javascript", "Javascript says " + message + " to " + host), + kong.response.setHeader("x-javascript-pid", process.pid), + ]) + } +} + +module.exports = { + Plugin: KongPlugin, + Schema: [ + { message: { type: "string" } }, + ], + Version: '0.1.0', + Priority: 0, +} \ No newline at end of file diff --git a/spec/fixtures/external_plugins/py/py-hello.py b/spec/fixtures/external_plugins/py/py-hello.py new file mode 100755 index 0000000000000..3301e059e1575 --- /dev/null +++ b/spec/fixtures/external_plugins/py/py-hello.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +import os +import kong_pdk.pdk.kong as kong + +Schema = ( + {"message": {"type": "string"}}, +) + +version = '0.1.0' +priority = 100 + +# This is an example plugin that add a header to the response + +class Plugin(object): + def __init__(self, config): + self.config = config + + def access(self, kong: kong.kong): + host, err = kong.request.get_header("host") + if err: + pass # error handling + # if run with --no-lua-style + # try: + # host = kong.request.get_header("host") + # except Exception as ex: + # pass # error handling + message = "hello" + if 'message' in self.config: + message = self.config['message'] + kong.response.set_header("x-hello-from-python", "Python says %s to %s" % (message, host)) + kong.response.set_header("x-python-pid", str(os.getpid())) + + +# add below section to allow this plugin optionally be running in a dedicated process +if __name__ == "__main__": + from kong_pdk.cli import start_dedicated_server + start_dedicated_server("py-hello", Plugin, version, priority, Schema) diff --git a/spec/fixtures/external_plugins/py/requirements.txt b/spec/fixtures/external_plugins/py/requirements.txt new file mode 100644 index 0000000000000..0f887887fd555 --- /dev/null +++ b/spec/fixtures/external_plugins/py/requirements.txt @@ -0,0 +1 @@ +kong-pdk diff --git a/spec/helpers.lua b/spec/helpers.lua index 89273b6e8e57b..22b67c4434d35 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -91,7 +91,7 @@ local wait = reload_module("spec.internal.wait") bin_path = CONSTANTS.BIN_PATH, test_conf = conf, test_conf_path = CONSTANTS.TEST_CONF_PATH, - go_plugin_path = CONSTANTS.GO_PLUGIN_PATH, + external_plugins_path = CONSTANTS.EXTERNAL_PLUGINS_PATH, mock_upstream_hostname = CONSTANTS.MOCK_UPSTREAM_HOSTNAME, mock_upstream_protocol = CONSTANTS.MOCK_UPSTREAM_PROTOCOL, mock_upstream_host = CONSTANTS.MOCK_UPSTREAM_HOST, @@ -224,4 +224,6 @@ local wait = reload_module("spec.internal.wait") get_available_port = wait.get_available_port, make_temp_dir = misc.make_temp_dir, + + build_go_plugins = cmd.build_go_plugins, } diff --git a/spec/internal/cmd.lua b/spec/internal/cmd.lua index 916c5593dadcf..1c8bf3b0568d2 100644 --- a/spec/internal/cmd.lua +++ b/spec/internal/cmd.lua @@ -272,8 +272,8 @@ local function start_kong(env, tables, preserve_prefix, fixtures) -- go plugins are enabled -- compile fixture go plugins if any setting mentions it for _,v in pairs(env) do - if type(v) == "string" and v:find(CONSTANTS.GO_PLUGIN_PATH) then - build_go_plugins(CONSTANTS.GO_PLUGIN_PATH) + if type(v) == "string" and v:find(CONSTANTS.EXTERNAL_PLUGINS_PATH .. "/go") then + build_go_plugins(CONSTANTS.EXTERNAL_PLUGINS_PATH .. "/go") break end end @@ -471,5 +471,7 @@ return { kill_all = kill_all, signal = signal, signal_workers = signal_workers, + + build_go_plugins = build_go_plugins, } diff --git a/spec/internal/constants.lua b/spec/internal/constants.lua index 34d1f897c2bcb..bbefa8d5152dc 100644 --- a/spec/internal/constants.lua +++ b/spec/internal/constants.lua @@ -6,7 +6,7 @@ local CONSTANTS = { CUSTOM_PLUGIN_PATH = "./spec/fixtures/custom_plugins/?.lua", CUSTOM_VAULT_PATH = "./spec/fixtures/custom_vaults/?.lua;./spec/fixtures/custom_vaults/?/init.lua", DNS_MOCK_LUA_PATH = "./spec/fixtures/mocks/lua-resty-dns/?.lua", - GO_PLUGIN_PATH = "./spec/fixtures/go", + EXTERNAL_PLUGINS_PATH = "./spec/fixtures/external_plugins", GRPC_TARGET_SRC_PATH = "./spec/fixtures/grpc/target/", MOCK_UPSTREAM_PROTOCOL = "http", MOCK_UPSTREAM_SSL_PROTOCOL = "https",