diff --git a/spec/01-unit/26-observability/05-logs_spec.lua b/spec/01-unit/26-observability/05-logs_spec.lua new file mode 100644 index 0000000000000..38fcb562df6b6 --- /dev/null +++ b/spec/01-unit/26-observability/05-logs_spec.lua @@ -0,0 +1,97 @@ +-- This software is copyright Kong Inc. and its licensors. +-- Use of the software is subject to the agreement between your organization +-- and Kong Inc. If there is no such agreement, use is governed by and +-- subject to the terms of the Kong Master Software License Agreement found +-- at https://konghq.com/enterprisesoftwarelicense/. +-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ] + +require "kong.tools.utils" + + +describe("Observability/Logs unit tests", function() + describe("maybe_push()", function() + local o11y_logs, maybe_push, get_request_logs, get_worker_logs + local old_ngx, old_kong + + lazy_setup(function() + old_ngx = _G.ngx + old_kong = _G.kong + + _G.ngx = { + config = { subsystem = "http" }, + ctx = {}, + DEBUG = ngx.DEBUG, + INFO = ngx.INFO, + WARN = ngx.WARN, + } + + _G.kong = { + configuration = { + log_level = "info", + }, + } + + o11y_logs = require "kong.observability.logs" + maybe_push = o11y_logs.maybe_push + get_request_logs = o11y_logs.get_request_logs + get_worker_logs = o11y_logs.get_worker_logs + end) + + before_each(function() + _G.ngx.ctx = {} + end) + + lazy_teardown(function() + _G.ngx = old_ngx + _G.kong = old_kong + end) + + it("has no effect when no log line is provided", function() + maybe_push(ngx.INFO) + local worker_logs = get_worker_logs() + assert.same({}, worker_logs) + local request_logs = get_request_logs() + assert.same({}, request_logs) + end) + + it("has no effect when log line is empty", function() + maybe_push(ngx.INFO, "") + local worker_logs = get_worker_logs() + assert.same({}, worker_logs) + local request_logs = get_request_logs() + assert.same({}, request_logs) + end) + + it("has no effect when log level is lower than the configured value", function() + maybe_push(ngx.DEBUG, "Don't mind me, I'm just a debug log") + local worker_logs = get_worker_logs() + assert.same({}, worker_logs) + local request_logs = get_request_logs() + assert.same({}, request_logs) + end) + + it("generates request-scoped log entries", function() + local log_level = ngx.WARN + local body = "Careful! I'm a warning!" + + maybe_push(log_level, body) + local worker_logs = get_worker_logs() + assert.same({}, worker_logs) + + local request_logs = get_request_logs() + assert.same(1, #request_logs) + + local logged_entry = request_logs[1] + assert.same(log_level, logged_entry.log_level) + assert.same(body, logged_entry.body) + assert.is_table(logged_entry.attributes) + assert.is_number(logged_entry.attributes.introspection_current_line) + assert.is_string(logged_entry.attributes.introspection_name) + assert.is_string(logged_entry.attributes.introspection_namewhat) + assert.is_string(logged_entry.attributes.introspection_source) + assert.is_string(logged_entry.attributes.introspection_what) + assert.is_number(logged_entry.observed_time_unix_nano) + assert.is_number(logged_entry.time_unix_nano) + end) + end) +end) diff --git a/spec/03-plugins/37-opentelemetry/01-otlp_spec.lua b/spec/03-plugins/37-opentelemetry/01-otlp_spec.lua index db243147b74aa..07add4f743ccf 100644 --- a/spec/03-plugins/37-opentelemetry/01-otlp_spec.lua +++ b/spec/03-plugins/37-opentelemetry/01-otlp_spec.lua @@ -43,6 +43,14 @@ local pb_decode_span = function(data) return pb.decode("opentelemetry.proto.trace.v1.Span", data) end +local pb_encode_log = function(data) + return pb.encode("opentelemetry.proto.logs.v1.LogRecord", data) +end + +local pb_decode_log = function(data) + return pb.decode("opentelemetry.proto.logs.v1.LogRecord", data) +end + describe("Plugin: opentelemetry (otlp)", function() local old_ngx_get_phase @@ -66,7 +74,7 @@ describe("Plugin: opentelemetry (otlp)", function() ngx.ctx.KONG_SPANS = nil end) - it("encode/decode pb", function () + it("encode/decode pb (traces)", function () local N = 10000 local test_spans = { @@ -125,6 +133,41 @@ describe("Plugin: opentelemetry (otlp)", function() end end) + it("encode/decode pb (logs)", function () + local N = 10000 + + local test_logs = {} + + for _ = 1, N do + local now_ns = time_ns() + + local log = { + time_unix_nano = now_ns, + observed_time_unix_nano = now_ns, + log_level = ngx.INFO, + span_id = rand_bytes(8), + body = "log line", + attributes = { + foo = "bar", + test = true, + version = 0.1, + }, + } + insert(test_logs, log) + end + + local trace_id = rand_bytes(16) + local flags = tonumber(rand_bytes(1)) + local prepared_logs = otlp.prepare_logs(test_logs, trace_id, flags) + + for _, prepared_log in ipairs(prepared_logs) do + local decoded_log = pb_decode_log(pb_encode_log(prepared_log)) + + local ok, err = table_compare(prepared_log, decoded_log) + assert.is_true(ok, err) + end + end) + it("check lengths of trace_id and span_id ", function () local TRACE_ID_LEN, PARENT_SPAN_ID_LEN = 16, 8 local default_span = { diff --git a/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua b/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua index 839ec5f703126..99701e32524f4 100644 --- a/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua +++ b/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua @@ -26,7 +26,8 @@ local function sort_by_key(tbl) end) end -local HTTP_SERVER_PORT = helpers.get_available_port() +local HTTP_SERVER_PORT_TRACES = helpers.get_available_port() +local HTTP_SERVER_PORT_LOGS = helpers.get_available_port() local PROXY_PORT = 9000 for _, strategy in helpers.each_strategy() do @@ -63,6 +64,10 @@ for _, strategy in helpers.each_strategy() do protocols = { "http" }, paths = { "/" }})) + local logs_route = assert(bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/logs" }})) + assert(bp.routes:insert({ service = http_srv2, protocols = { "http" }, paths = { "/no_plugin" }})) @@ -72,16 +77,40 @@ for _, strategy in helpers.each_strategy() do route = router_scoped and route, service = service_scoped and http_srv, config = table_merge({ - endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT, + traces_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES, batch_flush_delay = 0, -- report immediately }, config) })) + assert(bp.plugins:insert({ + name = "opentelemetry", + route = logs_route, + config = table_merge({ + traces_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES, + logs_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_LOGS, + queue = { + max_batch_size = 1000, + max_coalescing_delay = 2, + }, + }, config) + })) + + assert(bp.plugins:insert({ + name = "post-function", + route = logs_route, + config = { + access = {[[ + kong.log.info("this is a log from kong.log") + ngx.log(ngx.INFO, "this is a log from ngx.log") + ]]}, + }, + })) + if another_global then assert(bp.plugins:insert({ name = "opentelemetry", config = table_merge({ - endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT, + traces_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES, batch_flush_delay = 0, -- report immediately }, config) })) @@ -91,14 +120,14 @@ for _, strategy in helpers.each_strategy() do proxy_listen = "0.0.0.0:" .. PROXY_PORT, database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", - plugins = "opentelemetry", + plugins = "opentelemetry,post-function", tracing_instrumentations = types, tracing_sampling_rate = global_sampling_rate or 1, }, nil, nil, fixtures)) end describe("valid #http request", function () - local mock + local mock_traces, mock_logs lazy_setup(function() bp, _ = assert(helpers.get_db_utils(strategy, { "services", @@ -111,17 +140,21 @@ for _, strategy in helpers.each_strategy() do ["X-Access-Token"] = "token", }, }) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock_traces = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) + mock_logs = helpers.http_mock(HTTP_SERVER_PORT_LOGS, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() helpers.stop_kong() - if mock then - mock("close", true) + if mock_traces then + mock_traces("close", true) + end + if mock_logs then + mock_logs("close", true) end end) - it("works", function () + it("exports valid traces", function () local headers, body helpers.wait_until(function() local cli = helpers.proxy_client(7000, PROXY_PORT) @@ -134,7 +167,7 @@ for _, strategy in helpers.each_strategy() do cli:close() local lines - lines, body, headers = mock() + lines, body, headers = mock_traces() return lines end, 10) @@ -162,6 +195,89 @@ for _, strategy in helpers.each_strategy() do local scope_spans = decoded.resource_spans[1].scope_spans assert.is_true(#scope_spans > 0, scope_spans) end) + + it("exports valid logs", function () + local trace_id = gen_trace_id() + + local headers, body, request_id + + local cli = helpers.proxy_client(7000, PROXY_PORT) + local res = assert(cli:send { + method = "GET", + path = "/logs", + headers = { + traceparent = fmt("00-%s-0123456789abcdef-01", trace_id), + }, + }) + assert.res_status(200, res) + cli:close() + + request_id = res.headers["X-Kong-Request-Id"] + + helpers.wait_until(function() + local lines + lines, body, headers = mock_logs() + + return lines + end, 10) + + assert.is_string(body) + assert.equals(headers["Content-Type"], "application/x-protobuf") + + local decoded = assert(pb.decode("opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest", body)) + assert.not_nil(decoded) + + -- array is unstable + local res_attr = decoded.resource_logs[1].resource.attributes + sort_by_key(res_attr) + -- default resource attributes + assert.same("service.instance.id", res_attr[1].key) + assert.same("service.name", res_attr[2].key) + assert.same({string_value = "kong", value = "string_value"}, res_attr[2].value) + assert.same("service.version", res_attr[3].key) + assert.same({string_value = kong.version, value = "string_value"}, res_attr[3].value) + + local scope_logs = decoded.resource_logs[1].scope_logs + assert.is_true(#scope_logs > 0, scope_logs) + + local found = 0 + for _, scope_log in ipairs(scope_logs) do + local log_records = scope_log.log_records + for _, log_record in ipairs(log_records) do + local logline = log_record.body.string_value + + -- filter the right log lines + if string.find(logline, "this is a log") then + assert(logline:sub(-7) == "ngx.log" or logline:sub(-8) == "kong.log", logline) + + assert.is_table(log_record.attributes) + local found_attrs = {} + for _, attr in ipairs(log_record.attributes) do + found_attrs[attr.key] = attr.value[attr.value.value] + end + + -- ensure the log is from the current request + if found_attrs.request_id == request_id then + assert.is_number(found_attrs.introspection_current_line) + assert.is_number(log_record.time_unix_nano) + assert.is_number(log_record.observed_time_unix_nano) + assert.is_number(log_record.flags) + assert.is_string(found_attrs.introspection_source) + assert.is_string(log_record.span_id) + assert.equals(log_record.severity_number, 9) + assert.equals(log_record.severity_text, "INFO") + assert.equals(trace_id, to_hex(log_record.trace_id)) + + found = found + 1 + if found == 2 then + break + end + end + end + end + end + assert.equals(2, found) + end) end) -- this test is not meant to check that the sampling rate is applied @@ -187,7 +303,7 @@ for _, strategy in helpers.each_strategy() do setup_instrumentations("all", { sampling_rate = sampling_rate, }, nil, nil, nil, nil, global_sampling_rate) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -268,7 +384,7 @@ for _, strategy in helpers.each_strategy() do ["X-Access-Token"] = "token", }, }, nil, case[1], case[2], case[3]) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -316,7 +432,7 @@ for _, strategy in helpers.each_strategy() do ["os.version"] = "debian", } }) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -385,7 +501,7 @@ for _, strategy in helpers.each_strategy() do fixtures.http_mock.my_server_block = [[ server { server_name myserver; - listen ]] .. HTTP_SERVER_PORT .. [[; + listen ]] .. HTTP_SERVER_PORT_TRACES .. [[; client_body_buffer_size 1024k; location / { @@ -476,7 +592,7 @@ for _, strategy in helpers.each_strategy() do }, { "opentelemetry" })) setup_instrumentations("request") - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -544,7 +660,7 @@ for _, strategy in helpers.each_strategy() do describe("#referenceable fields", function () local mock lazy_setup(function() - helpers.setenv("TEST_OTEL_ENDPOINT", "http://127.0.0.1:" .. HTTP_SERVER_PORT) + helpers.setenv("TEST_OTEL_ENDPOINT", "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES) helpers.setenv("TEST_OTEL_ACCESS_KEY", "secret-1") helpers.setenv("TEST_OTEL_ACCESS_SECRET", "secret-2") @@ -561,7 +677,7 @@ for _, strategy in helpers.each_strategy() do ["X-Access-Secret"] = "{vault://env/test_otel_access_secret}", }, }) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() diff --git a/spec/03-plugins/37-opentelemetry/05-otelcol_spec.lua b/spec/03-plugins/37-opentelemetry/05-otelcol_spec.lua index 6e15019aba251..41e36f8a180bb 100644 --- a/spec/03-plugins/37-opentelemetry/05-otelcol_spec.lua +++ b/spec/03-plugins/37-opentelemetry/05-otelcol_spec.lua @@ -3,7 +3,7 @@ local helpers = require "spec.helpers" local kong_table = require "kong.tools.table" local ngx_re = require "ngx.re" local http = require "resty.http" - +local cjson = require "cjson.safe" local fmt = string.format local table_merge = kong_table.table_merge @@ -32,9 +32,13 @@ for _, strategy in helpers.each_strategy() do port = helpers.mock_upstream_port, }) - bp.routes:insert({ service = http_srv, - protocols = { "http" }, - paths = { "/" }}) + local traces_route = bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/traces" }}) + + local logs_route = bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/logs" }}) local route_traceid = bp.routes:insert({ service = http_srv, protocols = { "http" }, @@ -42,12 +46,35 @@ for _, strategy in helpers.each_strategy() do bp.plugins:insert({ name = "opentelemetry", + route = { id = traces_route.id }, config = table_merge({ traces_endpoint = fmt("http://%s:%s/v1/traces", OTELCOL_HOST, OTELCOL_HTTP_PORT), batch_flush_delay = 0, -- report immediately }, config) }) + bp.plugins:insert({ + name = "opentelemetry", + route = { id = logs_route.id }, + config = table_merge({ + logs_endpoint = fmt("http://%s:%s/v1/logs", OTELCOL_HOST, OTELCOL_HTTP_PORT), + queue = { + max_batch_size = 1000, + max_coalescing_delay = 2, + }, + }, config) + }) + + bp.plugins:insert({ + name = "post-function", + route = logs_route, + config = { + access = {[[ + ngx.log(ngx.WARN, "this is a log") + ]]}, + }, + }) + bp.plugins:insert({ name = "opentelemetry", route = { id = route_traceid.id }, @@ -61,7 +88,8 @@ for _, strategy in helpers.each_strategy() do assert(helpers.start_kong { database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", - plugins = "opentelemetry", + plugins = "opentelemetry, post-function", + log_level = "warn", tracing_instrumentations = types, tracing_sampling_rate = 1, }) @@ -88,7 +116,7 @@ for _, strategy in helpers.each_strategy() do it("send traces", function() local httpc = http.new() for i = 1, LIMIT do - local res, err = httpc:request_uri(proxy_url) + local res, err = httpc:request_uri(proxy_url .. "/traces") assert.is_nil(err) assert.same(200, res.status) end @@ -125,7 +153,7 @@ for _, strategy in helpers.each_strategy() do assert(helpers.restart_kong { database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", - plugins = "opentelemetry", + plugins = "opentelemetry, post-function", tracing_instrumentations = "all", tracing_sampling_rate = 0.00005, }) @@ -147,8 +175,82 @@ for _, strategy in helpers.each_strategy() do end httpc:close() end) - end) + describe("otelcol receives logs #http", function() + local REQUESTS = 100 + + lazy_setup(function() + -- clear file + local shell = require "resty.shell" + shell.run("mkdir -p $(dirname " .. OTELCOL_FILE_EXPORTER_PATH .. ")", nil, 0) + shell.run("cat /dev/null > " .. OTELCOL_FILE_EXPORTER_PATH, nil, 0) + setup_instrumentations("all") + end) + + lazy_teardown(function() + helpers.stop_kong() + end) + + it("send valid logs", function() + local httpc = http.new() + for i = 1, REQUESTS do + local res, err = httpc:request_uri(proxy_url .. "/logs") + assert.is_nil(err) + assert.same(200, res.status) + end + httpc:close() + + local parts + helpers.wait_until(function() + local f = assert(io.open(OTELCOL_FILE_EXPORTER_PATH, "rb")) + local raw_content = f:read("*all") + f:close() + + parts = split(raw_content, "\n", "jo") + return #parts > 0 + end, 10) + + local contents = {} + for _, p in ipairs(parts) do + -- after the file is truncated the collector + -- may continue exporting partial json objects + local trimmed = string.match(p, "({.*)") + local decoded = cjson.decode(trimmed) + if decoded then + table.insert(contents, decoded) + end + end + + local count = 0 + for _, content in ipairs(contents) do + if not content.resourceLogs then + goto continue + end + + local scope_logs = content.resourceLogs[1].scopeLogs + assert.is_true(#scope_logs > 0, scope_logs) + + for _, scope_log in ipairs(scope_logs) do + local log_records = scope_log.logRecords + for _, log_record in ipairs(log_records) do + if log_record.body.stringValue == "this is a log" then + count = count + 1 + + assert.not_nil(log_record.observedTimeUnixNano) + assert.not_nil(log_record.timeUnixNano) + assert.equals("SEVERITY_NUMBER_WARN", log_record.severityNumber) + assert.equals("WARN", log_record.severityText) + assert.not_nil(log_record.attributes) + end + end + end + + ::continue:: + end + + assert.equals(REQUESTS, count) + end) + end) end) end diff --git a/spec/fixtures/opentelemetry/otelcol.yaml b/spec/fixtures/opentelemetry/otelcol.yaml index 9cd430d1fc932..a15acde86bfe8 100644 --- a/spec/fixtures/opentelemetry/otelcol.yaml +++ b/spec/fixtures/opentelemetry/otelcol.yaml @@ -26,3 +26,7 @@ service: receivers: [otlp] processors: [batch] exporters: [logging, file] + logs: + receivers: [otlp] + processors: [batch] + exporters: [logging, file]