diff --git a/changelog/unreleased/kong/pdk-telemetry-log.yml b/changelog/unreleased/kong/pdk-telemetry-log.yml new file mode 100644 index 000000000000..3de258d3f6ec --- /dev/null +++ b/changelog/unreleased/kong/pdk-telemetry-log.yml @@ -0,0 +1,5 @@ +message: | + Added a new PDK module `kong.telemetry` and function: `kong.telemetry.log` + to generate log entries to be reported via the OpenTelemetry plugin. +type: feature +scope: PDK diff --git a/kong-3.8.0-0.rockspec b/kong-3.8.0-0.rockspec index ce680566797a..d2470f407ced 100644 --- a/kong-3.8.0-0.rockspec +++ b/kong-3.8.0-0.rockspec @@ -338,6 +338,7 @@ build = { ["kong.pdk.vault"] = "kong/pdk/vault.lua", ["kong.pdk.tracing"] = "kong/pdk/tracing.lua", ["kong.pdk.plugin"] = "kong/pdk/plugin.lua", + ["kong.pdk.telemetry"] = "kong/pdk/telemetry.lua", ["kong.plugins.basic-auth.migrations"] = "kong/plugins/basic-auth/migrations/init.lua", ["kong.plugins.basic-auth.migrations.000_base_basic_auth"] = "kong/plugins/basic-auth/migrations/000_base_basic_auth.lua", diff --git a/kong/globalpatches.lua b/kong/globalpatches.lua index 2d69fbb973a1..397c4fc7c4e8 100644 --- a/kong/globalpatches.lua +++ b/kong/globalpatches.lua @@ -608,7 +608,7 @@ return function(options) -- 4: patched function -- 5: caller hook_called = true - dynamic_hook.run_hook("observability_logs", "push", 5, ...) + dynamic_hook.run_hook("observability_logs", "push", 5, nil, ...) hook_called = false return old_ngx_log(...) end diff --git a/kong/observability/logs.lua b/kong/observability/logs.lua index 145493ecf08d..1fd99d029c40 100644 --- a/kong/observability/logs.lua +++ b/kong/observability/logs.lua @@ -11,6 +11,7 @@ end local request_id_get = require "kong.observability.tracing.request_id".get local time_ns = require "kong.tools.time".time_ns +local table_merge = require "kong.tools.table".table_merge local deep_copy = require "kong.tools.utils".deep_copy local get_log_level = require "resty.kong.log".get_log_level @@ -77,7 +78,7 @@ local function concat_tostring(tab) end -local function generate_log_entry(request_scoped, log_level, log_str, request_id, debug_info) +local function generate_log_entry(request_scoped, inj_attributes, log_level, log_str, request_id, debug_info) local span_id @@ -97,6 +98,9 @@ local function generate_log_entry(request_scoped, log_level, log_str, request_id ["introspection.source"] = debug_info.source, ["introspection.what"] = debug_info.what, } + if inj_attributes then + attributes = table_merge(attributes, inj_attributes) + end local now_ns = time_ns() return { @@ -120,13 +124,13 @@ local function get_request_log_buffer() end -function _M.maybe_push(stack_level, log_level, ...) +function _M.maybe_push(stack_level, attributes, log_level, ...) -- WARNING: do not yield in this function, as it is called from ngx.log -- Early return cases: -- log level too low - if configured_log_level() < log_level then + if log_level and configured_log_level() < log_level then return end @@ -152,16 +156,19 @@ function _M.maybe_push(stack_level, log_level, ...) return end - -- no (or empty) log line local args = table_pack(...) local log_str = concat_tostring(args) - if log_str == "" then - return - end -- generate & push log entry local debug_info = debug.getinfo(stack_level, "nSl") - local log_entry = generate_log_entry(request_scoped, log_level, log_str, request_id, debug_info) + local log_entry = generate_log_entry( + request_scoped, + attributes, + log_level, + log_str, + request_id, + debug_info + ) table.insert(log_buffer, log_entry) end diff --git a/kong/pdk/init.lua b/kong/pdk/init.lua index 858795a368e1..13974dd3bb1d 100644 --- a/kong/pdk/init.lua +++ b/kong/pdk/init.lua @@ -208,6 +208,7 @@ local MAJOR_MODULES = { "vault", "tracing", "plugin", + "telemetry", } if ngx.config.subsystem == 'http' then diff --git a/kong/pdk/log.lua b/kong/pdk/log.lua index 61d4d99c9c51..990ea202471d 100644 --- a/kong/pdk/log.lua +++ b/kong/pdk/log.lua @@ -315,7 +315,7 @@ local function gen_log_func(lvl_const, imm_buf, to_string, stack_level, sep) -- 1: maybe_push -- 2: dynamic_hook.pcall -- 3: dynamic_hook.run_hook - dynamic_hook.run_hook("observability_logs", "push", stack_level + 3, lvl_const, ...) + dynamic_hook.run_hook("observability_logs", "push", stack_level + 3, nil, lvl_const, ...) local n = select("#", ...) diff --git a/kong/pdk/telemetry.lua b/kong/pdk/telemetry.lua new file mode 100644 index 000000000000..e47d8ef5ea6a --- /dev/null +++ b/kong/pdk/telemetry.lua @@ -0,0 +1,91 @@ +--- +-- The telemetry module provides capabilities for telemetry operations. +-- +-- @module kong.telemetry.log + + +local dynamic_hook = require("kong.dynamic_hook") + +local dyn_hook_run_hook = dynamic_hook.run_hook +local dyn_hook_is_group_enabled = dynamic_hook.is_group_enabled + +local function new() + local telemetry = {} + + + --- + -- Records a structured log entry, to be reported via the OpenTelemetry plugin. + -- + -- This function has a dependency on the OpenTelemetry plugin, which must be + -- configured to report OpenTelemetry logs. + -- + -- @function kong.telemetry.log + -- @phases `rewrite`, `access`, `balancer`, `timer`, `header_filter`, + -- `response`, `body_filter`, `log` + -- @tparam string plugin_name the name of the plugin + -- @tparam table plugin_config the plugin configuration + -- @tparam string message_type the type of the log message, useful to categorize + -- the log entry + -- @tparam string message the log message + -- @tparam table attributes structured information to be included in the + -- `attributes` field of the log entry + -- @usage + -- local attributes = { + -- http_method = kong.request.get_method() + -- ["node.id"] = kong.node.get_id(), + -- hostname = kong.node.get_hostname(), + -- } + -- + -- local ok, err = kong.telemetry.log("my_plugin", conf, "result", "successful operation", attributes) + telemetry.log = function(plugin_name, plugin_config, message_type, message, attributes) + if type(plugin_name) ~= "string" then + return nil, "plugin_name must be a string" + end + + if type(plugin_config) ~= "table" then + return nil, "plugin_config must be a table" + end + + if type(message_type) ~= "string" then + return nil, "message_type must be a string" + end + + if message and type(message) ~= "string" then + return nil, "message must be a string" + end + + if attributes and type(attributes) ~= "table" then + return nil, "attributes must be a table" + end + + local hook_group = "observability_logs" + if not dyn_hook_is_group_enabled(hook_group) then + return nil, "Telemetry logging is disabled: log entry will not be recorded. " .. + "Ensure the OpenTelemetry plugin is correctly configured to " .. + "report logs in order to use this feature." + end + + attributes = attributes or {} + attributes["message.type"] = message_type + attributes["plugin.name"] = plugin_name + attributes["plugin.id"] = plugin_config.__plugin_id + attributes["plugin.instance.name"] = plugin_config.plugin_instance_name + + -- stack level = 5: + -- 1: maybe_push + -- 2: dynamic_hook.pcall + -- 3: dynamic_hook.run_hook + -- 4: kong.telemetry.log + -- 5: caller + dyn_hook_run_hook(hook_group, "push", 5, attributes, nil, message) + return true + end + + + return telemetry +end + + +return { + new = new, +} diff --git a/spec/01-unit/26-observability/05-logs_spec.lua b/spec/01-unit/26-observability/05-logs_spec.lua index fa79a1af1aba..7683d71771f8 100644 --- a/spec/01-unit/26-observability/05-logs_spec.lua +++ b/spec/01-unit/26-observability/05-logs_spec.lua @@ -39,35 +39,39 @@ describe("Observability/Logs unit tests", function() _G.kong = old_kong end) - it("has no effect when no log line is provided", function() - maybe_push(1, ngx.INFO) + it("has no effect when log level is lower than the configured value", function() + maybe_push(1, nil, ngx.DEBUG, "Don't mind me, I'm just a debug log") local worker_logs = get_worker_logs() assert.same({}, worker_logs) local request_logs = get_request_logs() assert.same({}, request_logs) end) - it("has no effect when log line is empty", function() - maybe_push(1, ngx.INFO, "") - local worker_logs = get_worker_logs() - assert.same({}, worker_logs) - local request_logs = get_request_logs() - assert.same({}, request_logs) - end) + it("considers log message as optional", function() + local log_level = ngx.INFO - it("has no effect when log level is lower than the configured value", function() - maybe_push(1, ngx.DEBUG, "Don't mind me, I'm just a debug log") + maybe_push(1, nil, log_level) local worker_logs = get_worker_logs() - assert.same({}, worker_logs) - local request_logs = get_request_logs() - assert.same({}, request_logs) + assert.equals(1, #worker_logs) + + local logged_entry = worker_logs[1] + assert.same(log_level, logged_entry.log_level) + assert.equals("", logged_entry.body) + assert.is_table(logged_entry.attributes) + assert.is_number(logged_entry.observed_time_unix_nano) + assert.is_number(logged_entry.time_unix_nano) + assert.is_number(logged_entry.attributes["introspection.current.line"]) + assert.is_string(logged_entry.attributes["introspection.name"]) + assert.is_string(logged_entry.attributes["introspection.namewhat"]) + assert.is_string(logged_entry.attributes["introspection.source"]) + assert.is_string(logged_entry.attributes["introspection.what"]) end) it("generates worker-scoped log entries", function() local log_level = ngx.WARN local body = "Careful! I'm a warning!" - maybe_push(1, log_level, body, true, 123, ngx.null, nil, function()end, { foo = "bar" }) + maybe_push(1, { foo = "bar", tst = "baz" }, log_level, body, true, 123, ngx.null, nil, function()end, { foo = "bar" }) local worker_logs = get_worker_logs() assert.equals(1, #worker_logs) @@ -80,6 +84,8 @@ describe("Observability/Logs unit tests", function() assert.is_string(logged_entry.attributes["introspection.namewhat"]) assert.is_string(logged_entry.attributes["introspection.source"]) assert.is_string(logged_entry.attributes["introspection.what"]) + assert.equals("bar", logged_entry.attributes.foo) + assert.equals("baz", logged_entry.attributes.tst) assert.is_number(logged_entry.observed_time_unix_nano) assert.is_number(logged_entry.time_unix_nano) end) diff --git a/spec/01-unit/26-observability/06-telemetry-pdk_spec.lua b/spec/01-unit/26-observability/06-telemetry-pdk_spec.lua new file mode 100644 index 000000000000..14139810770f --- /dev/null +++ b/spec/01-unit/26-observability/06-telemetry-pdk_spec.lua @@ -0,0 +1,38 @@ +require "kong.tools.utils" + + +describe("Telemetry PDK unit tests", function() + describe("log()", function() + local old_kong = _G.kong + + lazy_setup(function() + local kong_global = require "kong.global" + _G.kong = kong_global.new() + kong_global.init_pdk(kong) + end) + + lazy_teardown(function() + _G.kong = old_kong + end) + + it("fails as expected with invalid input", function() + local ok, err = kong.telemetry.log() + assert.is_nil(ok) + assert.equals("plugin_name must be a string", err) + + ok, err = kong.telemetry.log("plugin_name") + assert.is_nil(ok) + assert.equals("plugin_config must be a table", err) + + ok, err = kong.telemetry.log("plugin_name", {}) + assert.is_nil(ok) + assert.equals("message_type must be a string", err) + end) + + it ("considers attributes and message as optional", function() + local ok, err = kong.telemetry.log("plugin_name", {}, "message_type") + assert.is_nil(ok) + assert.matches("Telemetry logging is disabled", err) + end) + end) +end) diff --git a/spec/02-integration/14-observability/06-telemetry-pdk_spec.lua b/spec/02-integration/14-observability/06-telemetry-pdk_spec.lua new file mode 100644 index 000000000000..c59ba5a6b290 --- /dev/null +++ b/spec/02-integration/14-observability/06-telemetry-pdk_spec.lua @@ -0,0 +1,208 @@ +local helpers = require "spec.helpers" +local pb = require "pb" + +local HTTP_SERVER_PORT_LOGS = helpers.get_available_port() + + +for _, strategy in helpers.each_strategy() do + describe("kong.pdk.telemetry #" .. strategy, function() + local bp + local plugin_instance_name = "my-pdk-logger-instance" + + describe("log", function() + describe("with OpenTelemetry", function() + local mock_logs + + lazy_setup(function() + bp, _ = assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + }, { "opentelemetry", "pdk-logger" })) + + local http_srv = assert(bp.services:insert { + name = "mock-service", + host = helpers.mock_upstream_host, + port = helpers.mock_upstream_port, + }) + + local logs_route = assert(bp.routes:insert({ + service = http_srv, + protocols = { "http" }, + paths = { "/logs" } + })) + + assert(bp.plugins:insert({ + name = "opentelemetry", + route = logs_route, + config = { + logs_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_LOGS, + queue = { + max_batch_size = 1000, + max_coalescing_delay = 2, + }, + } + })) + + assert(bp.plugins:insert({ + name = "pdk-logger", + route = logs_route, + config = {}, + instance_name = plugin_instance_name, + })) + + assert(helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + plugins = "opentelemetry,pdk-logger", + })) + + mock_logs = helpers.http_mock(HTTP_SERVER_PORT_LOGS, { timeout = 1 }) + end) + + lazy_teardown(function() + helpers.stop_kong() + if mock_logs then + mock_logs("close", true) + end + end) + + local function assert_find_valid_logs(body, request_id) + local decoded = assert(pb.decode("opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest", body)) + assert.not_nil(decoded) + + local scope_logs = decoded.resource_logs[1].scope_logs + assert.is_true(#scope_logs > 0, scope_logs) + + local found = 0 + for _, scope_log in ipairs(scope_logs) do + local log_records = scope_log.log_records + for _, log_record in ipairs(log_records) do + -- from the pdk-logger plugin: + local plugin_name = "pdk-logger" + local attributes = { + some_key = "some_value", + some_other_key = "some_other_value" + } + local expected_messages_attributes = { + access_phase = { message = "hello, access phase", attributes = attributes}, + header_filter_phase = { message = "hello, header_filter phase", attributes = {}}, + log_phase = { message = "", attributes = attributes}, + log_phase_2 = { message = "", attributes = {}}, + } + + assert.is_table(log_record.attributes) + local found_attrs = {} + for _, attr in ipairs(log_record.attributes) do + found_attrs[attr.key] = attr.value[attr.value.value] + end + + local exp_msg_attr = expected_messages_attributes[found_attrs["message.type"]] + + -- filter the right log lines + if exp_msg_attr then + -- ensure the log is from the current request + if found_attrs["request.id"] == request_id then + local logline = log_record.body and log_record.body.string_value + + assert.equals(exp_msg_attr.message, logline) + assert.partial_match(exp_msg_attr.attributes, found_attrs) + + assert.is_string(found_attrs["plugin.id"]) + assert.is_number(found_attrs["introspection.current.line"]) + assert.matches("pdk%-logger/handler%.lua", found_attrs["introspection.source"]) + assert.equals(plugin_name, found_attrs["plugin.name"]) + assert.equals(plugin_instance_name, found_attrs["plugin.instance.name"]) + + assert.is_number(log_record.time_unix_nano) + assert.is_number(log_record.observed_time_unix_nano) + + found = found + 1 + end + end + end + end + assert.equals(4, found) + end + + it("produces and exports valid logs", function() + local headers, body, request_id + + local cli = helpers.proxy_client() + local res = assert(cli:send { + method = "GET", + path = "/logs", + }) + assert.res_status(200, res) + cli:close() + + request_id = res.headers["X-Kong-Request-Id"] + + helpers.wait_until(function() + local lines + lines, body, headers = mock_logs() + + return lines + end, 10) + + assert.is_string(body) + assert.equals(headers["Content-Type"], "application/x-protobuf") + + assert_find_valid_logs(body, request_id) + assert.logfile().has.no.line("[error]", true) + end) + end) + + describe("without OpenTelemetry", function() + lazy_setup(function() + bp, _ = assert(helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + }, { "pdk-logger" })) + + local http_srv = assert(bp.services:insert { + name = "mock-service", + host = helpers.mock_upstream_host, + port = helpers.mock_upstream_port, + }) + + local logs_route = assert(bp.routes:insert({ + service = http_srv, + protocols = { "http" }, + paths = { "/logs" } + })) + + assert(bp.plugins:insert({ + name = "pdk-logger", + route = logs_route, + config = {}, + instance_name = plugin_instance_name, + })) + + assert(helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + plugins = "pdk-logger", + })) + end) + + lazy_teardown(function() + helpers.stop_kong() + end) + + it("handles errors correctly", function() + local cli = helpers.proxy_client() + local res = assert(cli:send { + method = "GET", + path = "/logs", + }) + assert.res_status(200, res) + cli:close() + + assert.logfile().has.line("Telemetry logging is disabled", true, 10) + end) + end) + end) + end) +end diff --git a/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/handler.lua new file mode 100644 index 000000000000..7a5d4d4bc4b0 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/handler.lua @@ -0,0 +1,49 @@ +local PDKLoggerHandler = { + VERSION = "0.1-t", + PRIORITY = 1000, +} + +local plugin_name = "pdk-logger" +local attributes = { some_key = "some_value", some_other_key = "some_other_value"} + + +function PDKLoggerHandler:access(conf) + local message_type = "access_phase" + local message = "hello, access phase" + -- pass both optional arguments (message and attributes) + local ok, err = kong.telemetry.log(plugin_name, conf, message_type, message, attributes) + if not ok then + kong.log.err(err) + end +end + + +function PDKLoggerHandler:header_filter(conf) + local message_type = "header_filter_phase" + local message = "hello, header_filter phase" + -- no attributes + local ok, err = kong.telemetry.log(plugin_name, conf, message_type, message, nil) + if not ok then + kong.log.err(err) + end +end + + +function PDKLoggerHandler:log(conf) + local message_type = "log_phase" + -- no message + local ok, err = kong.telemetry.log(plugin_name, conf, message_type, nil, attributes) + if not ok then + kong.log.err(err) + end + + message_type = "log_phase_2" + -- no attributes and no message + ok, err = kong.telemetry.log(plugin_name, conf, message_type, nil, nil) + if not ok then + kong.log.err(err) + end +end + + +return PDKLoggerHandler diff --git a/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/schema.lua new file mode 100644 index 000000000000..cbdafd0a09e7 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/pdk-logger/schema.lua @@ -0,0 +1,18 @@ +local typedefs = require "kong.db.schema.typedefs" + + +return { + name = "pdk-logger", + fields = { + { + protocols = typedefs.protocols { default = { "http", "https", "tcp", "tls", "grpc", "grpcs" } }, + }, + { + config = { + type = "record", + fields = { + }, + }, + }, + }, +}