From cd1d33104285d2e8f127b90bbf1ff4df376bba26 Mon Sep 17 00:00:00 2001 From: Guilherme Salazar Date: Wed, 13 Nov 2024 17:58:24 -0300 Subject: [PATCH] refactor(pluginservers): code refactor & testing (#12858) Context ------- The overall goal of this commit is to refactor the external plugins implementation, with the following goals in mind: - Make plugin server code more approachable to unfamiliar engineers and easier to evolve with confidence - Harden configuration; ensure configuration defects are caught before Kong is started - Extend testing coverage This is related to ongoing work on the Go PDK, with similar goals in mind. Summary ------- This commit implements the following overall changes to plugin server code: - Move configuration related code into conf loader, so that configuration loading and validation happens at startup time, rather than lazily, when plugin data is loaded or pluginservers are started. Add tests for current behavior. - Move process-management code - for starting up plugin servers as well as querying external plugins info - into the `process.lua` module. - Introduce a `kong.runloop.plugin_servers.rpc` module that encapsulates RPC initialization and protocol-specific implementations. This further simplifies the main plugin server main module. - Factor exposed API and phase handlers bridging code into a new `plugin` module, which encapsulates an external plugin representation, including the expected fields for any Kong plugin, plus external plugin-specific bits, such as the RPC instance. Part of this external plugin-specific part is the instance life cycle management. With this structure, the `kong.runloop.plugin_servers` main module contains only general external plugin code, including a list of loaded external plugins, and associated start/stop functions for plugin servers. Testing ------- This commit also implements the following improvements to tests: - Restructure fixtures to accommodate new external plugin servers -- namely, targeting for now in the existing Python and Javascript - Add new test cases for external plugins: * External plugin configuration: add test cases for current behavior; in particular: - Fail if no `query_cmd` is provided; - Warn if no `start_cmd` is provided - this is by design, as external plugins servers can be managed outside of Kong * Plugin server start / stop - for both Go and Python plugins * External plugin info querying for both Go and Python plugins * External plugin execution - for both Go and Python plugins Internal flow ------------- `.plugin_servers.init:` loads all external plugins, by calling .plugin_servers.process and `.plugin_servers.plugin` `.plugin_servers.process`: queries external plugins info with the command specified in `_query_cmd` proeprties `.plugin_servers.plugin`: with info obtained as described above, `.plugin:new` returns a kong-compatible representation of an external plugin, with phase handlers, PRIORITY, and wrappers to the PDK. Calls `.plugin_servers.rpc` to create an RPC through which Kong communicates with the plugin process `.plugin_servers.rpc`: based on info contained in the plugin (protocol field), creates the correct RPC for the given external plugin `.plugin_servers.rpc.pb_rpc`: protobuf rpc implementation - used by Golang `.plugin_servers.rpc.mp.rpc`: messagepack rpc implementation - used by JS and Python `.plugin_servers.init`: calls `.plugin_servers.process` to start external plugin servers `.plugin_servers.process`: optionally starts all external plugin servers (if a `_start_cmd` is found) uses the resty pipe API to manage the external plugin process (cherry picked from commit f88da7df62cb3cbc7dbd4150756571d1b7928198) --- .github/workflows/build_and_test.yml | 2 + .gitignore | 1 + kong-3.9.0-0.rockspec | 7 +- kong.conf.default | 9 +- kong/conf_loader/constants.lua | 4 + kong/conf_loader/init.lua | 43 ++ kong/runloop/plugin_servers/init.lua | 435 +++--------------- kong/runloop/plugin_servers/plugin.lua | 353 ++++++++++++++ kong/runloop/plugin_servers/process.lua | 205 ++++----- kong/runloop/plugin_servers/rpc/init.lua | 29 ++ .../plugin_servers/{ => rpc}/mp_rpc.lua | 69 +-- .../plugin_servers/{ => rpc}/pb_rpc.lua | 74 ++- kong/runloop/plugin_servers/rpc/util.lua | 26 ++ spec/01-unit/03-conf_loader_spec.lua | 76 +++ spec/01-unit/25-msgpack_rpc_spec.lua | 4 +- .../01-process-management_spec.lua | 167 +++++++ .../10-external-plugins/02-execution_spec.lua | 83 ++++ .../03-wasm_spec.lua | 0 .../99-reports_spec.lua} | 4 +- .../{ => external_plugins}/go/go-hello.go | 0 .../fixtures/{ => external_plugins}/go/go.mod | 0 .../fixtures/{ => external_plugins}/go/go.sum | 0 spec/fixtures/external_plugins/js/js-hello.js | 33 ++ spec/fixtures/external_plugins/py/py-hello.py | 37 ++ .../external_plugins/py/requirements.txt | 1 + spec/helpers.lua | 4 +- spec/internal/cmd.lua | 6 +- spec/internal/constants.lua | 2 +- 28 files changed, 1084 insertions(+), 590 deletions(-) create mode 100644 kong/runloop/plugin_servers/plugin.lua create mode 100644 kong/runloop/plugin_servers/rpc/init.lua rename kong/runloop/plugin_servers/{ => rpc}/mp_rpc.lua (85%) rename kong/runloop/plugin_servers/{ => rpc}/pb_rpc.lua (89%) create mode 100644 kong/runloop/plugin_servers/rpc/util.lua create mode 100644 spec/02-integration/10-external-plugins/01-process-management_spec.lua create mode 100644 spec/02-integration/10-external-plugins/02-execution_spec.lua rename spec/02-integration/{10-go_plugins => 10-external-plugins}/03-wasm_spec.lua (100%) rename spec/02-integration/{10-go_plugins/01-reports_spec.lua => 10-external-plugins/99-reports_spec.lua} (95%) rename spec/fixtures/{ => external_plugins}/go/go-hello.go (100%) rename spec/fixtures/{ => external_plugins}/go/go.mod (100%) rename spec/fixtures/{ => external_plugins}/go/go.sum (100%) create mode 100644 spec/fixtures/external_plugins/js/js-hello.js create mode 100755 spec/fixtures/external_plugins/py/py-hello.py create mode 100644 spec/fixtures/external_plugins/py/requirements.txt diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index e2dfcd265715..86bcb0ac10e2 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -503,6 +503,8 @@ jobs: BUILD_NAME=${{ env.BUILD_NAME_NORMAL }} make dev LIBRARY_PREFIX=${{ env.BUILD_ROOT }}/${{ env.BUILD_NAME_NORMAL }}/kong # restore /usr/local folder sudo rsync -a --delete /tmp/usr-local-bkp/local/ /usr/local/ + # python pluginserver tests dependency + sudo pip install kong-pdk - name: Extract FIPS artifact contents if: ${{ inputs.aarch64 == false }} diff --git a/.gitignore b/.gitignore index 987409318ddb..06aa24d1b645 100644 --- a/.gitignore +++ b/.gitignore @@ -57,3 +57,4 @@ __pycache__ bazel-* # remove it after migrating from WORKSPACE to Bzlmod MODULE.bazel.lock +spec/fixtures/external_plugins/go/go-hello diff --git a/kong-3.9.0-0.rockspec b/kong-3.9.0-0.rockspec index dc1ab5b58c5f..380114465948 100644 --- a/kong-3.9.0-0.rockspec +++ b/kong-3.9.0-0.rockspec @@ -410,8 +410,11 @@ build = { ["kong.runloop.balancer.upstreams"] = "kong/runloop/balancer/upstreams.lua", ["kong.runloop.plugin_servers"] = "kong/runloop/plugin_servers/init.lua", ["kong.runloop.plugin_servers.process"] = "kong/runloop/plugin_servers/process.lua", - ["kong.runloop.plugin_servers.mp_rpc"] = "kong/runloop/plugin_servers/mp_rpc.lua", - ["kong.runloop.plugin_servers.pb_rpc"] = "kong/runloop/plugin_servers/pb_rpc.lua", + ["kong.runloop.plugin_servers.plugin"] = "kong/runloop/plugin_servers/plugin.lua", + ["kong.runloop.plugin_servers.rpc"] = "kong/runloop/plugin_servers/rpc/init.lua", + ["kong.runloop.plugin_servers.rpc.util"] = "kong/runloop/plugin_servers/rpc/util.lua", + ["kong.runloop.plugin_servers.rpc.mp_rpc"] = "kong/runloop/plugin_servers/rpc/mp_rpc.lua", + ["kong.runloop.plugin_servers.rpc.pb_rpc"] = "kong/runloop/plugin_servers/rpc/pb_rpc.lua", ["kong.runloop.wasm"] = "kong/runloop/wasm.lua", ["kong.runloop.wasm.plugins"] = "kong/runloop/wasm/plugins.lua", ["kong.runloop.wasm.properties"] = "kong/runloop/wasm/properties.lua", diff --git a/kong.conf.default b/kong.conf.default index df2385553052..ce0810e1a9a3 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -176,14 +176,17 @@ #pluginserver_XXX_socket = /.socket # Path to the unix socket # used by the pluginserver. + #pluginserver_XXX_start_cmd = /usr/local/bin/ # Full command (including # any needed arguments) to - # start the pluginserver + # start the + # pluginserver. + #pluginserver_XXX_query_cmd = /usr/local/bin/query_ # Full command to "query" the # pluginserver. Should # produce a JSON with the - # dump info of all plugins it - # manages + # dump info of the plugin it + # manages. #port_maps = # With this configuration parameter, you can # let Kong Gateway know the port from diff --git a/kong/conf_loader/constants.lua b/kong/conf_loader/constants.lua index c4f44c7119a6..1acf309e9f1e 100644 --- a/kong/conf_loader/constants.lua +++ b/kong/conf_loader/constants.lua @@ -93,6 +93,9 @@ local DEFAULT_PATHS = { } +local DEFAULT_PLUGINSERVER_PATH = "/usr/local/bin" + + local HEADER_KEY_TO_NAME = { ["server_tokens"] = "server_tokens", ["latency_tokens"] = "latency_tokens", @@ -664,6 +667,7 @@ return { CIPHER_SUITES = CIPHER_SUITES, DEFAULT_PATHS = DEFAULT_PATHS, + DEFAULT_PLUGINSERVER_PATH = DEFAULT_PLUGINSERVER_PATH, HEADER_KEY_TO_NAME = HEADER_KEY_TO_NAME, UPSTREAM_HEADER_KEY_TO_NAME = UPSTREAM_HEADER_KEY_TO_NAME, DYNAMIC_KEY_NAMESPACES = DYNAMIC_KEY_NAMESPACES, diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 8d043906fb1e..b6765dbea7b4 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -997,6 +997,49 @@ local function load(path, custom_conf, opts) conf.active_tracing = false end + -- parse and validate pluginserver directives + if conf.pluginserver_names then + local pluginservers = {} + for i, name in ipairs(conf.pluginserver_names) do + name = name:lower() + local env_prefix = "pluginserver_" .. name:gsub("-", "_") + local socket = conf[env_prefix .. "_socket"] or (conf.prefix .. "/" .. name .. ".socket") + + local start_command = conf[env_prefix .. "_start_cmd"] + local query_command = conf[env_prefix .. "_query_cmd"] + + local default_path = exists(conf_constants.DEFAULT_PLUGINSERVER_PATH .. "/" .. name) + + if not start_command and default_path then + start_command = default_path + end + + if not query_command and default_path then + query_command = default_path .. " -dump" + end + + -- query_command is required + if not query_command then + return nil, "query_command undefined for pluginserver " .. name + end + + -- if start_command is unset, we assume the pluginserver process is + -- managed externally + if not start_command then + log.warn("start_command undefined for pluginserver " .. name .. "; assuming external process management") + end + + pluginservers[i] = { + name = name, + socket = socket, + start_command = start_command, + query_command = query_command, + } + end + + conf.pluginservers = setmetatable(pluginservers, conf_constants._NOP_TOSTRING_MT) + end + -- initialize the dns client, so the globally patched tcp.connect method -- will work from here onwards. assert(require("kong.tools.dns")(conf)) diff --git a/kong/runloop/plugin_servers/init.lua b/kong/runloop/plugin_servers/init.lua index effa06c7af6e..66825a4c644a 100644 --- a/kong/runloop/plugin_servers/init.lua +++ b/kong/runloop/plugin_servers/init.lua @@ -7,373 +7,47 @@ local proc_mgmt = require "kong.runloop.plugin_servers.process" -local cjson = require "cjson.safe" -local clone = require "table.clone" -local ngx_ssl = require "ngx.ssl" -local SIGTERM = 15 +local plugin = require "kong.runloop.plugin_servers.plugin" -local type = type local pairs = pairs -local ipairs = ipairs -local tonumber = tonumber - -local ngx = ngx local kong = kong -local ngx_var = ngx.var -local ngx_sleep = ngx.sleep -local worker_id = ngx.worker.id - -local coroutine_running = coroutine.running -local get_plugin_info = proc_mgmt.get_plugin_info -local get_ctx_table = require("resty.core.ctx").get_ctx_table - -local cjson_encode = cjson.encode -local native_timer_at = _G.native_timer_at or ngx.timer.at - -local req_start_time -local req_get_headers -local resp_get_headers - -if ngx.config.subsystem == "http" then - req_start_time = ngx.req.start_time - req_get_headers = ngx.req.get_headers - resp_get_headers = ngx.resp.get_headers - -else - local NOOP = function() end - - req_start_time = NOOP - req_get_headers = NOOP - resp_get_headers = NOOP -end - -local SLEEP_STEP = 0.1 -local WAIT_TIME = 10 -local MAX_WAIT_STEPS = WAIT_TIME / SLEEP_STEP - ---- keep request data a bit longer, into the log timer -local save_for_later = {} - ---- handle notifications from pluginservers -local rpc_notifications = {} - ---- currently running plugin instances -local running_instances = {} - -local function get_saved() - return save_for_later[coroutine_running()] -end - -local exposed_api = { - kong = kong, - - ["kong.log.serialize"] = function() - local saved = get_saved() - return cjson_encode(saved and saved.serialize_data or kong.log.serialize()) - end, - - ["kong.nginx.get_var"] = function(v) - return ngx_var[v] - end, - - ["kong.nginx.get_tls1_version_str"] = ngx_ssl.get_tls1_version_str, - - ["kong.nginx.get_ctx"] = function(k) - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - return ngx_ctx[k] - end, - - ["kong.nginx.set_ctx"] = function(k, v) - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - ngx_ctx[k] = v - end, - - ["kong.ctx.shared.get"] = function(k) - local saved = get_saved() - local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared - return ctx_shared[k] - end, - - ["kong.ctx.shared.set"] = function(k, v) - local saved = get_saved() - local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared - ctx_shared[k] = v - end, - - ["kong.request.get_headers"] = function(max) - local saved = get_saved() - return saved and saved.request_headers or kong.request.get_headers(max) - end, - - ["kong.request.get_header"] = function(name) - local saved = get_saved() - if not saved then - return kong.request.get_header(name) - end - - local header_value = saved.request_headers[name] - if type(header_value) == "table" then - header_value = header_value[1] - end - - return header_value - end, - - ["kong.request.get_uri_captures"] = function() - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - return kong.request.get_uri_captures(ngx_ctx) - end, - - ["kong.response.get_status"] = function() - local saved = get_saved() - return saved and saved.response_status or kong.response.get_status() - end, - - ["kong.response.get_headers"] = function(max) - local saved = get_saved() - return saved and saved.response_headers or kong.response.get_headers(max) - end, - - ["kong.response.get_header"] = function(name) - local saved = get_saved() - if not saved then - return kong.response.get_header(name) - end - - local header_value = saved.response_headers and saved.response_headers[name] - if type(header_value) == "table" then - header_value = header_value[1] - end - - return header_value - end, - - ["kong.response.get_source"] = function() - local saved = get_saved() - return kong.response.get_source(saved and saved.ngx_ctx or nil) - end, - - ["kong.nginx.req_start_time"] = function() - local saved = get_saved() - return saved and saved.req_start_time or req_start_time() - end, -} - - -local get_instance_id -local reset_instance -local reset_instances_for_plugin - -local protocol_implementations = { - ["MsgPack:1"] = "kong.runloop.plugin_servers.mp_rpc", - ["ProtoBuf:1"] = "kong.runloop.plugin_servers.pb_rpc", -} - -local function get_server_rpc(server_def) - if not server_def.rpc then - - local rpc_modname = protocol_implementations[server_def.protocol] - if not rpc_modname then - kong.log.err("Unknown protocol implementation: ", server_def.protocol) - return nil, "Unknown protocol implementation" - end - - local rpc = require (rpc_modname) - rpc.get_instance_id = rpc.get_instance_id or get_instance_id - rpc.reset_instance = rpc.reset_instance or reset_instance - rpc.save_for_later = rpc.save_for_later or save_for_later - rpc.exposed_api = rpc.exposed_api or exposed_api - - server_def.rpc = rpc.new(server_def.socket, rpc_notifications) - end - - return server_def.rpc -end - - ---- get_instance_id: gets an ID to reference a plugin instance running in a ---- pluginserver each configuration in the database is handled by a different ---- instance. Biggest complexity here is due to the remote (and thus non-atomic ---- and fallible) operation of starting the instance at the server. -function get_instance_id(plugin_name, conf) - local key = type(conf) == "table" and kong.plugin.get_id() or plugin_name - local instance_info = running_instances[key] +-- module cache of loaded external plugins +-- XXX historically, this list of plugins has not been invalidated; +-- however, as plugin servers can be managed externally, users may also +-- change and restart the plugin server, potentially with new configurations +-- this needs to be improved -- docs and code hardening +local loaded_plugins - local wait_count = 0 - while instance_info and not instance_info.id do - -- some other thread is already starting an instance - -- prevent busy-waiting - ngx_sleep(SLEEP_STEP) - - -- to prevent a potential dead loop when someone failed to release the ID - wait_count = wait_count + 1 - if wait_count > MAX_WAIT_STEPS then - running_instances[key] = nil - return nil, "Could not claim instance_id for " .. plugin_name .. " (key: " .. key .. ")" - end - instance_info = running_instances[key] - end - - if instance_info - and instance_info.id - and instance_info.seq == conf.__seq__ - and instance_info.conf and instance_info.conf.__plugin_id == key - then - -- exact match, return it - return instance_info.id - end - - local old_instance_id = instance_info and instance_info.id - if not instance_info then - -- we're the first, put something to claim - instance_info = { - conf = conf, - seq = conf.__seq__, - } - running_instances[key] = instance_info - else - - -- there already was something, make it evident that we're changing it - instance_info.id = nil - end - - local plugin_info = get_plugin_info(plugin_name) - local server_rpc = get_server_rpc(plugin_info.server_def) - - local new_instance_info, err = server_rpc:call_start_instance(plugin_name, conf) - if new_instance_info == nil then - kong.log.err("starting instance: ", err) - -- remove claim, some other thread might succeed - running_instances[key] = nil - error(err) - end - - instance_info.id = new_instance_info.id - instance_info.plugin_name = plugin_name - instance_info.conf = new_instance_info.conf - instance_info.seq = new_instance_info.seq - instance_info.Config = new_instance_info.Config - instance_info.rpc = new_instance_info.rpc - - if old_instance_id then - -- there was a previous instance with same key, close it - server_rpc:call_close_instance(old_instance_id) - -- don't care if there's an error, maybe other thread closed it first. - end - - return instance_info.id -end - -function reset_instances_for_plugin(plugin_name) - for k, instance in pairs(running_instances) do - if instance.plugin_name == plugin_name then - running_instances[k] = nil - end - end -end - ---- reset_instance: removes an instance from the table. -function reset_instance(plugin_name, conf) - -- - -- the same plugin (which acts as a plugin server) is shared among - -- instances of the plugin; for example, the same plugin can be applied - -- to many routes - -- `reset_instance` is called when (but not only) the plugin server died; - -- in such case, all associated instances must be removed, not only the current - -- - reset_instances_for_plugin(plugin_name) - - local ok, err = kong.worker_events.post("plugin_server", "reset_instances", { plugin_name = plugin_name }) - if not ok then - kong.log.err("failed to post plugin_server reset_instances event: ", err) - end -end - - ---- serverPid notification sent by the pluginserver. if it changes, ---- all instances tied to this RPC socket should be restarted. -function rpc_notifications:serverPid(n) - n = tonumber(n) - if self.pluginserver_pid and n ~= self.pluginserver_pid then - for key, instance in pairs(running_instances) do - if instance.rpc == self then - running_instances[key] = nil - end - end +local function load_external_plugins() + if loaded_plugins then + return true end - self.pluginserver_pid = n -end - - + loaded_plugins = {} + local kong_config = kong.configuration - ---- Phase closures -local function build_phases(plugin) - if not plugin then - return + local plugins_info, err = proc_mgmt.load_external_plugins_info(kong_config) + if not plugins_info then + return nil, "failed loading external plugins: " .. err end - local server_rpc = get_server_rpc(plugin.server_def) - - for _, phase in ipairs(plugin.phases) do - if phase == "log" then - plugin[phase] = function(self, conf) - native_timer_at(0, function(premature, saved) - if premature then - return - end - get_ctx_table(saved.ngx_ctx) - local co = coroutine_running() - save_for_later[co] = saved - server_rpc:handle_event(self.name, conf, phase) - save_for_later[co] = nil - end, { - plugin_name = self.name, - serialize_data = kong.log.serialize(), - ngx_ctx = clone(ngx.ctx), - ctx_shared = kong.ctx.shared, - request_headers = req_get_headers(), - response_headers = resp_get_headers(), - response_status = ngx.status, - req_start_time = req_start_time(), - }) - end - - else - plugin[phase] = function(self, conf) - server_rpc:handle_event(self.name, conf, phase) - end - end + for plugin_name, plugin_info in pairs(plugins_info) do + local plugin = plugin.new(plugin_info) + loaded_plugins[plugin_name] = plugin end - return plugin + return loaded_plugins end - - ---- module table -local plugin_servers = {} - - -local loaded_plugins = {} - local function get_plugin(plugin_name) - kong = kong or _G.kong -- some CLI cmds set the global after loading the module. - if not loaded_plugins[plugin_name] then - local plugin = get_plugin_info(plugin_name) - loaded_plugins[plugin_name] = build_phases(plugin) - end + assert(load_external_plugins()) return loaded_plugins[plugin_name] end -function plugin_servers.load_plugin(plugin_name) +local function load_plugin(plugin_name) local plugin = get_plugin(plugin_name) if plugin and plugin.PRIORITY then return true, plugin @@ -382,7 +56,7 @@ function plugin_servers.load_plugin(plugin_name) return false, "no plugin found" end -function plugin_servers.load_schema(plugin_name) +local function load_schema(plugin_name) local plugin = get_plugin(plugin_name) if plugin and plugin.PRIORITY then return true, plugin.schema @@ -391,36 +65,49 @@ function plugin_servers.load_schema(plugin_name) return false, "no plugin found" end - -function plugin_servers.start() - if worker_id() ~= 0 then - return - end - - local pluginserver_timer = proc_mgmt.pluginserver_timer - - for _, server_def in ipairs(proc_mgmt.get_server_defs()) do - if server_def.start_command then - native_timer_at(0, pluginserver_timer, server_def) - end - end - +local function start() -- in case plugin server restarts, all workers need to update their defs kong.worker_events.register(function (data) - reset_instances_for_plugin(data.plugin_name) + plugin.reset_instances_for_plugin(data.plugin_name) end, "plugin_server", "reset_instances") -end -function plugin_servers.stop() - if worker_id() ~= 0 then - return - end + return proc_mgmt.start_pluginservers() +end - for _, server_def in ipairs(proc_mgmt.get_server_defs()) do - if server_def.proc then - server_def.proc:kill(SIGTERM) - end - end +local function stop() + return proc_mgmt.stop_pluginservers() end -return plugin_servers + +-- +-- This modules sole responsibility is to +-- manage external plugins: starting/stopping plugins servers, +-- and return plugins info (such as schema and their loaded representations) +-- +-- The general initialization flow is: +-- - kong.init: calls start and stop to start/stop external plugins servers +-- - kong.db.schema.plugin_loader: calls load_schema to get an external plugin schema +-- - kong.db.dao.plugins: calls load_plugin to get the expected representation of a plugin +-- (phase handlers, priority, etc) +-- +-- Internal flow: +-- .plugin_servers.init: loads all external plugins, by calling .plugin_servers.process and .plugin_servers.plugin +-- .plugin_servers.process: queries external plugins info with the command specified in _query_cmd properties +-- .plugin_servers.plugin: with info obtained as described above, .plugin:new returns a kong-compatible representation +-- of an external plugin, with phase handlers, PRIORITY, and wrappers to the PDK. Calls +-- .plugin_servers.rpc to create an RPC through which Kong communicates with the plugin process +-- .plugin_servers.rpc: based on info contained in the plugin (protocol field), creates the correct RPC for the +-- given external plugin +-- .plugin_servers.rpc.pb_rpc: protobuf rpc implementation - used by Golang +-- .plugin_servers.rpc.mp.rpc: messagepack rpc implementation - used by JS and Python +-- .plugin_servers.init: calls .plugin_servers.process to start external plugin servers +-- .plugin_servers.process: optionally starts all external plugin servers (if a _start_cmd is found) +-- uses the resty pipe API to manage the external plugin process +-- + +return { + start = start, + stop = stop, + load_schema = load_schema, + load_plugin = load_plugin, +} diff --git a/kong/runloop/plugin_servers/plugin.lua b/kong/runloop/plugin_servers/plugin.lua new file mode 100644 index 000000000000..cc84fb0e5b65 --- /dev/null +++ b/kong/runloop/plugin_servers/plugin.lua @@ -0,0 +1,353 @@ +-- This software is copyright Kong Inc. and its licensors. +-- Use of the software is subject to the agreement between your organization +-- and Kong Inc. If there is no such agreement, use is governed by and +-- subject to the terms of the Kong Master Software License Agreement found +-- at https://konghq.com/enterprisesoftwarelicense/. +-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ] + +local cjson = require "cjson.safe" +local ngx_ssl = require "ngx.ssl" +local clone = require "table.clone" +local rpc = require "kong.runloop.plugin_servers.rpc" + +local type = type +local ngx_sleep = ngx.sleep +local ngx_var = ngx.var +local cjson_encode = cjson.encode +local ipairs = ipairs +local coroutine_running = coroutine.running +local get_ctx_table = require("resty.core.ctx").get_ctx_table +local native_timer_at = _G.native_timer_at or ngx.timer.at + +--- currently running plugin instances +local running_instances = {} + +local req_start_time +local req_get_headers +local resp_get_headers + +if ngx.config.subsystem == "http" then + req_start_time = ngx.req.start_time + req_get_headers = ngx.req.get_headers + resp_get_headers = ngx.resp.get_headers + +else + local NOOP = function() end + + req_start_time = NOOP + req_get_headers = NOOP + resp_get_headers = NOOP +end + +--- keep request data a bit longer, into the log timer +local req_data = {} + +local function get_saved_req_data() + return req_data[coroutine_running()] +end + +local exposed_api = { + kong = kong, + + get_saved_req_data = get_saved_req_data, + + ["kong.log.serialize"] = function() + local saved = get_saved_req_data() + return cjson_encode(saved and saved.serialize_data or kong.log.serialize()) + end, + + ["kong.nginx.get_var"] = function(v) + return ngx_var[v] + end, + + ["kong.nginx.get_tls1_version_str"] = ngx_ssl.get_tls1_version_str, + + ["kong.nginx.get_ctx"] = function(k) + local saved = get_saved_req_data() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + return ngx_ctx[k] + end, + + ["kong.nginx.set_ctx"] = function(k, v) + local saved = get_saved_req_data() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + ngx_ctx[k] = v + end, + + ["kong.ctx.shared.get"] = function(k) + local saved = get_saved_req_data() + local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared + return ctx_shared[k] + end, + + ["kong.ctx.shared.set"] = function(k, v) + local saved = get_saved_req_data() + local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared + ctx_shared[k] = v + end, + + ["kong.request.get_headers"] = function(max) + local saved = get_saved_req_data() + return saved and saved.request_headers or kong.request.get_headers(max) + end, + + ["kong.request.get_header"] = function(name) + local saved = get_saved_req_data() + if not saved then + return kong.request.get_header(name) + end + + local header_value = saved.request_headers[name] + if type(header_value) == "table" then + header_value = header_value[1] + end + + return header_value + end, + + ["kong.request.get_uri_captures"] = function() + local saved = get_saved_req_data() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + return kong.request.get_uri_captures(ngx_ctx) + end, + + ["kong.response.get_status"] = function() + local saved = get_saved_req_data() + return saved and saved.response_status or kong.response.get_status() + end, + + ["kong.response.get_headers"] = function(max) + local saved = get_saved_req_data() + return saved and saved.response_headers or kong.response.get_headers(max) + end, + + ["kong.response.get_header"] = function(name) + local saved = get_saved_req_data() + if not saved then + return kong.response.get_header(name) + end + + local header_value = saved.response_headers and saved.response_headers[name] + if type(header_value) == "table" then + header_value = header_value[1] + end + + return header_value + end, + + ["kong.response.get_source"] = function() + local saved = get_saved_req_data() + return kong.response.get_source(saved and saved.ngx_ctx or nil) + end, + + ["kong.nginx.req_start_time"] = function() + local saved = get_saved_req_data() + return saved and saved.req_start_time or req_start_time() + end, +} + + +--- Phase closures +local function build_phases(plugin) + if not plugin then + return + end + + for _, phase in ipairs(plugin.phases) do + if phase == "log" then + plugin[phase] = function(self, conf) + native_timer_at(0, function(premature, saved) + if premature then + return + end + get_ctx_table(saved.ngx_ctx) + local co = coroutine_running() + req_data[co] = saved + plugin.rpc:handle_event(conf, phase) + req_data[co] = nil + end, { + plugin_name = self.name, + serialize_data = kong.log.serialize(), + ngx_ctx = clone(ngx.ctx), + ctx_shared = kong.ctx.shared, + request_headers = req_get_headers(), + response_headers = resp_get_headers(), + response_status = ngx.status, + req_start_time = req_start_time(), + }) + end + + else + plugin[phase] = function(self, conf) + plugin.rpc:handle_event(conf, phase) + end + end + end + + return plugin +end + +--- handle notifications from pluginservers +local rpc_notifications = {} + +--- serverPid notification sent by the pluginserver. if it changes, +--- all instances tied to this RPC socket should be restarted. +function rpc_notifications:serverPid(n) + n = tonumber(n) + if self.pluginserver_pid and n ~= self.pluginserver_pid then + for key, instance in pairs(running_instances) do + if instance.rpc == self then + running_instances[key] = nil + end + end + end + + self.pluginserver_pid = n +end + +local function reset_instances_for_plugin(plugin_name) + for k, instance in pairs(running_instances) do + if instance.plugin_name == plugin_name then + running_instances[k] = nil + end + end +end + +--- reset_instance: removes an instance from the table. +local function reset_instance(plugin_name, conf) + -- + -- the same plugin (which acts as a plugin server) is shared among + -- instances of the plugin; for example, the same plugin can be applied + -- to many routes + -- `reset_instance` is called when (but not only) the plugin server died; + -- in such case, all associated instances must be removed, not only the current + -- + reset_instances_for_plugin(plugin_name) + + local ok, err = kong.worker_events.post("plugin_server", "reset_instances", { plugin_name = plugin_name }) + if not ok then + kong.log.err("failed to post plugin_server reset_instances event: ", err) + end +end + +local get_instance_id + +do + local SLEEP_STEP = 0.1 + local WAIT_TIME = 10 + local MAX_WAIT_STEPS = WAIT_TIME / SLEEP_STEP + + --- get_instance_id: gets an ID to reference a plugin instance running in the + --- pluginserver; each configuration of a plugin is handled by a different + --- instance. Biggest complexity here is due to the remote (and thus non-atomic + --- and fallible) operation of starting the instance at the server. + function get_instance_id(plugin, conf) + local plugin_name = plugin.name + + local key = kong.plugin.get_id() + local instance_info = running_instances[key] + + local wait_count = 0 + while instance_info and not instance_info.id do + -- some other thread is already starting an instance + -- prevent busy-waiting + ngx_sleep(SLEEP_STEP) + + -- to prevent a potential dead loop when someone failed to release the ID + wait_count = wait_count + 1 + if wait_count > MAX_WAIT_STEPS then + running_instances[key] = nil + return nil, "Could not claim instance_id for " .. plugin_name .. " (key: " .. key .. ")" + end + instance_info = running_instances[key] + end + + if instance_info + and instance_info.id + and instance_info.seq == conf.__seq__ + and instance_info.conf and instance_info.conf.__plugin_id == key + then + -- exact match, return it + return instance_info.id + end + + local old_instance_id = instance_info and instance_info.id + if not instance_info then + -- we're the first, put something to claim + instance_info = { + conf = conf, + seq = conf.__seq__, + } + running_instances[key] = instance_info + else + + -- there already was something, make it evident that we're changing it + instance_info.id = nil + end + + local new_instance_info, err = plugin.rpc:call_start_instance(plugin_name, conf) + if new_instance_info == nil then + kong.log.err("starting instance: ", err) + -- remove claim, some other thread might succeed + running_instances[key] = nil + error(err) + end + + instance_info.id = new_instance_info.id + instance_info.plugin_name = plugin_name + instance_info.conf = new_instance_info.conf + instance_info.seq = new_instance_info.seq + instance_info.Config = new_instance_info.Config + instance_info.rpc = new_instance_info.rpc + + if old_instance_id then + -- there was a previous instance with same key, close it + plugin.rpc:call_close_instance(old_instance_id) + -- don't care if there's an error, maybe other thread closed it first. + end + + return instance_info.id + end +end + +-- +-- instance callbacks manage the state of a plugin instance +-- - get_instance_id (which also starts and instance) +-- - reset_instance, which removes an instance from the local cache +-- +local instance_callbacks = { + reset_instance = reset_instance, + get_instance_id = get_instance_id, +} + +local function new(plugin_info) + -- + -- plugin_info + -- * name + -- * priority + -- * version + -- * schema + -- * phases + -- * server_def + -- + + local self = build_phases(plugin_info) + self.instance_callbacks = instance_callbacks + self.exposed_api = exposed_api + self.rpc_notifications = rpc_notifications + + local plugin_rpc, err = rpc.new(self) + if not rpc then + return nil, err + end + + self.rpc = plugin_rpc + + return self +end + + +return { + new = new, + reset_instances_for_plugin = reset_instances_for_plugin, +} diff --git a/kong/runloop/plugin_servers/process.lua b/kong/runloop/plugin_servers/process.lua index 99bae4534360..ef984dddeeec 100644 --- a/kong/runloop/plugin_servers/process.lua +++ b/kong/runloop/plugin_servers/process.lua @@ -6,82 +6,21 @@ -- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ] local cjson = require "cjson.safe" -local pl_path = require "pl.path" local raw_log = require "ngx.errlog".raw_log - +local worker_id = ngx.worker.id +local native_timer_at = _G.native_timer_at or ngx.timer.at local _, ngx_pipe = pcall(require, "ngx.pipe") - local kong = kong local ngx_INFO = ngx.INFO local cjson_decode = cjson.decode +local SIGTERM = 15 -local proc_mgmt = {} - -local _servers -local _plugin_infos - ---[[ - -Configuration - -We require three settings to communicate with each pluginserver. To make it -fit in the config structure, use a dynamic namespace and generous defaults. - -- pluginserver_names: a list of names, one for each pluginserver. - -- pluginserver_XXX_socket: unix socket to communicate with the pluginserver. -- pluginserver_XXX_start_cmd: command line to strat the pluginserver. -- pluginserver_XXX_query_cmd: command line to query the pluginserver. - -Note: the `_start_cmd` and `_query_cmd` are set to the defaults only if -they exist on the filesystem. If omitted and the default doesn't exist, -they're disabled. - -A disabled `_start_cmd` (unset and the default doesn't exist in the filesystem) -means this process isn't managed by Kong. It's expected that the socket -still works, supposedly handled by an externally-managed process. - -A disable `_query_cmd` means it won't be queried and so the corresponding -socket wouldn't be used, even if the process is managed (if the `_start_cmd` -is valid). Currently this has no use, but it could eventually be added via -other means, perhaps dynamically. - ---]] - -local function ifexists(path) - if pl_path.exists(path) then - return path - end -end +local _M = {} -local function get_server_defs() - local config = kong.configuration - - if not _servers then - _servers = {} - - for i, name in ipairs(config.pluginserver_names) do - name = name:lower() - kong.log.debug("search config for pluginserver named: ", name) - local env_prefix = "pluginserver_" .. name:gsub("-", "_") - _servers[i] = { - name = name, - socket = config[env_prefix .. "_socket"] or "/usr/local/kong/" .. name .. ".socket", - start_command = config[env_prefix .. "_start_cmd"] or ifexists("/usr/local/bin/"..name), - query_command = config[env_prefix .. "_query_cmd"] or ifexists("/usr/local/bin/query_"..name), - } - end - end - - return _servers -end - -proc_mgmt.get_server_defs = get_server_defs --[[ - Plugin info requests Disclaimer: The best way to do it is to have "ListPlugins()" and "GetInfo(plugin)" @@ -109,69 +48,66 @@ defining the name, priority, version, schema and phases of one plugin. This array should describe all plugins currently available through this server, no matter if actually enabled in Kong's configuration or not. - --]] - -local function register_plugin_info(server_def, plugin_info) - if _plugin_infos[plugin_info.Name] then - kong.log.err(string.format("Duplicate plugin name [%s] by %s and %s", - plugin_info.Name, _plugin_infos[plugin_info.Name].server_def.name, server_def.name)) - return - end - - _plugin_infos[plugin_info.Name] = { - server_def = server_def, - --rpc = server_def.rpc, - name = plugin_info.Name, - PRIORITY = plugin_info.Priority, - VERSION = plugin_info.Version, - schema = plugin_info.Schema, - phases = plugin_info.Phases, - } -end - -local function ask_info(server_def) +local function query_external_plugin_info(server_def) if not server_def.query_command then - kong.log.info(string.format("No info query for %s", server_def.name)) - return + return nil, "no info query for " .. server_def.name end local fd, err = io.popen(server_def.query_command) if not fd then - local msg = string.format("loading plugins info from [%s]:\n", server_def.name) - kong.log.err(msg, err) - return + return nil, string.format("error loading plugins info from [%s]: %s", server_def.name, err) end local infos_dump = fd:read("*a") fd:close() - local dump = cjson_decode(infos_dump) + local dump, err = cjson_decode(infos_dump) + if err then + return nil, "failed decoding plugin info: " .. err + end + if type(dump) ~= "table" then - error(string.format("Not a plugin info table: \n%s\n%s", - server_def.query_command, infos_dump)) - return + return nil, string.format("not a plugin info table: \n%s\n%s", server_def.query_command, infos_dump) end server_def.protocol = dump.Protocol or "MsgPack:1" - local infos = dump.Plugins or dump - - for _, plugin_info in ipairs(infos) do - register_plugin_info(server_def, plugin_info) - end + local info = (dump.Plugins or dump)[1] -- XXX can a pluginserver (in the embedded plugin server world + -- have more than one plugin? only a single + -- configuration is initialized currently, so this + -- seems to be legacy code) + + -- in remote times, a plugin server could serve more than one plugin + -- nowadays (2.8+), external plugins use an "embedded pluginserver" model, where + -- each plugin acts as an independent plugin server + return { + server_def = server_def, + name = info.Name, + PRIORITY = info.Priority, + VERSION = info.Version, + schema = info.Schema, + phases = info.Phases, + } end -function proc_mgmt.get_plugin_info(plugin_name) - if not _plugin_infos then - kong = kong or _G.kong -- some CLI cmds set the global after loading the module. - _plugin_infos = {} - for _, server_def in ipairs(get_server_defs()) do - ask_info(server_def) +function _M.load_external_plugins_info(kong_conf) + local available_external_plugins = {} + + kong.log.notice("[pluginserver] loading external plugins info") + + for _, pluginserver in ipairs(kong_conf.pluginservers) do + local plugin_info, err = query_external_plugin_info(pluginserver) + if not plugin_info then + return nil, err end + + available_external_plugins[plugin_info.name] = plugin_info end - return _plugin_infos[plugin_name] + kong.log.notice("[pluginserver] loaded #", #kong_conf.pluginservers, " external plugins info") + + return available_external_plugins end @@ -185,7 +121,6 @@ event and respawns the server. If the `_start_cmd` is unset (and the default doesn't exist in the filesystem) it's assumed the process is managed externally. - --]] local function grab_logs(proc, name) @@ -204,7 +139,8 @@ local function grab_logs(proc, name) end end -function proc_mgmt.pluginserver_timer(premature, server_def) + +local function pluginserver_timer(premature, server_def) if premature then return end @@ -220,30 +156,73 @@ function proc_mgmt.pluginserver_timer(premature, server_def) ngx.sleep(next_spawn - ngx.now()) end - kong.log.notice("Starting " .. server_def.name or "") + kong.log.notice("[pluginserver] starting pluginserver process for ", server_def.name or "") server_def.proc = assert(ngx_pipe.spawn("exec " .. server_def.start_command, { merge_stderr = true, })) next_spawn = ngx.now() + 1 server_def.proc:set_timeouts(nil, nil, nil, 0) -- block until something actually happens + kong.log.notice("[pluginserver] started, pid ", server_def.proc:pid()) while true do grab_logs(server_def.proc, server_def.name) local ok, reason, status = server_def.proc:wait() + + -- exited with a non 0 status if ok == false and reason == "exit" and status == 127 then kong.log.err(string.format( - "external pluginserver %q start command %q exited with \"command not found\"", + "[pluginserver] external pluginserver %q start command %q exited with \"command not found\"", server_def.name, server_def.start_command)) break + + -- waited on an exited thread elseif ok ~= nil or reason == "exited" or ngx.worker.exiting() then kong.log.notice("external pluginserver '", server_def.name, "' terminated: ", tostring(reason), " ", tostring(status)) break end + + -- XXX what happens if the process stops with a 0 status code? + end + end + + kong.log.notice("[pluginserver] exiting: pluginserver '", server_def.name, "' not respawned.") +end + + +function _M.start_pluginservers() + local kong_config = kong.configuration + + -- only worker 0 manages plugin server processes + if worker_id() == 0 then -- TODO move to privileged worker? + local pluginserver_timer = pluginserver_timer + + for _, server_def in ipairs(kong_config.pluginservers) do + if server_def.start_command then -- if not defined, we assume it's managed externally + native_timer_at(0, pluginserver_timer, server_def) + end end end - kong.log.notice("Exiting: pluginserver '", server_def.name, "' not respawned.") + + return true end +function _M.stop_pluginservers() + local kong_config = kong.configuration + + -- only worker 0 manages plugin server processes + if worker_id() == 0 then -- TODO move to privileged worker? + for _, server_def in ipairs(kong_config.pluginservers) do + if server_def.proc then + local ok, err = server_def.proc:kill(SIGTERM) + if not ok then + kong.log.error("[pluginserver] failed to stop pluginserver '", server_def.name, ": ", err) + end + kong.log.notice("[pluginserver] successfully stopped pluginserver '", server_def.name, "', pid ", server_def.proc:pid()) + end + end + end + return true +end -return proc_mgmt +return _M diff --git a/kong/runloop/plugin_servers/rpc/init.lua b/kong/runloop/plugin_servers/rpc/init.lua new file mode 100644 index 000000000000..5c2ea0f39234 --- /dev/null +++ b/kong/runloop/plugin_servers/rpc/init.lua @@ -0,0 +1,29 @@ +-- This software is copyright Kong Inc. and its licensors. +-- Use of the software is subject to the agreement between your organization +-- and Kong Inc. If there is no such agreement, use is governed by and +-- subject to the terms of the Kong Master Software License Agreement found +-- at https://konghq.com/enterprisesoftwarelicense/. +-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ] + +local protocol_implementations = { + ["MsgPack:1"] = "kong.runloop.plugin_servers.rpc.mp_rpc", + ["ProtoBuf:1"] = "kong.runloop.plugin_servers.rpc.pb_rpc", +} + +local function new(plugin) + local rpc_modname = protocol_implementations[plugin.server_def.protocol] + if not rpc_modname then + return nil, "unknown protocol implementation: " .. (plugin.server_def.protocol or "nil") + end + + kong.log.notice("[pluginserver] loading protocol ", plugin.server_def.protocol, " for plugin ", plugin.name) + + local rpc_mod = require (rpc_modname) + local rpc = rpc_mod.new(plugin) + + return rpc +end + +return { + new = new, +} diff --git a/kong/runloop/plugin_servers/mp_rpc.lua b/kong/runloop/plugin_servers/rpc/mp_rpc.lua similarity index 85% rename from kong/runloop/plugin_servers/mp_rpc.lua rename to kong/runloop/plugin_servers/rpc/mp_rpc.lua index dfd52d713949..c6c19c9f1a49 100644 --- a/kong/runloop/plugin_servers/mp_rpc.lua +++ b/kong/runloop/plugin_servers/rpc/mp_rpc.lua @@ -7,6 +7,7 @@ local kong_global = require "kong.global" local cjson = require "cjson.safe" +local rpc_util = require "kong.runloop.plugin_servers.rpc.util" local _ local msgpack do @@ -34,19 +35,6 @@ local str_find = string.find local Rpc = {} Rpc.__index = Rpc -Rpc.notifications_callbacks = {} - -function Rpc.new(socket_path, notifications) - kong.log.debug("mp_rpc.new: ", socket_path) - return setmetatable({ - socket_path = socket_path, - msg_id = 0, - notifications_callbacks = notifications, - }, Rpc) -end - - - -- add MessagePack empty array/map msgpack.packers['function'] = function (buffer, f) @@ -113,48 +101,30 @@ Kong API exposed to external plugins --]] --- global method search and cache -local function index_table(table, field) - if table[field] then - return table[field] - end - - local res = table - for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do - if res[segment[0]] then - res = res[segment[0]] - else - return nil - end - end - return res -end - - local get_field do local method_cache = {} - function get_field(method) + function get_field(pdk, method) if method_cache[method] then return method_cache[method] else - method_cache[method] = index_table(Rpc.exposed_api, method) + method_cache[method] = rpc_util.index_table(pdk, method) return method_cache[method] end end end -local function call_pdk_method(cmd, args) - local method = get_field(cmd) +local function call_pdk_method(pdk, cmd, args) + local method = get_field(pdk, cmd) if not method then kong.log.err("could not find pdk method: ", cmd) return end - local saved = Rpc.save_for_later[coroutine.running()] + local saved = pdk.get_saved_req_data() if saved and saved.plugin_name then kong_global.set_namespaced_log(kong, saved.plugin_name) end @@ -210,7 +180,7 @@ function Rpc:call(method, ...) self.msg_id = self.msg_id + 1 local msg_id = self.msg_id - local c, err = ngx.socket.connect("unix:" .. self.socket_path) + local c, err = ngx.socket.connect("unix:" .. self.plugin.server_def.socket) if not c then kong.log.err("trying to connect: ", err) return nil, err @@ -285,7 +255,7 @@ end function Rpc:notification(label, args) - local f = self.notifications_callbacks[label] + local f = self.plugin.rpc_notifications[label] if f then f(self, args) end @@ -318,6 +288,7 @@ local function bridge_loop(instance_rpc, instance_id, phase) end local pdk_res, pdk_err = call_pdk_method( + instance_rpc.plugin.exposed_api, step_in.Data.Method, step_in.Data.Args) @@ -334,8 +305,10 @@ local function bridge_loop(instance_rpc, instance_id, phase) end -function Rpc:handle_event(plugin_name, conf, phase) - local instance_id, err = self.get_instance_id(plugin_name, conf) +function Rpc:handle_event(conf, phase) + local plugin_name = self.plugin.name + + local instance_id, err = self.plugin.instance_callbacks.get_instance_id(self.plugin, conf) if not err then _, err = bridge_loop(self, instance_id, phase) end @@ -344,16 +317,24 @@ function Rpc:handle_event(plugin_name, conf, phase) local err_lowered = err:lower() if str_find(err_lowered, "no plugin instance") then - self.reset_instance(plugin_name, conf) + self.plugin.instance_callbacks.reset_instance(plugin_name, conf) kong.log.warn(err) - return self:handle_event(plugin_name, conf, phase) + return self:handle_event(conf, phase) end kong.log.err(err) end end +local function new(plugin) + local self = setmetatable({ + msg_id = 0, + plugin = plugin, + }, Rpc) + return self +end - -return Rpc +return { + new = new, +} diff --git a/kong/runloop/plugin_servers/pb_rpc.lua b/kong/runloop/plugin_servers/rpc/pb_rpc.lua similarity index 89% rename from kong/runloop/plugin_servers/pb_rpc.lua rename to kong/runloop/plugin_servers/rpc/pb_rpc.lua index c96098de4bdc..dab4145b65a4 100644 --- a/kong/runloop/plugin_servers/pb_rpc.lua +++ b/kong/runloop/plugin_servers/rpc/pb_rpc.lua @@ -10,6 +10,7 @@ local cjson = require "cjson.safe" local grpc_tools = require "kong.tools.grpc" local pb = require "pb" local lpack = require "lua_pack" +local rpc_util = require "kong.runloop.plugin_servers.rpc.util" local ngx = ngx local kong = kong @@ -20,11 +21,9 @@ local st_unpack = lpack.unpack local str_find = string.find local proto_fname = "kong/pluginsocket.proto" - local Rpc = {} Rpc.__index = Rpc - local pb_unwrap do local structpb_value, structpb_list, structpb_struct @@ -183,24 +182,7 @@ do } end - -local function index_table(table, field) - if table[field] then - return table[field] - end - - local res = table - for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do - if res[segment[0]] then - res = res[segment[0]] - else - return nil - end - end - return res -end - -local function load_service() +local function load_service(pdk) local p = grpc_tools.new() local protoc_instance = p.protoc_instance @@ -219,7 +201,7 @@ local function load_service() service[lower_name] = { method_name = method_name, - method = index_table(Rpc.exposed_api, lower_name), + method = rpc_util.index_table(pdk, lower_name), input_type = m.input_type, output_type = m.output_type, } @@ -240,13 +222,13 @@ local function identity_function(x) end -local function call_pdk(method_name, arg) +local function call_pdk(pdk, method_name, arg) local method = rpc_service[method_name] if not method then return nil, ("method %q not found"):format(method_name) end - local saved = Rpc.save_for_later[coroutine.running()] + local saved = pdk.get_saved_req_data() if saved and saved.plugin_name then kong_global.set_namespaced_log(kong, saved.plugin_name) end @@ -290,25 +272,10 @@ local function write_frame(c, msg) assert (c:send(msg)) end -function Rpc.new(socket_path, notifications) - - if not rpc_service then - rpc_service = load_service() - end - - --kong.log.debug("pb_rpc.new: ", socket_path) - return setmetatable({ - socket_path = socket_path, - msg_id = 0, - notifications_callbacks = notifications, - }, Rpc) -end - - function Rpc:call(method, data, do_bridge_loop) self.msg_id = self.msg_id + 1 local msg_id = self.msg_id - local c, err = ngx.socket.connect("unix:" .. self.socket_path) + local c, err = ngx.socket.connect("unix:" .. self.plugin.server_def.socket) if not c then kong.log.err("trying to connect: ", err) return nil, err @@ -343,7 +310,7 @@ function Rpc:call(method, data, do_bridge_loop) end local reply - reply, err = call_pdk(method_name, args) + reply, err = call_pdk(self.plugin.exposed_api, method_name, args) if not reply then return nil, err end @@ -378,7 +345,7 @@ function Rpc:call_start_instance(plugin_name, conf) return nil, err end - kong.log.debug("started plugin server: seq ", conf.__seq__, ", worker ", ngx.worker.id() or -1, ", instance id ", + kong.log.debug("started plugin server: seq ", conf.__seq__, ", worker ", ngx.worker.id(), ", instance id ", status.instance_status.instance_id) return { @@ -397,9 +364,10 @@ function Rpc:call_close_instance(instance_id) end +function Rpc:handle_event(conf, phase) + local plugin_name = self.plugin.name -function Rpc:handle_event(plugin_name, conf, phase) - local instance_id, err = self.get_instance_id(plugin_name, conf) + local instance_id, err = self.plugin.instance_callbacks.get_instance_id(self.plugin, conf) local res if not err then res, err = self:call("cmd_handle_event", { @@ -413,9 +381,9 @@ function Rpc:handle_event(plugin_name, conf, phase) if str_find(err_lowered, "no plugin instance", nil, true) or str_find(err_lowered, "closed", nil, true) then - self.reset_instance(plugin_name, conf) + self.plugin.instance_callbacks.reset_instance(plugin_name, conf) kong.log.warn(err) - return self:handle_event(plugin_name, conf, phase) + return self:handle_event(conf, phase) else kong.log.err("pluginserver error: ", err or "unknown error") @@ -424,5 +392,19 @@ function Rpc:handle_event(plugin_name, conf, phase) end end +local function new(plugin) + if not rpc_service then + rpc_service = load_service(plugin.exposed_api) + end + + local self = setmetatable({ + msg_id = 0, + plugin = plugin, + }, Rpc) + + return self +end -return Rpc +return { + new = new, +} diff --git a/kong/runloop/plugin_servers/rpc/util.lua b/kong/runloop/plugin_servers/rpc/util.lua new file mode 100644 index 000000000000..d452a366ddda --- /dev/null +++ b/kong/runloop/plugin_servers/rpc/util.lua @@ -0,0 +1,26 @@ +-- This software is copyright Kong Inc. and its licensors. +-- Use of the software is subject to the agreement between your organization +-- and Kong Inc. If there is no such agreement, use is governed by and +-- subject to the terms of the Kong Master Software License Agreement found +-- at https://konghq.com/enterprisesoftwarelicense/. +-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ] + +local function index_table(table, field) + if table[field] then + return table[field] + end + + local res = table + for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do + if res[segment[0]] then + res = res[segment[0]] + else + return nil + end + end + return res + end + +return { + index_table = index_table, +} diff --git a/spec/01-unit/03-conf_loader_spec.lua b/spec/01-unit/03-conf_loader_spec.lua index 63cff8e1131c..c738c200ad7a 100644 --- a/spec/01-unit/03-conf_loader_spec.lua +++ b/spec/01-unit/03-conf_loader_spec.lua @@ -2984,4 +2984,80 @@ describe("Configuration loader", function() end) end) + describe("pluginserver config", function() + describe("fails if", function() + it("no query_command and not found in default location", function() + local _, err = conf_loader(nil, { + pluginserver_names = "gopher", + -- query_command = {}, + }) + assert.is_not_nil(err) + assert.matches("query_command undefined for pluginserver gopher", err) + end) + end) + describe("warns if", function() + it("no start_command (meaning process is externally maintained)", function() + local spy_log = spy.on(log, "warn") + + finally(function() + log.warn:revert() + assert:unregister("matcher", "str_match") + end) + + assert:register("matcher", "str_match", function (_state, arguments) + local expected = arguments[1] + return function(value) + return string.match(value, expected) ~= nil + end + end) + + local _, err = conf_loader(nil, { + pluginserver_names = "gopher", + pluginserver_gopher_query_cmd = "gopher -dump", + }) + assert.is_nil(err) + assert.spy(spy_log).was_called(1) + assert.spy(spy_log).was_called_with("start_command undefined for pluginserver gopher; assuming external process management") + end) + end) + it("fills in default settings", function() + -- mock default conf loader path - as we cannot + -- reliably write in the default path (/usr/local/bin) + package.loaded["kong.conf_loader"] = nil + local conf_constants = require"kong.conf_loader.constants" + conf_constants.DEFAULT_PLUGINSERVER_PATH = helpers.external_plugins_path .. "/go" + local conf_loader = require"kong.conf_loader" + + helpers.build_go_plugins(helpers.external_plugins_path .. "/go") + + finally(function() + package.loaded["kong.conf_loader"] = nil + package.loaded["kong.conf_loader.constants"] = nil + end) + + local conf, err = conf_loader(nil, { + pluginserver_names = "go-hello", + -- leave out start_command and query_command so that the defaults + -- are used + }) + assert.is_nil(err) + assert.same("go-hello", conf.pluginservers[1].name) + assert.same("./spec/fixtures/external_plugins/go/go-hello -dump", conf.pluginservers[1].query_command) + assert.same("./spec/fixtures/external_plugins/go/go-hello", conf.pluginservers[1].start_command) + assert.same(conf.prefix .. "/go-hello.socket", conf.pluginservers[1].socket) + end) + it("accepts custom settings", function() + local conf, err = conf_loader(nil, { + pluginserver_names = "gopher", + pluginserver_gopher_query_cmd = "gopher -dump", + pluginserver_gopher_start_cmd = "gopher -p $KONG_PREFIX", + pluginserver_gopher_socket = "/foo/bar/gopher.socket", + }) + assert.is_nil(err) + assert.same("gopher", conf.pluginservers[1].name) + assert.same("gopher -dump", conf.pluginservers[1].query_command) + assert.same("gopher -p $KONG_PREFIX", conf.pluginservers[1].start_command) + assert.same("/foo/bar/gopher.socket", conf.pluginservers[1].socket) + end) + end) end) diff --git a/spec/01-unit/25-msgpack_rpc_spec.lua b/spec/01-unit/25-msgpack_rpc_spec.lua index 9ab604fe5230..34a0a965f1f4 100644 --- a/spec/01-unit/25-msgpack_rpc_spec.lua +++ b/spec/01-unit/25-msgpack_rpc_spec.lua @@ -5,7 +5,7 @@ -- at https://konghq.com/enterprisesoftwarelicense/. -- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ] -local mp_rpc = require "kong.runloop.plugin_servers.mp_rpc" +local mp_rpc = require "kong.runloop.plugin_servers.rpc.mp_rpc".new() local msgpack = require "MessagePack" local cjson = require "cjson.safe" @@ -42,4 +42,4 @@ describe("msgpack patched", function() assert.same(nil, unpacked[1], "failed to reproduce null when unpack") end end) -end) \ No newline at end of file +end) diff --git a/spec/02-integration/10-external-plugins/01-process-management_spec.lua b/spec/02-integration/10-external-plugins/01-process-management_spec.lua new file mode 100644 index 000000000000..aa55cffeb8c1 --- /dev/null +++ b/spec/02-integration/10-external-plugins/01-process-management_spec.lua @@ -0,0 +1,167 @@ +-- This software is copyright Kong Inc. and its licensors. +-- Use of the software is subject to the agreement between your organization +-- and Kong Inc. If there is no such agreement, use is governed by and +-- subject to the terms of the Kong Master Software License Agreement found +-- at https://konghq.com/enterprisesoftwarelicense/. +-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ] + +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + describe("manages a pluginserver #" .. strategy, function() + lazy_setup(function() + assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + })) + end) + + describe("process management", function() + it("starts/stops an external plugin server [golang]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,go-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test', pid [0-9]+]]) + end) + + it("starts/stops an external plugin server [python]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,py-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test', pid [0-9]+]]) + end) + + it("starts/stops an external plugin server [golang, python]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,go-hello,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test-go', pid [0-9]+]]) + assert.logfile().has.line([[successfully stopped pluginserver 'test-py', pid [0-9]+]]) + end) + end) + + it("queries plugin info [golang]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,go-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + helpers.build_go_plugins(helpers.external_plugins_path .. "/go") + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["go-hello"]) + + local info = plugin_infos["go-hello"] + assert.equal(1, info.PRIORITY) + assert.equal("0.1", info.VERSION) + assert.equal("go-hello", info.name) + assert.same({ "access", "response", "log" }, info.phases) + assert.same("ProtoBuf:1", info.server_def.protocol) + end) + + it("queries plugin info [python]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,py-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["py-hello"]) + + local info = plugin_infos["py-hello"] + assert.equal(100, info.PRIORITY) + assert.equal("0.1.0", info.VERSION) + assert.equal("py-hello", info.name) + assert.same({ "access" }, info.phases) + assert.same("MsgPack:1", info.server_def.protocol) + end) + + it("queries plugin info [golang, python]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["go-hello"]) + assert.not_nil(plugin_infos["py-hello"]) + + local go_info = plugin_infos["go-hello"] + assert.equal(1, go_info.PRIORITY) + assert.equal("0.1", go_info.VERSION) + assert.equal("go-hello", go_info.name) + assert.same({ "access", "response", "log" }, go_info.phases) + assert.same("ProtoBuf:1", go_info.server_def.protocol) + + local py_info = plugin_infos["py-hello"] + assert.equal(100, py_info.PRIORITY) + assert.equal("0.1.0", py_info.VERSION) + assert.equal("py-hello", py_info.name) + assert.same({ "access" }, py_info.phases) + assert.same("MsgPack:1", py_info.server_def.protocol) + end) + end) +end diff --git a/spec/02-integration/10-external-plugins/02-execution_spec.lua b/spec/02-integration/10-external-plugins/02-execution_spec.lua new file mode 100644 index 000000000000..466b4b4932fc --- /dev/null +++ b/spec/02-integration/10-external-plugins/02-execution_spec.lua @@ -0,0 +1,83 @@ +-- This software is copyright Kong Inc. and its licensors. +-- Use of the software is subject to the agreement between your organization +-- and Kong Inc. If there is no such agreement, use is governed by and +-- subject to the terms of the Kong Master Software License Agreement found +-- at https://konghq.com/enterprisesoftwarelicense/. +-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ] + +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + describe("plugin triggering #" .. strategy, function() + lazy_setup(function() + local bp = assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + })) + + assert(bp.services:insert {}) + assert(bp.routes:insert({ + protocols = { "http" }, + paths = { "/" } + })) + + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + database = strategy, + plugins = "bundled,reports-api,go-hello,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump -kong-prefix " .. kong_prefix, + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + + local admin_client = helpers.admin_client() + + local res = admin_client:post("/plugins", { + headers = { + ["Content-Type"] = "application/json" + }, + body = { + name = "go-hello", + config = { + message = "Kong!" + } + } + }) + assert.res_status(201, res) + + res = admin_client:post("/plugins", { + headers = { + ["Content-Type"] = "application/json" + }, + body = { + name = "py-hello", + config = { + message = "Kong!" + } + } + }) + assert.res_status(201, res) + end) + + lazy_teardown(function() + helpers.stop_kong() + end) + + it("executes external plugins [golang, python]", function() + local proxy_client = assert(helpers.proxy_client()) + local res = proxy_client:get("/") + assert.res_status(200, res) + local h = assert.response(res).has.header("x-hello-from-go") + assert.matches("Go says Kong! to", h) + h = assert.response(res).has.header("x-hello-from-python") + assert.matches("Python says Kong! to", h) + end) + end) +end diff --git a/spec/02-integration/10-go_plugins/03-wasm_spec.lua b/spec/02-integration/10-external-plugins/03-wasm_spec.lua similarity index 100% rename from spec/02-integration/10-go_plugins/03-wasm_spec.lua rename to spec/02-integration/10-external-plugins/03-wasm_spec.lua diff --git a/spec/02-integration/10-go_plugins/01-reports_spec.lua b/spec/02-integration/10-external-plugins/99-reports_spec.lua similarity index 95% rename from spec/02-integration/10-go_plugins/01-reports_spec.lua rename to spec/02-integration/10-external-plugins/99-reports_spec.lua index 4d64656a786a..2e82eb184d22 100644 --- a/spec/02-integration/10-go_plugins/01-reports_spec.lua +++ b/spec/02-integration/10-external-plugins/99-reports_spec.lua @@ -61,8 +61,8 @@ for _, strategy in helpers.each_strategy() do plugins = "bundled,reports-api,go-hello", pluginserver_names = "test", pluginserver_test_socket = kong_prefix .. "/go-hello.socket", - pluginserver_test_query_cmd = "./spec/fixtures/go/go-hello -dump -kong-prefix " .. kong_prefix, - pluginserver_test_start_cmd = "./spec/fixtures/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump -kong-prefix " .. kong_prefix, + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, anonymous_reports = true, })) diff --git a/spec/fixtures/go/go-hello.go b/spec/fixtures/external_plugins/go/go-hello.go similarity index 100% rename from spec/fixtures/go/go-hello.go rename to spec/fixtures/external_plugins/go/go-hello.go diff --git a/spec/fixtures/go/go.mod b/spec/fixtures/external_plugins/go/go.mod similarity index 100% rename from spec/fixtures/go/go.mod rename to spec/fixtures/external_plugins/go/go.mod diff --git a/spec/fixtures/go/go.sum b/spec/fixtures/external_plugins/go/go.sum similarity index 100% rename from spec/fixtures/go/go.sum rename to spec/fixtures/external_plugins/go/go.sum diff --git a/spec/fixtures/external_plugins/js/js-hello.js b/spec/fixtures/external_plugins/js/js-hello.js new file mode 100644 index 000000000000..b1141644bd62 --- /dev/null +++ b/spec/fixtures/external_plugins/js/js-hello.js @@ -0,0 +1,33 @@ +'use strict'; + +// This is an example plugin that add a header to the response + +class KongPlugin { + constructor(config) { + this.config = config + } + + async access(kong) { + let host = await kong.request.getHeader("host") + if (host === undefined) { + return await kong.log.err("unable to get header for request") + } + + let message = this.config.message || "hello" + + // the following can be "parallel"ed + await Promise.all([ + kong.response.setHeader("x-hello-from-javascript", "Javascript says " + message + " to " + host), + kong.response.setHeader("x-javascript-pid", process.pid), + ]) + } +} + +module.exports = { + Plugin: KongPlugin, + Schema: [ + { message: { type: "string" } }, + ], + Version: '0.1.0', + Priority: 0, +} \ No newline at end of file diff --git a/spec/fixtures/external_plugins/py/py-hello.py b/spec/fixtures/external_plugins/py/py-hello.py new file mode 100755 index 000000000000..3301e059e157 --- /dev/null +++ b/spec/fixtures/external_plugins/py/py-hello.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +import os +import kong_pdk.pdk.kong as kong + +Schema = ( + {"message": {"type": "string"}}, +) + +version = '0.1.0' +priority = 100 + +# This is an example plugin that add a header to the response + +class Plugin(object): + def __init__(self, config): + self.config = config + + def access(self, kong: kong.kong): + host, err = kong.request.get_header("host") + if err: + pass # error handling + # if run with --no-lua-style + # try: + # host = kong.request.get_header("host") + # except Exception as ex: + # pass # error handling + message = "hello" + if 'message' in self.config: + message = self.config['message'] + kong.response.set_header("x-hello-from-python", "Python says %s to %s" % (message, host)) + kong.response.set_header("x-python-pid", str(os.getpid())) + + +# add below section to allow this plugin optionally be running in a dedicated process +if __name__ == "__main__": + from kong_pdk.cli import start_dedicated_server + start_dedicated_server("py-hello", Plugin, version, priority, Schema) diff --git a/spec/fixtures/external_plugins/py/requirements.txt b/spec/fixtures/external_plugins/py/requirements.txt new file mode 100644 index 000000000000..0f887887fd55 --- /dev/null +++ b/spec/fixtures/external_plugins/py/requirements.txt @@ -0,0 +1 @@ +kong-pdk diff --git a/spec/helpers.lua b/spec/helpers.lua index 6b6f3e320518..c7c003158802 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -131,7 +131,7 @@ end bin_path = CONSTANTS.BIN_PATH, test_conf = conf, test_conf_path = CONSTANTS.TEST_CONF_PATH, - go_plugin_path = CONSTANTS.GO_PLUGIN_PATH, + external_plugins_path = CONSTANTS.EXTERNAL_PLUGINS_PATH, mock_upstream_hostname = CONSTANTS.MOCK_UPSTREAM_HOSTNAME, mock_upstream_protocol = CONSTANTS.MOCK_UPSTREAM_PROTOCOL, mock_upstream_host = CONSTANTS.MOCK_UPSTREAM_HOST, @@ -284,4 +284,6 @@ end return p ~= nil end, -- XXX EE ]] + + build_go_plugins = cmd.build_go_plugins, } diff --git a/spec/internal/cmd.lua b/spec/internal/cmd.lua index 783042096b3b..e0b9952ceda6 100644 --- a/spec/internal/cmd.lua +++ b/spec/internal/cmd.lua @@ -279,8 +279,8 @@ local function start_kong(env, tables, preserve_prefix, fixtures) -- go plugins are enabled -- compile fixture go plugins if any setting mentions it for _,v in pairs(env) do - if type(v) == "string" and v:find(CONSTANTS.GO_PLUGIN_PATH) then - build_go_plugins(CONSTANTS.GO_PLUGIN_PATH) + if type(v) == "string" and v:find(CONSTANTS.EXTERNAL_PLUGINS_PATH .. "/go") then + build_go_plugins(CONSTANTS.EXTERNAL_PLUGINS_PATH .. "/go") break end end @@ -478,5 +478,7 @@ return { kill_all = kill_all, signal = signal, signal_workers = signal_workers, + + build_go_plugins = build_go_plugins, } diff --git a/spec/internal/constants.lua b/spec/internal/constants.lua index fa98aa7223ee..b37fc65dc0d2 100644 --- a/spec/internal/constants.lua +++ b/spec/internal/constants.lua @@ -16,7 +16,7 @@ local CONSTANTS = { CUSTOM_VAULT_PATH = "./spec/fixtures/custom_vaults/?.lua;./spec/fixtures/custom_vaults/?/init.lua", DNS_MOCK_LUA_PATH = "./spec/fixtures/mocks/lua-resty-dns/?.lua", - GO_PLUGIN_PATH = "./spec/fixtures/go", + EXTERNAL_PLUGINS_PATH = "./spec/fixtures/external_plugins", GRPC_TARGET_SRC_PATH = "./spec/fixtures/grpc/target/", MOCK_UPSTREAM_PROTOCOL = "http", MOCK_UPSTREAM_SSL_PROTOCOL = "https",