From b54b96222e0ad197f7fc1fb046ef008a10983d15 Mon Sep 17 00:00:00 2001 From: samugi Date: Wed, 3 Jul 2024 10:24:21 +0200 Subject: [PATCH] feat(pdk): telemetry log This commit: * introduces the `kong.telemetry` pdk module * adds the `kong.telemetry.log` function to allow generating log entries meant to be reported via the OpenTelemetry plugin --- .../unreleased/kong/pdk-telemetry-log.yml | 5 + kong/globalpatches.lua | 2 +- kong/observability/logs.lua | 19 +- kong/pdk/init.lua | 1 + kong/pdk/log.lua | 2 +- kong/pdk/telemetry.lua | 91 ++++++++ .../01-unit/26-observability/05-logs_spec.lua | 10 +- .../06-telemetry-pdk_spec.lua | 38 ++++ .../06-telemetry-pdk_spec.lua | 206 ++++++++++++++++++ .../kong/plugins/pdk-logger/handler.lua | 51 +++++ .../kong/plugins/pdk-logger/schema.lua | 18 ++ 11 files changed, 433 insertions(+), 10 deletions(-) create mode 100644 changelog/unreleased/kong/pdk-telemetry-log.yml create mode 100644 kong/pdk/telemetry.lua create mode 100644 spec/01-unit/26-observability/06-telemetry-pdk_spec.lua create mode 100644 spec/02-integration/14-observability/06-telemetry-pdk_spec.lua create mode 100644 spec/fixtures/custom_plugins/kong/plugins/pdk-logger/handler.lua create mode 100644 spec/fixtures/custom_plugins/kong/plugins/pdk-logger/schema.lua diff --git a/changelog/unreleased/kong/pdk-telemetry-log.yml b/changelog/unreleased/kong/pdk-telemetry-log.yml new file mode 100644 index 000000000000..3de258d3f6ec --- /dev/null +++ b/changelog/unreleased/kong/pdk-telemetry-log.yml @@ -0,0 +1,5 @@ +message: | + Added a new PDK module `kong.telemetry` and function: `kong.telemetry.log` + to generate log entries to be reported via the OpenTelemetry plugin. +type: feature +scope: PDK diff --git a/kong/globalpatches.lua b/kong/globalpatches.lua index 99c128560df1..c835d439e6b3 100644 --- a/kong/globalpatches.lua +++ b/kong/globalpatches.lua @@ -608,7 +608,7 @@ return function(options) -- 4: patched function -- 5: caller hook_called = true - dynamic_hook.run_hook("observability_logs", "push", 5, ...) + dynamic_hook.run_hook("observability_logs", "push", 5, nil, ...) hook_called = false return old_ngx_log(...) end diff --git a/kong/observability/logs.lua b/kong/observability/logs.lua index f936b2e23c0c..b00f6ade8f4b 100644 --- a/kong/observability/logs.lua +++ b/kong/observability/logs.lua @@ -11,6 +11,7 @@ end local request_id_get = require "kong.observability.tracing.request_id".get local time_ns = require "kong.tools.time".time_ns +local table_merge = require "kong.tools.table".table_merge local cycle_aware_deep_copy = require "kong.tools.utils".cycle_aware_deep_copy local get_log_level = require "resty.kong.log".get_log_level @@ -57,7 +58,7 @@ local function concat_tostring(tab) end -local function generate_log_entry(request_scoped, log_level, log_str, request_id, debug_info) +local function generate_log_entry(request_scoped, inj_attributes, log_level, log_str, request_id, debug_info) local span_id @@ -77,6 +78,9 @@ local function generate_log_entry(request_scoped, log_level, log_str, request_id ["introspection.source"] = debug_info.source, ["introspection.what"] = debug_info.what, } + if inj_attributes then + attributes = table_merge(attributes, inj_attributes) + end local now_ns = time_ns() return { @@ -100,13 +104,13 @@ local function get_request_log_buffer() end -function _M.maybe_push(stack_level, log_level, ...) +function _M.maybe_push(stack_level, attributes, log_level, ...) -- WARNING: do not yield in this function, as it is called from ngx.log -- Early return cases: -- log level too low - if configured_log_level() < log_level then + if log_level and configured_log_level() < log_level then return end @@ -138,7 +142,14 @@ function _M.maybe_push(stack_level, log_level, ...) -- generate & push log entry local debug_info = debug.getinfo(stack_level, "nSl") - local log_entry = generate_log_entry(request_scoped, log_level, log_str, request_id, debug_info) + local log_entry = generate_log_entry( + request_scoped, + attributes, + log_level, + log_str, + request_id, + debug_info + ) table.insert(log_buffer, log_entry) end diff --git a/kong/pdk/init.lua b/kong/pdk/init.lua index 858795a368e1..13974dd3bb1d 100644 --- a/kong/pdk/init.lua +++ b/kong/pdk/init.lua @@ -208,6 +208,7 @@ local MAJOR_MODULES = { "vault", "tracing", "plugin", + "telemetry", } if ngx.config.subsystem == 'http' then diff --git a/kong/pdk/log.lua b/kong/pdk/log.lua index 61d4d99c9c51..990ea202471d 100644 --- a/kong/pdk/log.lua +++ b/kong/pdk/log.lua @@ -315,7 +315,7 @@ local function gen_log_func(lvl_const, imm_buf, to_string, stack_level, sep) -- 1: maybe_push -- 2: dynamic_hook.pcall -- 3: dynamic_hook.run_hook - dynamic_hook.run_hook("observability_logs", "push", stack_level + 3, lvl_const, ...) + dynamic_hook.run_hook("observability_logs", "push", stack_level + 3, nil, lvl_const, ...) local n = select("#", ...) diff --git a/kong/pdk/telemetry.lua b/kong/pdk/telemetry.lua new file mode 100644 index 000000000000..e47d8ef5ea6a --- /dev/null +++ b/kong/pdk/telemetry.lua @@ -0,0 +1,91 @@ +--- +-- The telemetry module provides capabilities for telemetry operations. +-- +-- @module kong.telemetry.log + + +local dynamic_hook = require("kong.dynamic_hook") + +local dyn_hook_run_hook = dynamic_hook.run_hook +local dyn_hook_is_group_enabled = dynamic_hook.is_group_enabled + +local function new() + local telemetry = {} + + + --- + -- Records a structured log entry, to be reported via the OpenTelemetry plugin. + -- + -- This function has a dependency on the OpenTelemetry plugin, which must be + -- configured to report OpenTelemetry logs. + -- + -- @function kong.telemetry.log + -- @phases `rewrite`, `access`, `balancer`, `timer`, `header_filter`, + -- `response`, `body_filter`, `log` + -- @tparam string plugin_name the name of the plugin + -- @tparam table plugin_config the plugin configuration + -- @tparam string message_type the type of the log message, useful to categorize + -- the log entry + -- @tparam string message the log message + -- @tparam table attributes structured information to be included in the + -- `attributes` field of the log entry + -- @usage + -- local attributes = { + -- http_method = kong.request.get_method() + -- ["node.id"] = kong.node.get_id(), + -- hostname = kong.node.get_hostname(), + -- } + -- + -- local ok, err = kong.telemetry.log("my_plugin", conf, "result", "successful operation", attributes) + telemetry.log = function(plugin_name, plugin_config, message_type, message, attributes) + if type(plugin_name) ~= "string" then + return nil, "plugin_name must be a string" + end + + if type(plugin_config) ~= "table" then + return nil, "plugin_config must be a table" + end + + if type(message_type) ~= "string" then + return nil, "message_type must be a string" + end + + if message and type(message) ~= "string" then + return nil, "message must be a string" + end + + if attributes and type(attributes) ~= "table" then + return nil, "attributes must be a table" + end + + local hook_group = "observability_logs" + if not dyn_hook_is_group_enabled(hook_group) then + return nil, "Telemetry logging is disabled: log entry will not be recorded. " .. + "Ensure the OpenTelemetry plugin is correctly configured to " .. + "report logs in order to use this feature." + end + + attributes = attributes or {} + attributes["message.type"] = message_type + attributes["plugin.name"] = plugin_name + attributes["plugin.id"] = plugin_config.__plugin_id + attributes["plugin.instance.name"] = plugin_config.plugin_instance_name + + -- stack level = 5: + -- 1: maybe_push + -- 2: dynamic_hook.pcall + -- 3: dynamic_hook.run_hook + -- 4: kong.telemetry.log + -- 5: caller + dyn_hook_run_hook(hook_group, "push", 5, attributes, nil, message) + return true + end + + + return telemetry +end + + +return { + new = new, +} diff --git a/spec/01-unit/26-observability/05-logs_spec.lua b/spec/01-unit/26-observability/05-logs_spec.lua index 05b9ca2856dd..84d36111fe86 100644 --- a/spec/01-unit/26-observability/05-logs_spec.lua +++ b/spec/01-unit/26-observability/05-logs_spec.lua @@ -40,7 +40,7 @@ describe("Observability/Logs unit tests", function() end) it("has no effect when no log line is provided", function() - maybe_push(1, ngx.INFO) + maybe_push(1, nil, ngx.INFO) local worker_logs = get_worker_logs() assert.same({}, worker_logs) local request_logs = get_request_logs() @@ -48,7 +48,7 @@ describe("Observability/Logs unit tests", function() end) it("has no effect when log line is empty", function() - maybe_push(1, ngx.INFO, "") + maybe_push(1, nil, ngx.INFO, "") local worker_logs = get_worker_logs() assert.same({}, worker_logs) local request_logs = get_request_logs() @@ -56,7 +56,7 @@ describe("Observability/Logs unit tests", function() end) it("has no effect when log level is lower than the configured value", function() - maybe_push(1, ngx.DEBUG, "Don't mind me, I'm just a debug log") + maybe_push(1, nil, ngx.DEBUG, "Don't mind me, I'm just a debug log") local worker_logs = get_worker_logs() assert.same({}, worker_logs) local request_logs = get_request_logs() @@ -67,7 +67,7 @@ describe("Observability/Logs unit tests", function() local log_level = ngx.WARN local body = "Careful! I'm a warning!" - maybe_push(1, log_level, body) + maybe_push(1, { foo = "bar", tst = "baz" }, log_level, body) local worker_logs = get_worker_logs() assert.equals(1, #worker_logs) @@ -80,6 +80,8 @@ describe("Observability/Logs unit tests", function() assert.is_string(logged_entry.attributes["introspection.namewhat"]) assert.is_string(logged_entry.attributes["introspection.source"]) assert.is_string(logged_entry.attributes["introspection.what"]) + assert.equals("bar", logged_entry.attributes.foo) + assert.equals("baz", logged_entry.attributes.tst) assert.is_number(logged_entry.observed_time_unix_nano) assert.is_number(logged_entry.time_unix_nano) end) diff --git a/spec/01-unit/26-observability/06-telemetry-pdk_spec.lua b/spec/01-unit/26-observability/06-telemetry-pdk_spec.lua new file mode 100644 index 000000000000..14139810770f --- /dev/null +++ b/spec/01-unit/26-observability/06-telemetry-pdk_spec.lua @@ -0,0 +1,38 @@ +require "kong.tools.utils" + + +describe("Telemetry PDK unit tests", function() + describe("log()", function() + local old_kong = _G.kong + + lazy_setup(function() + local kong_global = require "kong.global" + _G.kong = kong_global.new() + kong_global.init_pdk(kong) + end) + + lazy_teardown(function() + _G.kong = old_kong + end) + + it("fails as expected with invalid input", function() + local ok, err = kong.telemetry.log() + assert.is_nil(ok) + assert.equals("plugin_name must be a string", err) + + ok, err = kong.telemetry.log("plugin_name") + assert.is_nil(ok) + assert.equals("plugin_config must be a table", err) + + ok, err = kong.telemetry.log("plugin_name", {}) + assert.is_nil(ok) + assert.equals("message_type must be a string", err) + end) + + it ("considers attributes and message as optional", function() + local ok, err = kong.telemetry.log("plugin_name", {}, "message_type") + assert.is_nil(ok) + assert.matches("Telemetry logging is disabled", err) + end) + end) +end) diff --git a/spec/02-integration/14-observability/06-telemetry-pdk_spec.lua b/spec/02-integration/14-observability/06-telemetry-pdk_spec.lua new file mode 100644 index 000000000000..679fdb27cc4b --- /dev/null +++ b/spec/02-integration/14-observability/06-telemetry-pdk_spec.lua @@ -0,0 +1,206 @@ +local helpers = require "spec.helpers" +local pb = require "pb" + +local HTTP_SERVER_PORT_LOGS = helpers.get_available_port() + + +for _, strategy in helpers.each_strategy() do + describe("kong.pdk.telemetry #" .. strategy, function() + local bp + local plugin_instance_name = "my-pdk-logger-instance" + + describe("log", function() + describe("with OpenTelemetry", function() + local mock_logs + + lazy_setup(function() + bp, _ = assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + }, { "opentelemetry", "pdk-logger" })) + + local http_srv = assert(bp.services:insert { + name = "mock-service", + host = helpers.mock_upstream_host, + port = helpers.mock_upstream_port, + }) + + local logs_route = assert(bp.routes:insert({ + service = http_srv, + protocols = { "http" }, + paths = { "/logs" } + })) + + assert(bp.plugins:insert({ + name = "opentelemetry", + route = logs_route, + config = { + logs_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_LOGS, + queue = { + max_batch_size = 1000, + max_coalescing_delay = 2, + }, + } + })) + + assert(bp.plugins:insert({ + name = "pdk-logger", + route = logs_route, + config = {}, + instance_name = plugin_instance_name, + })) + + assert(helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + plugins = "opentelemetry,pdk-logger", + })) + + mock_logs = helpers.http_mock(HTTP_SERVER_PORT_LOGS, { timeout = 1 }) + end) + + lazy_teardown(function() + helpers.stop_kong() + if mock_logs then + mock_logs("close", true) + end + end) + + local function assert_find_valid_logs(body, request_id) + local decoded = assert(pb.decode("opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest", body)) + assert.not_nil(decoded) + + local scope_logs = decoded.resource_logs[1].scope_logs + assert.is_true(#scope_logs > 0, scope_logs) + + local found = 0 + for _, scope_log in ipairs(scope_logs) do + local log_records = scope_log.log_records + for _, log_record in ipairs(log_records) do + local logline = log_record.body.string_value + + -- from the pdk-logger plugin: + local plugin_name = "pdk-logger" + local message = "log message" + local attributes = { + some_key = "some_value", + some_other_key = "some_other_value" + } + local expected_message_types = { + "rewrite_phase", + "access_phase", + "header_filter_phase", + "body_filter_phase", + "log_phase", + } + + -- filter the right log lines + if string.find(logline, message) then + assert.is_table(log_record.attributes) + local found_attrs = {} + for _, attr in ipairs(log_record.attributes) do + found_attrs[attr.key] = attr.value[attr.value.value] + end + + -- ensure the log is from the current request + if found_attrs["request.id"] == request_id then + assert.is_number(log_record.time_unix_nano) + assert.is_number(log_record.observed_time_unix_nano) + + assert.is_string(found_attrs["plugin.id"]) + assert.is_number(found_attrs["introspection.current.line"]) + assert.matches("pdk%-logger/handler%.lua", found_attrs["introspection.source"]) + assert.partial_match(attributes, found_attrs) + assert.equals(plugin_name, found_attrs["plugin.name"]) + assert.equals(plugin_instance_name, found_attrs["plugin.instance.name"]) + assert.contains(found_attrs["message.type"], expected_message_types) + + found = found + 1 + end + end + end + end + assert.equals(5, found) + end + + it("produces and exports valid logs", function() + local headers, body, request_id + + local cli = helpers.proxy_client() + local res = assert(cli:send { + method = "GET", + path = "/logs", + }) + assert.res_status(200, res) + cli:close() + + request_id = res.headers["X-Kong-Request-Id"] + + helpers.wait_until(function() + local lines + lines, body, headers = mock_logs() + + return lines + end, 10) + + assert.is_string(body) + assert.equals(headers["Content-Type"], "application/x-protobuf") + + assert_find_valid_logs(body, request_id) + end) + end) + + describe("without OpenTelemetry", function() + lazy_setup(function() + bp, _ = assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + }, { "pdk-logger" })) + + local http_srv = assert(bp.services:insert { + name = "mock-service", + host = helpers.mock_upstream_host, + port = helpers.mock_upstream_port, + }) + + local logs_route = assert(bp.routes:insert({ + service = http_srv, + protocols = { "http" }, + paths = { "/logs" } + })) + + assert(bp.plugins:insert({ + name = "pdk-logger", + route = logs_route, + config = {}, + instance_name = plugin_instance_name, + })) + + assert(helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + plugins = "pdk-logger", + })) + end) + + lazy_teardown(function() + helpers.stop_kong() + end) + + it("handles errors correctly", function() + local cli = helpers.proxy_client() + local res = assert(cli:send { + method = "GET", + path = "/logs", + }) + assert.res_status(200, res) + cli:close() + + assert.logfile().has.line("Telemetry logging is disabled", true) + end) + end) + end) + end) +end diff --git a/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/handler.lua new file mode 100644 index 000000000000..b3c3021709a5 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/handler.lua @@ -0,0 +1,51 @@ +local PDKLoggerHandler = { + VERSION = "0.1-t", + PRIORITY = 1000, +} + +local plugin_name = "pdk-logger" +local message = "log message" +local attributes = { some_key = "some_value", some_other_key = "some_other_value"} + + +function PDKLoggerHandler:rewrite(conf) + local ok, err = kong.telemetry.log(plugin_name, conf, "rewrite_phase", message, attributes) + if not ok then + kong.log.err(err) + end +end + + +function PDKLoggerHandler:access(conf) + local ok, err = kong.telemetry.log(plugin_name, conf, "access_phase", message, attributes) + if not ok then + kong.log.err(err) + end +end + + +function PDKLoggerHandler:header_filter(conf) + local ok, err = kong.telemetry.log(plugin_name, conf, "header_filter_phase", message, attributes) + if not ok then + kong.log.err(err) + end +end + + +function PDKLoggerHandler:body_filter(conf) + local ok, err = kong.telemetry.log(plugin_name, conf, "body_filter_phase", message, attributes) + if not ok then + kong.log.err(err) + end +end + + +function PDKLoggerHandler:log(conf) + local ok, err = kong.telemetry.log(plugin_name, conf, "log_phase", message, attributes) + if not ok then + kong.log.err(err) + end +end + + +return PDKLoggerHandler diff --git a/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/schema.lua new file mode 100644 index 000000000000..cbdafd0a09e7 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/schema.lua @@ -0,0 +1,18 @@ +local typedefs = require "kong.db.schema.typedefs" + + +return { + name = "pdk-logger", + fields = { + { + protocols = typedefs.protocols { default = { "http", "https", "tcp", "tls", "grpc", "grpcs" } }, + }, + { + config = { + type = "record", + fields = { + }, + }, + }, + }, +}