From 38ddd0e203240cdb1d6c5352ea4b5dca9de0ce51 Mon Sep 17 00:00:00 2001 From: Guilherme Salazar Date: Wed, 7 Feb 2024 14:15:16 -0300 Subject: [PATCH] refactor(pluginservers): general refactor Context ------- The overall goal of this commit is to refactor the external plugins implementation, with the following goals in mind: - Make plugin server code more approachable to unfamiliar engineers and easier to evolve with confidence - Harden configuration; ensure configuration defects are caught before Kong is started - Extend testing coverage This is related to ongoing work on the Go PDK, with similar goals in mind. Summary ------- This commit implements the following overall changes to plugin server code: - Move configuration related code into conf loader, so that configuration loading and validation happens at startup time, rather than lazily, when plugin data is loaded or pluginservers are started. Add tests for current behavior. - Move process-management code - for starting up plugin servers as well as querying external plugins info - into the `process.lua` module. - Introduce a `kong.runloop.plugin_servers.rpc` module that encapsulates RPC initialization and protocol-specific implementations. This further simplifies the main plugin server main module. - Factor exposed API and phase handlers bridging code into a new `plugin` module, which encapsulates an external plugin representation, including the expected fields for any Kong plugin, plus external plugin-specific bits, such as the RPC instance. Part of this external plugin-specific part is the instance life cycle management. With this structure, the `kong.runloop.plugin_servers` main module contains only general external plugin code, including a list of loaded external plugins, and associated start/stop functions for plugin servers. Testing ------- This commit also implements the following improvements to tests: - Restructure fixtures to accommodate new external plugin servers -- namely, targeting for now in the existing Python and Javascript - Add new test cases for external plugins: * External plugin configuration: add test cases for current behavior; in particular: - Fail if no `query_cmd` is provided; - Warn if no `start_cmd` is provided - this is by design, as external plugins servers can be managed outside of Kong * Plugin server start / stop - for both Go and Python plugins * External plugin info querying for both Go and Python plugins * External plugin execution - for both Go and Python plugins Internal flow ------------- `.plugin_servers.init:` loads all external plugins, by calling .plugin_servers.process and `.plugin_servers.plugin` `.plugin_servers.process`: queries external plugins info with the command specified in `_query_cmd` proeprties `.plugin_servers.plugin`: with info obtained as described above, `.plugin:new` returns a kong-compatible representation of an external plugin, with phase handlers, PRIORITY, and wrappers to the PDK. Calls `.plugin_servers.rpc` to create an RPC through which Kong communicates with the plugin process `.plugin_servers.rpc`: based on info contained in the plugin (protocol field), creates the correct RPC for the given external plugin `.plugin_servers.rpc.pb_rpc`: protobuf rpc implementation - used by Golang `.plugin_servers.rpc.mp.rpc`: messagepack rpc implementation - used by JS and Python `.plugin_servers.init`: calls `.plugin_servers.process` to start external plugin servers `.plugin_servers.process`: optionally starts all external plugin servers (if a `_start_cmd` is found) uses the resty pipe API to manage the external plugin process --- .gitignore | 1 + kong-3.7.0-0.rockspec | 7 +- kong.conf.default | 9 +- kong/conf_loader/init.lua | 32 ++ kong/init.lua | 4 +- kong/runloop/plugin_servers/init.lua | 438 +++--------------- kong/runloop/plugin_servers/plugin.lua | 346 ++++++++++++++ kong/runloop/plugin_servers/process.lua | 208 ++++----- kong/runloop/plugin_servers/rpc/init.lua | 22 + .../plugin_servers/{ => rpc}/mp_rpc.lua | 69 +-- .../plugin_servers/{ => rpc}/pb_rpc.lua | 74 ++- kong/runloop/plugin_servers/rpc/util.lua | 19 + spec/01-unit/03-conf_loader_spec.lua | 66 +++ .../01-process-management_spec.lua | 160 +++++++ .../10-external-plugins/02-execution_spec.lua | 76 +++ .../99-reports_spec.lua} | 4 +- .../{ => external_plugins}/go/go-hello.go | 0 .../fixtures/{ => external_plugins}/go/go.mod | 0 .../fixtures/{ => external_plugins}/go/go.sum | 0 spec/fixtures/external_plugins/js/js-hello.js | 33 ++ spec/fixtures/external_plugins/py/py-hello.py | 37 ++ .../external_plugins/py/requirements.txt | 1 + spec/helpers.lua | 10 +- 23 files changed, 1024 insertions(+), 592 deletions(-) create mode 100644 kong/runloop/plugin_servers/plugin.lua create mode 100644 kong/runloop/plugin_servers/rpc/init.lua rename kong/runloop/plugin_servers/{ => rpc}/mp_rpc.lua (84%) rename kong/runloop/plugin_servers/{ => rpc}/pb_rpc.lua (88%) create mode 100644 kong/runloop/plugin_servers/rpc/util.lua create mode 100644 spec/02-integration/10-external-plugins/01-process-management_spec.lua create mode 100644 spec/02-integration/10-external-plugins/02-execution_spec.lua rename spec/02-integration/{10-go_plugins/01-reports_spec.lua => 10-external-plugins/99-reports_spec.lua} (95%) rename spec/fixtures/{ => external_plugins}/go/go-hello.go (100%) rename spec/fixtures/{ => external_plugins}/go/go.mod (100%) rename spec/fixtures/{ => external_plugins}/go/go.sum (100%) create mode 100644 spec/fixtures/external_plugins/js/js-hello.js create mode 100755 spec/fixtures/external_plugins/py/py-hello.py create mode 100644 spec/fixtures/external_plugins/py/requirements.txt diff --git a/.gitignore b/.gitignore index 5651c0f40c44..460b86388642 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ bin/h2client *.wasm spec/fixtures/proxy_wasm_filters/build spec/fixtures/proxy_wasm_filters/target +spec/fixtures/external_plugins/go/go-hello diff --git a/kong-3.7.0-0.rockspec b/kong-3.7.0-0.rockspec index 0be10e5335f7..69e4fc267fb7 100644 --- a/kong-3.7.0-0.rockspec +++ b/kong-3.7.0-0.rockspec @@ -201,8 +201,11 @@ build = { ["kong.runloop.balancer.upstreams"] = "kong/runloop/balancer/upstreams.lua", ["kong.runloop.plugin_servers"] = "kong/runloop/plugin_servers/init.lua", ["kong.runloop.plugin_servers.process"] = "kong/runloop/plugin_servers/process.lua", - ["kong.runloop.plugin_servers.mp_rpc"] = "kong/runloop/plugin_servers/mp_rpc.lua", - ["kong.runloop.plugin_servers.pb_rpc"] = "kong/runloop/plugin_servers/pb_rpc.lua", + ["kong.runloop.plugin_servers.plugin"] = "kong/runloop/plugin_servers/plugin.lua", + ["kong.runloop.plugin_servers.rpc"] = "kong/runloop/plugin_servers/rpc/init.lua", + ["kong.runloop.plugin_servers.rpc.util"] = "kong/runloop/plugin_servers/rpc/util.lua", + ["kong.runloop.plugin_servers.rpc.mp_rpc"] = "kong/runloop/plugin_servers/rpc/mp_rpc.lua", + ["kong.runloop.plugin_servers.rpc.pb_rpc"] = "kong/runloop/plugin_servers/rpc/pb_rpc.lua", ["kong.runloop.wasm"] = "kong/runloop/wasm.lua", ["kong.runloop.wasm.properties"] = "kong/runloop/wasm/properties.lua", diff --git a/kong.conf.default b/kong.conf.default index ff99a36909ed..5e06566f4c5f 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -197,14 +197,17 @@ #pluginserver_XXX_socket = /.socket # Path to the unix socket # used by the pluginserver. + #pluginserver_XXX_start_cmd = /usr/local/bin/ # Full command (including # any needed arguments) to - # start the pluginserver + # start the + # pluginserver. + #pluginserver_XXX_query_cmd = /usr/local/bin/query_ # Full command to "query" the # pluginserver. Should # produce a JSON with the - # dump info of all plugins it - # manages + # dump info of the plugin it + # manages. #port_maps = # With this configuration parameter, you can # let the Kong to know about the port from diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index bb36dde41e9f..a06d8fa0f053 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -853,6 +853,38 @@ local function load(path, custom_conf, opts) end end + -- parse and validate pluginserver directives + if conf.pluginserver_names then + local pluginservers = {} + for i, name in ipairs(conf.pluginserver_names) do + name = name:lower() + local env_prefix = "pluginserver_" .. name:gsub("-", "_") + local socket = conf[env_prefix .. "_socket"] or (conf.prefix .. "/" .. name .. ".socket") + local start_command = conf[env_prefix .. "_start_cmd"] or exists("/usr/local/bin/" .. name) + local query_command = conf[env_prefix .. "_query_cmd"] or exists("/usr/local/bin/" .. name .. " -dump") + + -- if start_command is unset, we assume the pluginserver process is + -- managed externally + if not start_command then + log.warn("start_command undefined for pluginserver " .. name .. "; assuming external process management") + end + + -- query_command is required + if not query_command then + return nil, "query_command undefined for pluginserver " .. name + end + + pluginservers[i] = { + name = name, + socket = socket, + start_command = start_command, + query_command = query_command, + } + end + + conf.pluginservers = setmetatable(pluginservers, conf_constants._NOP_TOSTRING_MT) + end + -- initialize the dns client, so the globally patched tcp.connect method -- will work from here onwards. assert(require("kong.tools.dns")(conf)) diff --git a/kong/init.lua b/kong/init.lua index 2c837dd0e52b..a5dfb571efc4 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -956,7 +956,7 @@ function Kong.init_worker() end if is_not_control_plane then - plugin_servers.start() + assert(plugin_servers.start()) end if kong.clustering then @@ -976,7 +976,7 @@ end function Kong.exit_worker() if process.type() ~= "privileged agent" and not is_control_plane(kong.configuration) then - plugin_servers.stop() + assert(plugin_servers.stop()) end end diff --git a/kong/runloop/plugin_servers/init.lua b/kong/runloop/plugin_servers/init.lua index 316bb11012cb..700591a2c403 100644 --- a/kong/runloop/plugin_servers/init.lua +++ b/kong/runloop/plugin_servers/init.lua @@ -1,372 +1,45 @@ - local proc_mgmt = require "kong.runloop.plugin_servers.process" -local cjson = require "cjson.safe" -local clone = require "table.clone" -local ngx_ssl = require "ngx.ssl" -local SIGTERM = 15 +local plugin = require "kong.runloop.plugin_servers.plugin" -local type = type local pairs = pairs -local ipairs = ipairs -local tonumber = tonumber - -local ngx = ngx local kong = kong -local ngx_var = ngx.var -local ngx_sleep = ngx.sleep -local worker_id = ngx.worker.id - -local coroutine_running = coroutine.running -local get_plugin_info = proc_mgmt.get_plugin_info -local get_ctx_table = require("resty.core.ctx").get_ctx_table - -local cjson_encode = cjson.encode -local native_timer_at = _G.native_timer_at or ngx.timer.at - -local req_start_time -local req_get_headers -local resp_get_headers - -if ngx.config.subsystem == "http" then - req_start_time = ngx.req.start_time - req_get_headers = ngx.req.get_headers - resp_get_headers = ngx.resp.get_headers - -else - local NOOP = function() end - - req_start_time = NOOP - req_get_headers = NOOP - resp_get_headers = NOOP -end - -local SLEEP_STEP = 0.1 -local WAIT_TIME = 10 -local MAX_WAIT_STEPS = WAIT_TIME / SLEEP_STEP - ---- keep request data a bit longer, into the log timer -local save_for_later = {} - ---- handle notifications from pluginservers -local rpc_notifications = {} - ---- currently running plugin instances -local running_instances = {} - -local function get_saved() - return save_for_later[coroutine_running()] -end - -local exposed_api = { - kong = kong, - - ["kong.log.serialize"] = function() - local saved = get_saved() - return cjson_encode(saved and saved.serialize_data or kong.log.serialize()) - end, - - ["kong.nginx.get_var"] = function(v) - return ngx_var[v] - end, - - ["kong.nginx.get_tls1_version_str"] = ngx_ssl.get_tls1_version_str, - - ["kong.nginx.get_ctx"] = function(k) - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - return ngx_ctx[k] - end, - - ["kong.nginx.set_ctx"] = function(k, v) - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - ngx_ctx[k] = v - end, - - ["kong.ctx.shared.get"] = function(k) - local saved = get_saved() - local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared - return ctx_shared[k] - end, - - ["kong.ctx.shared.set"] = function(k, v) - local saved = get_saved() - local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared - ctx_shared[k] = v - end, - - ["kong.request.get_headers"] = function(max) - local saved = get_saved() - return saved and saved.request_headers or kong.request.get_headers(max) - end, - - ["kong.request.get_header"] = function(name) - local saved = get_saved() - if not saved then - return kong.request.get_header(name) - end - - local header_value = saved.request_headers[name] - if type(header_value) == "table" then - header_value = header_value[1] - end - - return header_value - end, - - ["kong.request.get_uri_captures"] = function() - local saved = get_saved() - local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx - return kong.request.get_uri_captures(ngx_ctx) - end, - - ["kong.response.get_status"] = function() - local saved = get_saved() - return saved and saved.response_status or kong.response.get_status() - end, - - ["kong.response.get_headers"] = function(max) - local saved = get_saved() - return saved and saved.response_headers or kong.response.get_headers(max) - end, - - ["kong.response.get_header"] = function(name) - local saved = get_saved() - if not saved then - return kong.response.get_header(name) - end - - local header_value = saved.response_headers and saved.response_headers[name] - if type(header_value) == "table" then - header_value = header_value[1] - end - - return header_value - end, - - ["kong.response.get_source"] = function() - local saved = get_saved() - return kong.response.get_source(saved and saved.ngx_ctx or nil) - end, - - ["kong.nginx.req_start_time"] = function() - local saved = get_saved() - return saved and saved.req_start_time or req_start_time() - end, -} - - -local get_instance_id -local reset_instance -local reset_instances_for_plugin - -local protocol_implementations = { - ["MsgPack:1"] = "kong.runloop.plugin_servers.mp_rpc", - ["ProtoBuf:1"] = "kong.runloop.plugin_servers.pb_rpc", -} - -local function get_server_rpc(server_def) - if not server_def.rpc then - - local rpc_modname = protocol_implementations[server_def.protocol] - if not rpc_modname then - kong.log.err("Unknown protocol implementation: ", server_def.protocol) - return nil, "Unknown protocol implementation" - end - - local rpc = require (rpc_modname) - rpc.get_instance_id = rpc.get_instance_id or get_instance_id - rpc.reset_instance = rpc.reset_instance or reset_instance - rpc.save_for_later = rpc.save_for_later or save_for_later - rpc.exposed_api = rpc.exposed_api or exposed_api - - server_def.rpc = rpc.new(server_def.socket, rpc_notifications) - end - - return server_def.rpc -end - - ---- get_instance_id: gets an ID to reference a plugin instance running in a ---- pluginserver each configuration in the database is handled by a different ---- instance. Biggest complexity here is due to the remote (and thus non-atomic ---- and fallible) operation of starting the instance at the server. -function get_instance_id(plugin_name, conf) - local key = type(conf) == "table" and kong.plugin.get_id() or plugin_name - local instance_info = running_instances[key] +-- module cache of loaded external plugins +-- XXX historically, this list of plugins has not been invalidated; +-- however, as plugin servers can be managed externally, users may also +-- change and restart the plugin server, potentially with new configurations +-- this needs to be improved -- docs and code hardening +local loaded_plugins - local wait_count = 0 - while instance_info and not instance_info.id do - -- some other thread is already starting an instance - -- prevent busy-waiting - ngx_sleep(SLEEP_STEP) - - -- to prevent a potential dead loop when someone failed to release the ID - wait_count = wait_count + 1 - if wait_count > MAX_WAIT_STEPS then - running_instances[key] = nil - return nil, "Could not claim instance_id for " .. plugin_name .. " (key: " .. key .. ")" - end - instance_info = running_instances[key] - end - - if instance_info - and instance_info.id - and instance_info.seq == conf.__seq__ - and instance_info.conf and instance_info.conf.__plugin_id == key - then - -- exact match, return it - return instance_info.id - end - - local old_instance_id = instance_info and instance_info.id - if not instance_info then - -- we're the first, put something to claim - instance_info = { - conf = conf, - seq = conf.__seq__, - } - running_instances[key] = instance_info - else - - -- there already was something, make it evident that we're changing it - instance_info.id = nil - end - - local plugin_info = get_plugin_info(plugin_name) - local server_rpc = get_server_rpc(plugin_info.server_def) - - local new_instance_info, err = server_rpc:call_start_instance(plugin_name, conf) - if new_instance_info == nil then - kong.log.err("starting instance: ", err) - -- remove claim, some other thread might succeed - running_instances[key] = nil - error(err) - end - - instance_info.id = new_instance_info.id - instance_info.plugin_name = plugin_name - instance_info.conf = new_instance_info.conf - instance_info.seq = new_instance_info.seq - instance_info.Config = new_instance_info.Config - instance_info.rpc = new_instance_info.rpc - - if old_instance_id then - -- there was a previous instance with same key, close it - server_rpc:call_close_instance(old_instance_id) - -- don't care if there's an error, maybe other thread closed it first. +local function load_external_plugins() + if loaded_plugins then + return true end - return instance_info.id -end - -function reset_instances_for_plugin(plugin_name) - for k, instance in pairs(running_instances) do - if instance.plugin_name == plugin_name then - running_instances[k] = nil - end - end -end + loaded_plugins = {} ---- reset_instance: removes an instance from the table. -function reset_instance(plugin_name, conf) - -- - -- the same plugin (which acts as a plugin server) is shared among - -- instances of the plugin; for example, the same plugin can be applied - -- to many routes - -- `reset_instance` is called when (but not only) the plugin server died; - -- in such case, all associated instances must be removed, not only the current - -- - reset_instances_for_plugin(plugin_name) + local kong_config = kong.configuration - local ok, err = kong.worker_events.post("plugin_server", "reset_instances", { plugin_name = plugin_name }) - if not ok then - kong.log.err("failed to post plugin_server reset_instances event: ", err) + local plugins_info, err = proc_mgmt.load_external_plugins_info(kong_config) + if not plugins_info then + return nil, "failed loading external plugins: " .. err end -end - ---- serverPid notification sent by the pluginserver. if it changes, ---- all instances tied to this RPC socket should be restarted. -function rpc_notifications:serverPid(n) - n = tonumber(n) - if self.pluginserver_pid and n ~= self.pluginserver_pid then - for key, instance in pairs(running_instances) do - if instance.rpc == self then - running_instances[key] = nil - end - end + for plugin_name, plugin_info in pairs(plugins_info) do + local plugin = plugin.new(plugin_info) + loaded_plugins[plugin_name] = plugin end - self.pluginserver_pid = n + return loaded_plugins end - - - - ---- Phase closures -local function build_phases(plugin) - if not plugin then - return - end - - local server_rpc = get_server_rpc(plugin.server_def) - - for _, phase in ipairs(plugin.phases) do - if phase == "log" then - plugin[phase] = function(self, conf) - native_timer_at(0, function(premature, saved) - if premature then - return - end - get_ctx_table(saved.ngx_ctx) - local co = coroutine_running() - save_for_later[co] = saved - server_rpc:handle_event(self.name, conf, phase) - save_for_later[co] = nil - end, { - plugin_name = self.name, - serialize_data = kong.log.serialize(), - ngx_ctx = clone(ngx.ctx), - ctx_shared = kong.ctx.shared, - request_headers = req_get_headers(), - response_headers = resp_get_headers(), - response_status = ngx.status, - req_start_time = req_start_time(), - }) - end - - else - plugin[phase] = function(self, conf) - server_rpc:handle_event(self.name, conf, phase) - end - end - end - - return plugin -end - - - ---- module table -local plugin_servers = {} - - -local loaded_plugins = {} - local function get_plugin(plugin_name) - kong = kong or _G.kong -- some CLI cmds set the global after loading the module. - if not loaded_plugins[plugin_name] then - local plugin = get_plugin_info(plugin_name) - loaded_plugins[plugin_name] = build_phases(plugin) - end + assert(load_external_plugins()) return loaded_plugins[plugin_name] end -function plugin_servers.load_plugin(plugin_name) +local function load_plugin(plugin_name) local plugin = get_plugin(plugin_name) if plugin and plugin.PRIORITY then return true, plugin @@ -375,7 +48,7 @@ function plugin_servers.load_plugin(plugin_name) return false, "no plugin found" end -function plugin_servers.load_schema(plugin_name) +local function load_schema(plugin_name) local plugin = get_plugin(plugin_name) if plugin and plugin.PRIORITY then return true, plugin.schema @@ -384,36 +57,53 @@ function plugin_servers.load_schema(plugin_name) return false, "no plugin found" end - -function plugin_servers.start() - if worker_id() ~= 0 then - return - end - - local pluginserver_timer = proc_mgmt.pluginserver_timer - - for _, server_def in ipairs(proc_mgmt.get_server_defs()) do - if server_def.start_command then - native_timer_at(0, pluginserver_timer, server_def) - end - end - +local function start() -- in case plugin server restarts, all workers need to update their defs kong.worker_events.register(function (data) - reset_instances_for_plugin(data.plugin_name) + plugin.reset_instances_for_plugin(data.plugin_name) end, "plugin_server", "reset_instances") + + assert(proc_mgmt.start_pluginservers()) + + return true end -function plugin_servers.stop() - if worker_id() ~= 0 then - return - end +local function stop() + assert(proc_mgmt.stop_pluginservers()) - for _, server_def in ipairs(proc_mgmt.get_server_defs()) do - if server_def.proc then - server_def.proc:kill(SIGTERM) - end - end + return true end -return plugin_servers + +-- +-- This modules sole responsibility is to +-- manage external plugins: starting/stopping plugins servers, +-- and return plugins info (such as schema and their loaded representations) +-- +-- The general initialization flow is: +-- - kong.init: calls start and stop to start/stop external plugins servers +-- - kong.db.schema.plugin_loader: calls load_schema to get an external plugin schema +-- - kong.db.dao.plugins: calls load_plugin to get the expected representation of a plugin +-- (phase handlers, priority, etc) +-- +-- Internal flow: +-- .plugin_servers.init: loads all external plugins, by calling .plugin_servers.process and .plugin_servers.plugin +-- .plugin_servers.process: queries external plugins info with the command specified in _query_cmd proeprties +-- .plugin_servers.plugin: with info obtained as described above, .plugin:new returns a kong-compatible representation +-- of an external plugin, with phase handlers, PRIORITY, and wrappers to the PDK. Calls +-- .plugin_servers.rpc to create an RPC through which Kong communicates with the plugin process +-- .plugin_servers.rpc: based on info contained in the plugin (protocol field), creates the correct RPC for the +-- given external plugin +-- .plugin_servers.rpc.pb_rpc: protobuf rpc implementation - used by Golang +-- .plugin_servers.rpc.mp.rpc: messagepack rpc implementation - used by JS and Python +-- .plugin_servers.init: calls .plugin_servers.process to start external plugin servers +-- .plugin_servers.process: optionally starts all external plugin servers (if a _start_cmd is found) +-- uses the resty pipe API to manage the external plugin process +-- + +return { + start = start, + stop = stop, + load_schema = load_schema, + load_plugin = load_plugin, +} \ No newline at end of file diff --git a/kong/runloop/plugin_servers/plugin.lua b/kong/runloop/plugin_servers/plugin.lua new file mode 100644 index 000000000000..ecdd2c2d16a0 --- /dev/null +++ b/kong/runloop/plugin_servers/plugin.lua @@ -0,0 +1,346 @@ +local cjson = require "cjson.safe" +local ngx_ssl = require "ngx.ssl" +local clone = require "table.clone" +local rpc = require "kong.runloop.plugin_servers.rpc" + +local type = type +local ngx_sleep = ngx.sleep +local ngx_var = ngx.var +local cjson_encode = cjson.encode +local ipairs = ipairs +local coroutine_running = coroutine.running +local get_ctx_table = require("resty.core.ctx").get_ctx_table +local native_timer_at = _G.native_timer_at or ngx.timer.at + +--- keep request data a bit longer, into the log timer +local save_for_later = {} + +--- currently running plugin instances +local running_instances = {} + +local req_start_time +local req_get_headers +local resp_get_headers + +if ngx.config.subsystem == "http" then + req_start_time = ngx.req.start_time + req_get_headers = ngx.req.get_headers + resp_get_headers = ngx.resp.get_headers + +else + local NOOP = function() end + + req_start_time = NOOP + req_get_headers = NOOP + resp_get_headers = NOOP +end + +local function get_saved() + return save_for_later[coroutine_running()] +end + +local exposed_api = { + kong = kong, + + get_saved_for_later = get_saved, + + ["kong.log.serialize"] = function() + local saved = get_saved() + return cjson_encode(saved and saved.serialize_data or kong.log.serialize()) + end, + + ["kong.nginx.get_var"] = function(v) + return ngx_var[v] + end, + + ["kong.nginx.get_tls1_version_str"] = ngx_ssl.get_tls1_version_str, + + ["kong.nginx.get_ctx"] = function(k) + local saved = get_saved() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + return ngx_ctx[k] + end, + + ["kong.nginx.set_ctx"] = function(k, v) + local saved = get_saved() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + ngx_ctx[k] = v + end, + + ["kong.ctx.shared.get"] = function(k) + local saved = get_saved() + local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared + return ctx_shared[k] + end, + + ["kong.ctx.shared.set"] = function(k, v) + local saved = get_saved() + local ctx_shared = saved and saved.ctx_shared or kong.ctx.shared + ctx_shared[k] = v + end, + + ["kong.request.get_headers"] = function(max) + local saved = get_saved() + return saved and saved.request_headers or kong.request.get_headers(max) + end, + + ["kong.request.get_header"] = function(name) + local saved = get_saved() + if not saved then + return kong.request.get_header(name) + end + + local header_value = saved.request_headers[name] + if type(header_value) == "table" then + header_value = header_value[1] + end + + return header_value + end, + + ["kong.request.get_uri_captures"] = function() + local saved = get_saved() + local ngx_ctx = saved and saved.ngx_ctx or ngx.ctx + return kong.request.get_uri_captures(ngx_ctx) + end, + + ["kong.response.get_status"] = function() + local saved = get_saved() + return saved and saved.response_status or kong.response.get_status() + end, + + ["kong.response.get_headers"] = function(max) + local saved = get_saved() + return saved and saved.response_headers or kong.response.get_headers(max) + end, + + ["kong.response.get_header"] = function(name) + local saved = get_saved() + if not saved then + return kong.response.get_header(name) + end + + local header_value = saved.response_headers and saved.response_headers[name] + if type(header_value) == "table" then + header_value = header_value[1] + end + + return header_value + end, + + ["kong.response.get_source"] = function() + local saved = get_saved() + return kong.response.get_source(saved and saved.ngx_ctx or nil) + end, + + ["kong.nginx.req_start_time"] = function() + local saved = get_saved() + return saved and saved.req_start_time or req_start_time() + end, +} + + +--- Phase closures +local function build_phases(plugin) + if not plugin then + return + end + + for _, phase in ipairs(plugin.phases) do + if phase == "log" then + plugin[phase] = function(self, conf) + native_timer_at(0, function(premature, saved) + if premature then + return + end + get_ctx_table(saved.ngx_ctx) + local co = coroutine_running() + save_for_later[co] = saved + plugin.rpc:handle_event(conf, phase) + save_for_later[co] = nil + end, { + plugin_name = self.name, + serialize_data = kong.log.serialize(), + ngx_ctx = clone(ngx.ctx), + ctx_shared = kong.ctx.shared, + request_headers = req_get_headers(), + response_headers = resp_get_headers(), + response_status = ngx.status, + req_start_time = req_start_time(), + }) + end + + else + plugin[phase] = function(self, conf) + plugin.rpc:handle_event(conf, phase) + end + end + end + + return plugin +end + +--- handle notifications from pluginservers +local rpc_notifications = {} + +--- serverPid notification sent by the pluginserver. if it changes, +--- all instances tied to this RPC socket should be restarted. +function rpc_notifications:serverPid(n) + n = tonumber(n) + if self.pluginserver_pid and n ~= self.pluginserver_pid then + for key, instance in pairs(running_instances) do + if instance.rpc == self then + running_instances[key] = nil + end + end + end + + self.pluginserver_pid = n +end + +local function reset_instances_for_plugin(plugin_name) + for k, instance in pairs(running_instances) do + if instance.plugin_name == plugin_name then + running_instances[k] = nil + end + end +end + +--- reset_instance: removes an instance from the table. +local function reset_instance(plugin_name, conf) + -- + -- the same plugin (which acts as a plugin server) is shared among + -- instances of the plugin; for example, the same plugin can be applied + -- to many routes + -- `reset_instance` is called when (but not only) the plugin server died; + -- in such case, all associated instances must be removed, not only the current + -- + reset_instances_for_plugin(plugin_name) + + local ok, err = kong.worker_events.post("plugin_server", "reset_instances", { plugin_name = plugin_name }) + if not ok then + kong.log.err("failed to post plugin_server reset_instances event: ", err) + end +end + +local get_instance_id + +do + local SLEEP_STEP = 0.1 + local WAIT_TIME = 10 + local MAX_WAIT_STEPS = WAIT_TIME / SLEEP_STEP + + --- get_instance_id: gets an ID to reference a plugin instance running in the + --- pluginserver; each configuration of a plugin is handled by a different + --- instance. Biggest complexity here is due to the remote (and thus non-atomic + --- and fallible) operation of starting the instance at the server. + function get_instance_id(plugin, conf) + local plugin_name = plugin.name + + local key = kong.plugin.get_id() + local instance_info = running_instances[key] + + local wait_count = 0 + while instance_info and not instance_info.id do + -- some other thread is already starting an instance + -- prevent busy-waiting + ngx_sleep(SLEEP_STEP) + + -- to prevent a potential dead loop when someone failed to release the ID + wait_count = wait_count + 1 + if wait_count > MAX_WAIT_STEPS then + running_instances[key] = nil + return nil, "Could not claim instance_id for " .. plugin_name .. " (key: " .. key .. ")" + end + instance_info = running_instances[key] + end + + if instance_info + and instance_info.id + and instance_info.seq == conf.__seq__ + and instance_info.conf and instance_info.conf.__plugin_id == key + then + -- exact match, return it + return instance_info.id + end + + local old_instance_id = instance_info and instance_info.id + if not instance_info then + -- we're the first, put something to claim + instance_info = { + conf = conf, + seq = conf.__seq__, + } + running_instances[key] = instance_info + else + + -- there already was something, make it evident that we're changing it + instance_info.id = nil + end + + local new_instance_info, err = plugin.rpc:call_start_instance(plugin_name, conf) + if new_instance_info == nil then + kong.log.err("starting instance: ", err) + -- remove claim, some other thread might succeed + running_instances[key] = nil + error(err) + end + + instance_info.id = new_instance_info.id + instance_info.plugin_name = plugin_name + instance_info.conf = new_instance_info.conf + instance_info.seq = new_instance_info.seq + instance_info.Config = new_instance_info.Config + instance_info.rpc = new_instance_info.rpc + + if old_instance_id then + -- there was a previous instance with same key, close it + plugin.rpc:call_close_instance(old_instance_id) + -- don't care if there's an error, maybe other thread closed it first. + end + + return instance_info.id + end +end + +-- +-- instance callbacks manage the state of a plugin instance +-- - get_instance_id (which also starts and instance) +-- - reset_instance, which removes an instance from the local cache +-- +local instance_callbacks = { + reset_instance = reset_instance, + get_instance_id = get_instance_id, +} + +local function new(plugin_info) + -- + -- plugin_info + -- * name + -- * priority + -- * version + -- * schema + -- * phases + -- * server_def + -- + + local self = build_phases(plugin_info) + self.instance_callbacks = instance_callbacks + self.exposed_api = exposed_api + self.rpc_notifications = rpc_notifications + + local plugin_rpc, err = rpc.new(self) + if not rpc then + return nil, err + end + + self.rpc = plugin_rpc + + return self +end + + +return { + new = new, + reset_instances_for_plugin = reset_instances_for_plugin, +} \ No newline at end of file diff --git a/kong/runloop/plugin_servers/process.lua b/kong/runloop/plugin_servers/process.lua index 79c5ee44c7b1..bde565d8c5cf 100644 --- a/kong/runloop/plugin_servers/process.lua +++ b/kong/runloop/plugin_servers/process.lua @@ -1,81 +1,19 @@ local cjson = require "cjson.safe" -local pl_path = require "pl.path" local raw_log = require "ngx.errlog".raw_log -local is_not_http_subsystem = ngx.config.subsystem ~= "http" - +local worker_id = ngx.worker.id +local native_timer_at = _G.native_timer_at or ngx.timer.at local _, ngx_pipe = pcall(require, "ngx.pipe") - local kong = kong local ngx_INFO = ngx.INFO local cjson_decode = cjson.decode +local SIGTERM = 15 -local proc_mgmt = {} - -local _servers -local _plugin_infos - ---[[ - -Configuration - -We require three settings to communicate with each pluginserver. To make it -fit in the config structure, use a dynamic namespace and generous defaults. - -- pluginserver_names: a list of names, one for each pluginserver. - -- pluginserver_XXX_socket: unix socket to communicate with the pluginserver. -- pluginserver_XXX_start_cmd: command line to strat the pluginserver. -- pluginserver_XXX_query_cmd: command line to query the pluginserver. - -Note: the `_start_cmd` and `_query_cmd` are set to the defaults only if -they exist on the filesystem. If omitted and the default doesn't exist, -they're disabled. - -A disabled `_start_cmd` (unset and the default doesn't exist in the filesystem) -means this process isn't managed by Kong. It's expected that the socket -still works, supposedly handled by an externally-managed process. - -A disable `_query_cmd` means it won't be queried and so the corresponding -socket wouldn't be used, even if the process is managed (if the `_start_cmd` -is valid). Currently this has no use, but it could eventually be added via -other means, perhaps dynamically. - ---]] - -local function ifexists(path) - if pl_path.exists(path) then - return path - end -end +local _M = {} -local function get_server_defs() - local config = kong.configuration - - if not _servers then - _servers = {} - - for i, name in ipairs(config.pluginserver_names) do - name = name:lower() - kong.log.debug("search config for pluginserver named: ", name) - local env_prefix = "pluginserver_" .. name:gsub("-", "_") - _servers[i] = { - name = name, - socket = config[env_prefix .. "_socket"] or "/usr/local/kong/" .. name .. ".socket", - start_command = config[env_prefix .. "_start_cmd"] or ifexists("/usr/local/bin/"..name), - query_command = config[env_prefix .. "_query_cmd"] or ifexists("/usr/local/bin/query_"..name), - } - end - end - - return _servers -end - -proc_mgmt.get_server_defs = get_server_defs --[[ - Plugin info requests Disclaimer: The best way to do it is to have "ListPlugins()" and "GetInfo(plugin)" @@ -103,69 +41,66 @@ defining the name, priority, version, schema and phases of one plugin. This array should describe all plugins currently available through this server, no matter if actually enabled in Kong's configuration or not. - --]] - -local function register_plugin_info(server_def, plugin_info) - if _plugin_infos[plugin_info.Name] then - kong.log.err(string.format("Duplicate plugin name [%s] by %s and %s", - plugin_info.Name, _plugin_infos[plugin_info.Name].server_def.name, server_def.name)) - return - end - - _plugin_infos[plugin_info.Name] = { - server_def = server_def, - --rpc = server_def.rpc, - name = plugin_info.Name, - PRIORITY = plugin_info.Priority, - VERSION = plugin_info.Version, - schema = plugin_info.Schema, - phases = plugin_info.Phases, - } -end - -local function ask_info(server_def) +local function query_external_plugin_info(server_def) if not server_def.query_command then - kong.log.info(string.format("No info query for %s", server_def.name)) - return + return nil, "no info query for " .. server_def.name end local fd, err = io.popen(server_def.query_command) if not fd then - local msg = string.format("loading plugins info from [%s]:\n", server_def.name) - kong.log.err(msg, err) - return + return nil, string.format("error loading plugins info from [%s]: %s", server_def.name, err) end local infos_dump = fd:read("*a") fd:close() - local dump = cjson_decode(infos_dump) + local dump, err = cjson_decode(infos_dump) + if err then + return nil, "failed decoding plugin info: " .. err + end + if type(dump) ~= "table" then - error(string.format("Not a plugin info table: \n%s\n%s", - server_def.query_command, infos_dump)) - return + return nil, string.format("not a plugin info table: \n%s\n%s", server_def.query_command, infos_dump) end server_def.protocol = dump.Protocol or "MsgPack:1" - local infos = dump.Plugins or dump - - for _, plugin_info in ipairs(infos) do - register_plugin_info(server_def, plugin_info) - end + local info = (dump.Plugins or dump)[1] -- XXX can a pluginserver (in the embedded plugin server world + -- have more than one plugin? only a single + -- configuration is initialized currently, so this + -- seems to be legacy code) + + -- in remote times, a plugin server could serve more than one plugin + -- nowadays (2.8+), external plugins use an "embedded pluginserver" model, where + -- each plugin acts as an independent plugin server + return { + server_def = server_def, + name = info.Name, + PRIORITY = info.Priority, + VERSION = info.Version, + schema = info.Schema, + phases = info.Phases, + } end -function proc_mgmt.get_plugin_info(plugin_name) - if not _plugin_infos then - kong = kong or _G.kong -- some CLI cmds set the global after loading the module. - _plugin_infos = {} - for _, server_def in ipairs(get_server_defs()) do - ask_info(server_def) +function _M.load_external_plugins_info(kong_conf) + local available_external_plugins = {} + + kong.log.notice("[pluginserver] loading external plugins info") + + for _, pluginserver in ipairs(kong_conf.pluginservers) do + local plugin_info, err = query_external_plugin_info(pluginserver) + if not plugin_info then + return nil, err end + + available_external_plugins[plugin_info.name] = plugin_info end - return _plugin_infos[plugin_name] + kong.log.notice("[pluginserver] loaded #", #kong_conf.pluginservers, " external plugins info") + + return available_external_plugins end @@ -179,7 +114,6 @@ event and respawns the server. If the `_start_cmd` is unset (and the default doesn't exist in the filesystem) it's assumed the process is managed externally. - --]] local function grab_logs(proc, name) @@ -198,12 +132,13 @@ local function grab_logs(proc, name) end end -function proc_mgmt.pluginserver_timer(premature, server_def) + +local function pluginserver_timer(premature, server_def) if premature then return end - if is_not_http_subsystem then + if ngx.config.subsystem ~= "http" then return end @@ -214,30 +149,73 @@ function proc_mgmt.pluginserver_timer(premature, server_def) ngx.sleep(next_spawn - ngx.now()) end - kong.log.notice("Starting " .. server_def.name or "") + kong.log.notice("[pluginserver] starting pluginserver process for ", server_def.name or "") server_def.proc = assert(ngx_pipe.spawn("exec " .. server_def.start_command, { merge_stderr = true, })) next_spawn = ngx.now() + 1 server_def.proc:set_timeouts(nil, nil, nil, 0) -- block until something actually happens + kong.log.notice("[pluginserver] started, pid ", server_def.proc:pid()) while true do grab_logs(server_def.proc, server_def.name) local ok, reason, status = server_def.proc:wait() + + -- exited with a non 0 status if ok == false and reason == "exit" and status == 127 then kong.log.err(string.format( - "external pluginserver %q start command %q exited with \"command not found\"", + "[pluginserver] external pluginserver %q start command %q exited with \"command not found\"", server_def.name, server_def.start_command)) break + + -- waited on an exited thread elseif ok ~= nil or reason == "exited" or ngx.worker.exiting() then kong.log.notice("external pluginserver '", server_def.name, "' terminated: ", tostring(reason), " ", tostring(status)) break end + + -- XXX what happens if the process stops with a 0 status code? + end + end + + kong.log.notice("[pluginserver] exiting: pluginserver '", server_def.name, "' not respawned.") +end + + +function _M.start_pluginservers() + local kong_config = kong.configuration + + -- only worker 0 manages plugin server processes + if worker_id() == 0 then -- TODO move to privileged worker? + local pluginserver_timer = pluginserver_timer + + for _, server_def in ipairs(kong_config.pluginservers) do + if server_def.start_command then -- if not defined, we assume it's managed externally + native_timer_at(0, pluginserver_timer, server_def) + end end end - kong.log.notice("Exiting: pluginserver '", server_def.name, "' not respawned.") + + return true end +function _M.stop_pluginservers() + local kong_config = kong.configuration + + -- only worker 0 manages plugin server processes + if worker_id() == 0 then -- TODO move to privileged worker? + for _, server_def in ipairs(kong_config.pluginservers) do + if server_def.proc then + local ok, err = server_def.proc:kill(SIGTERM) + if not ok then + kong.log.error("[pluginserver] failed to stop pluginserver '", server_def.name, ": ", err) + end + kong.log.notice("[pluginserver] successfully stopped pluginserver '", server_def.name, "', pid ", server_def.proc:pid()) + end + end + end + return true +end -return proc_mgmt +return _M diff --git a/kong/runloop/plugin_servers/rpc/init.lua b/kong/runloop/plugin_servers/rpc/init.lua new file mode 100644 index 000000000000..d84c7b1b1e93 --- /dev/null +++ b/kong/runloop/plugin_servers/rpc/init.lua @@ -0,0 +1,22 @@ +local protocol_implementations = { + ["MsgPack:1"] = "kong.runloop.plugin_servers.rpc.mp_rpc", + ["ProtoBuf:1"] = "kong.runloop.plugin_servers.rpc.pb_rpc", +} + +local function new(plugin) + local rpc_modname = protocol_implementations[plugin.server_def.protocol] + if not rpc_modname then + return nil, "unknown protocol implementation: " .. (plugin.server_def.protocol or "nil") + end + + kong.log.notice("[pluginserver] loading protocol ", plugin.server_def.protocol, " for plugin ", plugin.name) + + local rpc_mod = require (rpc_modname) + local rpc = rpc_mod.new(plugin) + + return rpc +end + +return { + new = new, +} \ No newline at end of file diff --git a/kong/runloop/plugin_servers/mp_rpc.lua b/kong/runloop/plugin_servers/rpc/mp_rpc.lua similarity index 84% rename from kong/runloop/plugin_servers/mp_rpc.lua rename to kong/runloop/plugin_servers/rpc/mp_rpc.lua index 0895c44b600f..ef06faa55830 100644 --- a/kong/runloop/plugin_servers/mp_rpc.lua +++ b/kong/runloop/plugin_servers/rpc/mp_rpc.lua @@ -1,5 +1,6 @@ local kong_global = require "kong.global" local cjson = require "cjson.safe" +local rpc_util = require "kong.runloop.plugin_servers.rpc.util" local _ local msgpack do @@ -27,19 +28,6 @@ local str_find = string.find local Rpc = {} Rpc.__index = Rpc -Rpc.notifications_callbacks = {} - -function Rpc.new(socket_path, notifications) - kong.log.debug("mp_rpc.new: ", socket_path) - return setmetatable({ - socket_path = socket_path, - msg_id = 0, - notifications_callbacks = notifications, - }, Rpc) -end - - - -- add MessagePack empty array/map msgpack.packers['function'] = function (buffer, f) @@ -106,48 +94,30 @@ Kong API exposed to external plugins --]] --- global method search and cache -local function index_table(table, field) - if table[field] then - return table[field] - end - - local res = table - for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do - if res[segment[0]] then - res = res[segment[0]] - else - return nil - end - end - return res -end - - local get_field do local method_cache = {} - function get_field(method) + function get_field(pdk, method) if method_cache[method] then return method_cache[method] else - method_cache[method] = index_table(Rpc.exposed_api, method) + method_cache[method] = rpc_util.index_table(pdk, method) return method_cache[method] end end end -local function call_pdk_method(cmd, args) - local method = get_field(cmd) +local function call_pdk_method(pdk, cmd, args) + local method = get_field(pdk, cmd) if not method then kong.log.err("could not find pdk method: ", cmd) return end - local saved = Rpc.save_for_later[coroutine.running()] + local saved = pdk.get_saved_for_later() if saved and saved.plugin_name then kong_global.set_namespaced_log(kong, saved.plugin_name) end @@ -203,7 +173,7 @@ function Rpc:call(method, ...) self.msg_id = self.msg_id + 1 local msg_id = self.msg_id - local c, err = ngx.socket.connect("unix:" .. self.socket_path) + local c, err = ngx.socket.connect("unix:" .. self.plugin.server_def.socket) if not c then kong.log.err("trying to connect: ", err) return nil, err @@ -278,7 +248,7 @@ end function Rpc:notification(label, args) - local f = self.notifications_callbacks[label] + local f = self.plugin.rpc_notifications[label] if f then f(self, args) end @@ -311,6 +281,7 @@ local function bridge_loop(instance_rpc, instance_id, phase) end local pdk_res, pdk_err = call_pdk_method( + instance_rpc.plugin.exposed_api, step_in.Data.Method, step_in.Data.Args) @@ -327,8 +298,10 @@ local function bridge_loop(instance_rpc, instance_id, phase) end -function Rpc:handle_event(plugin_name, conf, phase) - local instance_id, err = self.get_instance_id(plugin_name, conf) +function Rpc:handle_event(conf, phase) + local plugin_name = self.plugin.name + + local instance_id, err = self.plugin.instance_callbacks.get_instance_id(self.plugin, conf) if not err then _, err = bridge_loop(self, instance_id, phase) end @@ -337,16 +310,24 @@ function Rpc:handle_event(plugin_name, conf, phase) local err_lowered = err:lower() if str_find(err_lowered, "no plugin instance") then - self.reset_instance(plugin_name, conf) + self.plugin.instance_callbacks.reset_instance(plugin_name, conf) kong.log.warn(err) - return self:handle_event(plugin_name, conf, phase) + return self:handle_event(conf, phase) end kong.log.err(err) end end +local function new(plugin) + local self = setmetatable({ + msg_id = 0, + plugin = plugin, + }, Rpc) + return self +end - -return Rpc +return { + new = new, +} diff --git a/kong/runloop/plugin_servers/pb_rpc.lua b/kong/runloop/plugin_servers/rpc/pb_rpc.lua similarity index 88% rename from kong/runloop/plugin_servers/pb_rpc.lua rename to kong/runloop/plugin_servers/rpc/pb_rpc.lua index 8dbb85857abc..c1173fc0bc51 100644 --- a/kong/runloop/plugin_servers/pb_rpc.lua +++ b/kong/runloop/plugin_servers/rpc/pb_rpc.lua @@ -3,6 +3,7 @@ local cjson = require "cjson.safe" local grpc_tools = require "kong.tools.grpc" local pb = require "pb" local lpack = require "lua_pack" +local rpc_util = require "kong.runloop.plugin_servers.rpc.util" local ngx = ngx local kong = kong @@ -13,11 +14,9 @@ local st_unpack = lpack.unpack local str_find = string.find local proto_fname = "kong/pluginsocket.proto" - local Rpc = {} Rpc.__index = Rpc - local pb_unwrap do local structpb_value, structpb_list, structpb_struct @@ -176,24 +175,7 @@ do } end - -local function index_table(table, field) - if table[field] then - return table[field] - end - - local res = table - for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do - if res[segment[0]] then - res = res[segment[0]] - else - return nil - end - end - return res -end - -local function load_service() +local function load_service(pdk) local p = grpc_tools.new() local protoc_instance = p.protoc_instance @@ -212,7 +194,7 @@ local function load_service() service[lower_name] = { method_name = method_name, - method = index_table(Rpc.exposed_api, lower_name), + method = rpc_util.index_table(pdk, lower_name), input_type = m.input_type, output_type = m.output_type, } @@ -233,13 +215,13 @@ local function identity_function(x) end -local function call_pdk(method_name, arg) +local function call_pdk(pdk, method_name, arg) local method = rpc_service[method_name] if not method then return nil, ("method %q not found"):format(method_name) end - local saved = Rpc.save_for_later[coroutine.running()] + local saved = pdk.get_saved_for_later() if saved and saved.plugin_name then kong_global.set_namespaced_log(kong, saved.plugin_name) end @@ -283,25 +265,10 @@ local function write_frame(c, msg) assert (c:send(msg)) end -function Rpc.new(socket_path, notifications) - - if not rpc_service then - rpc_service = load_service() - end - - --kong.log.debug("pb_rpc.new: ", socket_path) - return setmetatable({ - socket_path = socket_path, - msg_id = 0, - notifications_callbacks = notifications, - }, Rpc) -end - - function Rpc:call(method, data, do_bridge_loop) self.msg_id = self.msg_id + 1 local msg_id = self.msg_id - local c, err = ngx.socket.connect("unix:" .. self.socket_path) + local c, err = ngx.socket.connect("unix:" .. self.plugin.server_def.socket) if not c then kong.log.err("trying to connect: ", err) return nil, err @@ -336,7 +303,7 @@ function Rpc:call(method, data, do_bridge_loop) end local reply - reply, err = call_pdk(method_name, args) + reply, err = call_pdk(self.plugin.exposed_api, method_name, args) if not reply then return nil, err end @@ -371,7 +338,7 @@ function Rpc:call_start_instance(plugin_name, conf) return nil, err end - kong.log.debug("started plugin server: seq ", conf.__seq__, ", worker ", ngx.worker.id() or -1, ", instance id ", + kong.log.debug("started plugin server: seq ", conf.__seq__, ", worker ", ngx.worker.id(), ", instance id ", status.instance_status.instance_id) return { @@ -390,9 +357,10 @@ function Rpc:call_close_instance(instance_id) end +function Rpc:handle_event(conf, phase) + local plugin_name = self.plugin.name -function Rpc:handle_event(plugin_name, conf, phase) - local instance_id, err = self.get_instance_id(plugin_name, conf) + local instance_id, err = self.plugin.instance_callbacks.get_instance_id(self.plugin, conf) local res if not err then res, err = self:call("cmd_handle_event", { @@ -406,14 +374,28 @@ function Rpc:handle_event(plugin_name, conf, phase) if str_find(err_lowered, "no plugin instance", nil, true) or str_find(err_lowered, "closed", nil, true) then - self.reset_instance(plugin_name, conf) + self.plugin.instance_callbacks.reset_instance(plugin_name, conf) kong.log.warn(err) - return self:handle_event(plugin_name, conf, phase) + return self:handle_event(conf, phase) end kong.log.err(err) end end +local function new(plugin) + if not rpc_service then + rpc_service = load_service(plugin.exposed_api) + end + + local self = setmetatable({ + msg_id = 0, + plugin = plugin, + }, Rpc) + + return self +end -return Rpc +return { + new = new, +} diff --git a/kong/runloop/plugin_servers/rpc/util.lua b/kong/runloop/plugin_servers/rpc/util.lua new file mode 100644 index 000000000000..c868e1e23a30 --- /dev/null +++ b/kong/runloop/plugin_servers/rpc/util.lua @@ -0,0 +1,19 @@ +local function index_table(table, field) + if table[field] then + return table[field] + end + + local res = table + for segment, e in ngx.re.gmatch(field, "\\w+", "jo") do + if res[segment[0]] then + res = res[segment[0]] + else + return nil + end + end + return res + end + +return { + index_table = index_table, +} diff --git a/spec/01-unit/03-conf_loader_spec.lua b/spec/01-unit/03-conf_loader_spec.lua index 752471584a75..af1a4d400b9c 100644 --- a/spec/01-unit/03-conf_loader_spec.lua +++ b/spec/01-unit/03-conf_loader_spec.lua @@ -2481,4 +2481,70 @@ describe("Configuration loader", function() end) end) + describe("pluginserver config", function() + describe("fails if", function() + it("no query_command and not found in default location", function() + local _, err = conf_loader(nil, { + pluginserver_names = "gopher", + -- query_command = {}, + }) + assert.is_not_nil(err) + assert.matches("query_command undefined for pluginserver gopher", err) + end) + + -- TODO need one more test case: + -- if no query_command is given, `/usr/local/bin/` is checked + -- we cannot write there in tests - figure out a way to write the test + end) + describe("warns if", function() + it("no start_command (meaning process is externally maintained)", function() + local spy_log = spy.on(log, "warn") + + finally(function() + log.warn:revert() + assert:unregister("matcher", "str_match") + end) + + assert:register("matcher", "str_match", function (_state, arguments) + local expected = arguments[1] + return function(value) + return string.match(value, expected) ~= nil + end + end) + + local _, err = conf_loader(nil, { + pluginserver_names = "gopher", + pluginserver_gopher_query_cmd = "gopher -dump", + }) + assert.is_nil(err) + assert.spy(spy_log).was_called(1) + assert.spy(spy_log).was_called_with("start_command undefined for pluginserver gopher; assuming external process management") + end) + end) + it("fills in default settings", function() + local conf, err = conf_loader(nil, { + pluginserver_names = "gopher", + pluginserver_gopher_query_cmd = "gopher -dump", + pluginserver_gopher_start_cmd = "gopher -p $KONG_PREFIX", + }) + assert.is_nil(err) + assert.same("gopher", conf.pluginservers[1].name) + assert.same("gopher -dump", conf.pluginservers[1].query_command) + assert.same("gopher -p $KONG_PREFIX", conf.pluginservers[1].start_command) + assert.same(conf.prefix .. "/gopher.socket", conf.pluginservers[1].socket) + end) + it("accepts custom settings", function() + local conf, err = conf_loader(nil, { + pluginserver_names = "gopher", + pluginserver_gopher_query_cmd = "gopher -dump", + pluginserver_gopher_start_cmd = "gopher -p $KONG_PREFIX", + pluginserver_gopher_socket = "/foo/bar/gopher.socket", + }) + assert.is_nil(err) + assert.same("gopher", conf.pluginservers[1].name) + assert.same("gopher -dump", conf.pluginservers[1].query_command) + assert.same("gopher -p $KONG_PREFIX", conf.pluginservers[1].start_command) + assert.same("/foo/bar/gopher.socket", conf.pluginservers[1].socket) + end) + end) end) diff --git a/spec/02-integration/10-external-plugins/01-process-management_spec.lua b/spec/02-integration/10-external-plugins/01-process-management_spec.lua new file mode 100644 index 000000000000..04c983773b7d --- /dev/null +++ b/spec/02-integration/10-external-plugins/01-process-management_spec.lua @@ -0,0 +1,160 @@ +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + describe("manages a pluginserver #" .. strategy, function() + lazy_setup(function() + assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + })) + end) + + describe("process management", function() + it("starts/stops an external plugin server [golang]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,go-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test', pid [0-9]+]]) + end) + + it("starts/stops an external plugin server [python]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,py-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test', pid [0-9]+]]) + end) + + it("starts/stops an external plugin server [golang, python]", function() + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + log_level = "notice", + database = strategy, + plugins = "bundled,go-hello,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + assert.logfile().has.line([[started, pid [0-9]+]]) + assert(helpers.stop_kong(nil, true)) + assert.logfile().has.line([[successfully stopped pluginserver 'test-go', pid [0-9]+]]) + assert.logfile().has.line([[successfully stopped pluginserver 'test-py', pid [0-9]+]]) + end) + end) + + it("queries plugin info [golang]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,go-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + helpers.build_go_plugins(helpers.external_plugins_path .. "/go") + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["go-hello"]) + + local info = plugin_infos["go-hello"] + assert.equal(1, info.PRIORITY) + assert.equal("0.1", info.VERSION) + assert.equal("go-hello", info.name) + assert.same({ "access", "response", "log" }, info.phases) + assert.same("ProtoBuf:1", info.server_def.protocol) + end) + + it("queries plugin info [python]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,py-hello", + pluginserver_names = "test", + pluginserver_test_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["py-hello"]) + + local info = plugin_infos["py-hello"] + assert.equal(100, info.PRIORITY) + assert.equal("0.1.0", info.VERSION) + assert.equal("py-hello", info.name) + assert.same({ "access" }, info.phases) + assert.same("MsgPack:1", info.server_def.protocol) + end) + + it("queries plugin info [golang, python]", function() + local proc_management = require "kong.runloop.plugin_servers.process" + local kong_prefix = helpers.test_conf.prefix + local conf_loader = require "kong.conf_loader" + + local conf, err = conf_loader(nil, { + plugins = "bundled,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump", + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + }) + assert.is_nil(err) + + local plugin_infos = proc_management.load_external_plugins_info(conf) + assert.not_nil(plugin_infos["go-hello"]) + assert.not_nil(plugin_infos["py-hello"]) + + local go_info = plugin_infos["go-hello"] + assert.equal(1, go_info.PRIORITY) + assert.equal("0.1", go_info.VERSION) + assert.equal("go-hello", go_info.name) + assert.same({ "access", "response", "log" }, go_info.phases) + assert.same("ProtoBuf:1", go_info.server_def.protocol) + + local py_info = plugin_infos["py-hello"] + assert.equal(100, py_info.PRIORITY) + assert.equal("0.1.0", py_info.VERSION) + assert.equal("py-hello", py_info.name) + assert.same({ "access" }, py_info.phases) + assert.same("MsgPack:1", py_info.server_def.protocol) + end) + end) +end diff --git a/spec/02-integration/10-external-plugins/02-execution_spec.lua b/spec/02-integration/10-external-plugins/02-execution_spec.lua new file mode 100644 index 000000000000..72e6e20982de --- /dev/null +++ b/spec/02-integration/10-external-plugins/02-execution_spec.lua @@ -0,0 +1,76 @@ +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + describe("anonymous reports for go plugins #" .. strategy, function() + lazy_setup(function() + local bp = assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + })) + + assert(bp.services:insert {}) + assert(bp.routes:insert({ + protocols = { "http" }, + paths = { "/" } + })) + + local kong_prefix = helpers.test_conf.prefix + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + database = strategy, + plugins = "bundled,reports-api,go-hello,py-hello", + pluginserver_names = "test-go,test-py", + pluginserver_test_go_socket = kong_prefix .. "/go-hello.socket", + pluginserver_test_go_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump -kong-prefix " .. kong_prefix, + pluginserver_test_go_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_py_socket = kong_prefix .. "/py-hello.socket", + pluginserver_test_py_query_cmd = helpers.external_plugins_path .. "/py/py-hello.py --dump", + pluginserver_test_py_start_cmd = helpers.external_plugins_path .. "/py/py-hello.py --socket-name py-hello.socket --kong-prefix " .. kong_prefix, + })) + + local admin_client = helpers.admin_client() + + local res = admin_client:post("/plugins", { + headers = { + ["Content-Type"] = "application/json" + }, + body = { + name = "go-hello", + config = { + message = "Kong!" + } + } + }) + assert.res_status(201, res) + + res = admin_client:post("/plugins", { + headers = { + ["Content-Type"] = "application/json" + }, + body = { + name = "py-hello", + config = { + message = "Kong!" + } + } + }) + assert.res_status(201, res) + end) + + lazy_teardown(function() + helpers.stop_kong() + end) + + it("executes external plugins [golang, python]", function() + local proxy_client = assert(helpers.proxy_client()) + local res = proxy_client:get("/") + assert.res_status(200, res) + local h = assert.response(res).has.header("x-hello-from-go") + assert.matches("Go says Kong! to", h) + h = assert.response(res).has.header("x-hello-from-python") + assert.matches("Python says Kong! to", h) + end) + end) +end diff --git a/spec/02-integration/10-go_plugins/01-reports_spec.lua b/spec/02-integration/10-external-plugins/99-reports_spec.lua similarity index 95% rename from spec/02-integration/10-go_plugins/01-reports_spec.lua rename to spec/02-integration/10-external-plugins/99-reports_spec.lua index d3457d1683fe..6d110f619aa6 100644 --- a/spec/02-integration/10-go_plugins/01-reports_spec.lua +++ b/spec/02-integration/10-external-plugins/99-reports_spec.lua @@ -53,8 +53,8 @@ for _, strategy in helpers.each_strategy() do plugins = "bundled,reports-api,go-hello", pluginserver_names = "test", pluginserver_test_socket = kong_prefix .. "/go-hello.socket", - pluginserver_test_query_cmd = "./spec/fixtures/go/go-hello -dump -kong-prefix " .. kong_prefix, - pluginserver_test_start_cmd = "./spec/fixtures/go/go-hello -kong-prefix " .. kong_prefix, + pluginserver_test_query_cmd = helpers.external_plugins_path .. "/go/go-hello -dump -kong-prefix " .. kong_prefix, + pluginserver_test_start_cmd = helpers.external_plugins_path .. "/go/go-hello -kong-prefix " .. kong_prefix, anonymous_reports = true, })) diff --git a/spec/fixtures/go/go-hello.go b/spec/fixtures/external_plugins/go/go-hello.go similarity index 100% rename from spec/fixtures/go/go-hello.go rename to spec/fixtures/external_plugins/go/go-hello.go diff --git a/spec/fixtures/go/go.mod b/spec/fixtures/external_plugins/go/go.mod similarity index 100% rename from spec/fixtures/go/go.mod rename to spec/fixtures/external_plugins/go/go.mod diff --git a/spec/fixtures/go/go.sum b/spec/fixtures/external_plugins/go/go.sum similarity index 100% rename from spec/fixtures/go/go.sum rename to spec/fixtures/external_plugins/go/go.sum diff --git a/spec/fixtures/external_plugins/js/js-hello.js b/spec/fixtures/external_plugins/js/js-hello.js new file mode 100644 index 000000000000..b1141644bd62 --- /dev/null +++ b/spec/fixtures/external_plugins/js/js-hello.js @@ -0,0 +1,33 @@ +'use strict'; + +// This is an example plugin that add a header to the response + +class KongPlugin { + constructor(config) { + this.config = config + } + + async access(kong) { + let host = await kong.request.getHeader("host") + if (host === undefined) { + return await kong.log.err("unable to get header for request") + } + + let message = this.config.message || "hello" + + // the following can be "parallel"ed + await Promise.all([ + kong.response.setHeader("x-hello-from-javascript", "Javascript says " + message + " to " + host), + kong.response.setHeader("x-javascript-pid", process.pid), + ]) + } +} + +module.exports = { + Plugin: KongPlugin, + Schema: [ + { message: { type: "string" } }, + ], + Version: '0.1.0', + Priority: 0, +} \ No newline at end of file diff --git a/spec/fixtures/external_plugins/py/py-hello.py b/spec/fixtures/external_plugins/py/py-hello.py new file mode 100755 index 000000000000..3301e059e157 --- /dev/null +++ b/spec/fixtures/external_plugins/py/py-hello.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +import os +import kong_pdk.pdk.kong as kong + +Schema = ( + {"message": {"type": "string"}}, +) + +version = '0.1.0' +priority = 100 + +# This is an example plugin that add a header to the response + +class Plugin(object): + def __init__(self, config): + self.config = config + + def access(self, kong: kong.kong): + host, err = kong.request.get_header("host") + if err: + pass # error handling + # if run with --no-lua-style + # try: + # host = kong.request.get_header("host") + # except Exception as ex: + # pass # error handling + message = "hello" + if 'message' in self.config: + message = self.config['message'] + kong.response.set_header("x-hello-from-python", "Python says %s to %s" % (message, host)) + kong.response.set_header("x-python-pid", str(os.getpid())) + + +# add below section to allow this plugin optionally be running in a dedicated process +if __name__ == "__main__": + from kong_pdk.cli import start_dedicated_server + start_dedicated_server("py-hello", Plugin, version, priority, Schema) diff --git a/spec/fixtures/external_plugins/py/requirements.txt b/spec/fixtures/external_plugins/py/requirements.txt new file mode 100644 index 000000000000..0f887887fd55 --- /dev/null +++ b/spec/fixtures/external_plugins/py/requirements.txt @@ -0,0 +1 @@ +kong-pdk diff --git a/spec/helpers.lua b/spec/helpers.lua index cea72bad2b71..f3c3a9c88ee1 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -10,7 +10,7 @@ local TEST_CONF_PATH = os.getenv("KONG_SPEC_TEST_CONF_PATH") or "spec/kong_tests local CUSTOM_PLUGIN_PATH = "./spec/fixtures/custom_plugins/?.lua" local CUSTOM_VAULT_PATH = "./spec/fixtures/custom_vaults/?.lua;./spec/fixtures/custom_vaults/?/init.lua" local DNS_MOCK_LUA_PATH = "./spec/fixtures/mocks/lua-resty-dns/?.lua" -local GO_PLUGIN_PATH = "./spec/fixtures/go" +local EXTERNAL_PLUGINS_PATH = "./spec/fixtures/external_plugins" local GRPC_TARGET_SRC_PATH = "./spec/fixtures/grpc/target/" local MOCK_UPSTREAM_PROTOCOL = "http" local MOCK_UPSTREAM_SSL_PROTOCOL = "https" @@ -3744,8 +3744,8 @@ local function start_kong(env, tables, preserve_prefix, fixtures) -- go plugins are enabled -- compile fixture go plugins if any setting mentions it for _,v in pairs(env) do - if type(v) == "string" and v:find(GO_PLUGIN_PATH) then - build_go_plugins(GO_PLUGIN_PATH) + if type(v) == "string" and v:find(EXTERNAL_PLUGINS_PATH .. "/go") then + build_go_plugins(EXTERNAL_PLUGINS_PATH .. "/go") break end end @@ -4218,7 +4218,7 @@ end bin_path = BIN_PATH, test_conf = conf, test_conf_path = TEST_CONF_PATH, - go_plugin_path = GO_PLUGIN_PATH, + external_plugins_path = EXTERNAL_PLUGINS_PATH, mock_upstream_hostname = MOCK_UPSTREAM_HOSTNAME, mock_upstream_protocol = MOCK_UPSTREAM_PROTOCOL, mock_upstream_host = MOCK_UPSTREAM_HOST, @@ -4397,4 +4397,6 @@ end get_available_port = get_available_port, make_temp_dir = make_temp_dir, + + build_go_plugins = build_go_plugins, }