diff --git a/spec/01-unit/26-observability/05-logs_spec.lua b/spec/01-unit/26-observability/05-logs_spec.lua new file mode 100644 index 000000000000..fa79a1af1aba --- /dev/null +++ b/spec/01-unit/26-observability/05-logs_spec.lua @@ -0,0 +1,87 @@ +require "kong.tools.utils" + + +describe("Observability/Logs unit tests", function() + describe("maybe_push()", function() + local o11y_logs, maybe_push, get_request_logs, get_worker_logs + local old_ngx, old_kong + + lazy_setup(function() + old_ngx = _G.ngx + old_kong = _G.kong + + _G.ngx = { + config = { subsystem = "http" }, + ctx = {}, + DEBUG = ngx.DEBUG, + INFO = ngx.INFO, + WARN = ngx.WARN, + } + + _G.kong = { + configuration = { + log_level = "info", + }, + } + + o11y_logs = require "kong.observability.logs" + maybe_push = o11y_logs.maybe_push + get_request_logs = o11y_logs.get_request_logs + get_worker_logs = o11y_logs.get_worker_logs + end) + + before_each(function() + _G.ngx.ctx = {} + end) + + lazy_teardown(function() + _G.ngx = old_ngx + _G.kong = old_kong + end) + + it("has no effect when no log line is provided", function() + maybe_push(1, ngx.INFO) + local worker_logs = get_worker_logs() + assert.same({}, worker_logs) + local request_logs = get_request_logs() + assert.same({}, request_logs) + end) + + it("has no effect when log line is empty", function() + maybe_push(1, ngx.INFO, "") + local worker_logs = get_worker_logs() + assert.same({}, worker_logs) + local request_logs = get_request_logs() + assert.same({}, request_logs) + end) + + it("has no effect when log level is lower than the configured value", function() + maybe_push(1, ngx.DEBUG, "Don't mind me, I'm just a debug log") + local worker_logs = get_worker_logs() + assert.same({}, worker_logs) + local request_logs = get_request_logs() + assert.same({}, request_logs) + end) + + it("generates worker-scoped log entries", function() + local log_level = ngx.WARN + local body = "Careful! I'm a warning!" + + maybe_push(1, log_level, body, true, 123, ngx.null, nil, function()end, { foo = "bar" }) + local worker_logs = get_worker_logs() + assert.equals(1, #worker_logs) + + local logged_entry = worker_logs[1] + assert.same(log_level, logged_entry.log_level) + assert.matches(body .. "true123nilnilfunction:%s0x%x+table:%s0x%x+", logged_entry.body) + assert.is_table(logged_entry.attributes) + assert.is_number(logged_entry.attributes["introspection.current.line"]) + assert.is_string(logged_entry.attributes["introspection.name"]) + assert.is_string(logged_entry.attributes["introspection.namewhat"]) + assert.is_string(logged_entry.attributes["introspection.source"]) + assert.is_string(logged_entry.attributes["introspection.what"]) + assert.is_number(logged_entry.observed_time_unix_nano) + assert.is_number(logged_entry.time_unix_nano) + end) + end) +end) diff --git a/spec/02-integration/14-tracing/01-instrumentations_spec.lua b/spec/02-integration/14-observability/01-instrumentations_spec.lua similarity index 100% rename from spec/02-integration/14-tracing/01-instrumentations_spec.lua rename to spec/02-integration/14-observability/01-instrumentations_spec.lua diff --git a/spec/02-integration/14-tracing/02-propagation_spec.lua b/spec/02-integration/14-observability/02-propagation_spec.lua similarity index 100% rename from spec/02-integration/14-tracing/02-propagation_spec.lua rename to spec/02-integration/14-observability/02-propagation_spec.lua diff --git a/spec/02-integration/14-tracing/03-tracer-pdk_spec.lua b/spec/02-integration/14-observability/03-tracer-pdk_spec.lua similarity index 100% rename from spec/02-integration/14-tracing/03-tracer-pdk_spec.lua rename to spec/02-integration/14-observability/03-tracer-pdk_spec.lua diff --git a/spec/02-integration/14-tracing/04-trace-ids-log_spec.lua b/spec/02-integration/14-observability/04-trace-ids-log_spec.lua similarity index 100% rename from spec/02-integration/14-tracing/04-trace-ids-log_spec.lua rename to spec/02-integration/14-observability/04-trace-ids-log_spec.lua diff --git a/spec/02-integration/14-observability/05-logs_spec.lua b/spec/02-integration/14-observability/05-logs_spec.lua new file mode 100644 index 000000000000..3334386bb5d2 --- /dev/null +++ b/spec/02-integration/14-observability/05-logs_spec.lua @@ -0,0 +1,86 @@ +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + describe("Observability Logs", function () + describe("ngx.log patch", function() + local proxy_client + local post_function_access = [[ + local threads = {} + local n_threads = 100 + + for i = 1, n_threads do + threads[i] = ngx.thread.spawn(function() + ngx.log(ngx.INFO, "thread_" .. i .. " logged") + end) + end + + for i = 1, n_threads do + ngx.thread.wait(threads[i]) + end + ]] + + lazy_setup(function() + local bp = helpers.get_db_utils(strategy, { + "routes", + "services", + "plugins", + }) + + local http_srv = assert(bp.services:insert { + name = "mock-service", + host = helpers.mock_upstream_host, + port = helpers.mock_upstream_port, + }) + + local logs_route = assert(bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/logs" }})) + + assert(bp.plugins:insert({ + name = "post-function", + route = logs_route, + config = { + access = { post_function_access }, + }, + })) + + -- only needed to enable the log collection hook + assert(bp.plugins:insert({ + name = "opentelemetry", + route = logs_route, + config = { + logs_endpoint = "http://" .. helpers.mock_upstream_host .. ":" .. helpers.mock_upstream_port, + } + })) + + helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + plugins = "opentelemetry,post-function", + }) + proxy_client = helpers.proxy_client() + end) + + lazy_teardown(function() + if proxy_client then + proxy_client:close() + end + helpers.stop_kong() + end) + + it("does not produce yielding and concurrent executions", function () + local res = assert(proxy_client:send { + method = "GET", + path = "/logs", + }) + assert.res_status(200, res) + + -- plugin produced logs: + assert.logfile().has.line("thread_1 logged", true, 10) + assert.logfile().has.line("thread_100 logged", true, 10) + -- plugin did not produce concurrent accesses to ngx.log: + assert.logfile().has.no.line("[error]", true) + end) + end) + end) +end diff --git a/spec/03-plugins/37-opentelemetry/01-otlp_spec.lua b/spec/03-plugins/37-opentelemetry/01-otlp_spec.lua index db243147b74a..07add4f743cc 100644 --- a/spec/03-plugins/37-opentelemetry/01-otlp_spec.lua +++ b/spec/03-plugins/37-opentelemetry/01-otlp_spec.lua @@ -43,6 +43,14 @@ local pb_decode_span = function(data) return pb.decode("opentelemetry.proto.trace.v1.Span", data) end +local pb_encode_log = function(data) + return pb.encode("opentelemetry.proto.logs.v1.LogRecord", data) +end + +local pb_decode_log = function(data) + return pb.decode("opentelemetry.proto.logs.v1.LogRecord", data) +end + describe("Plugin: opentelemetry (otlp)", function() local old_ngx_get_phase @@ -66,7 +74,7 @@ describe("Plugin: opentelemetry (otlp)", function() ngx.ctx.KONG_SPANS = nil end) - it("encode/decode pb", function () + it("encode/decode pb (traces)", function () local N = 10000 local test_spans = { @@ -125,6 +133,41 @@ describe("Plugin: opentelemetry (otlp)", function() end end) + it("encode/decode pb (logs)", function () + local N = 10000 + + local test_logs = {} + + for _ = 1, N do + local now_ns = time_ns() + + local log = { + time_unix_nano = now_ns, + observed_time_unix_nano = now_ns, + log_level = ngx.INFO, + span_id = rand_bytes(8), + body = "log line", + attributes = { + foo = "bar", + test = true, + version = 0.1, + }, + } + insert(test_logs, log) + end + + local trace_id = rand_bytes(16) + local flags = tonumber(rand_bytes(1)) + local prepared_logs = otlp.prepare_logs(test_logs, trace_id, flags) + + for _, prepared_log in ipairs(prepared_logs) do + local decoded_log = pb_decode_log(pb_encode_log(prepared_log)) + + local ok, err = table_compare(prepared_log, decoded_log) + assert.is_true(ok, err) + end + end) + it("check lengths of trace_id and span_id ", function () local TRACE_ID_LEN, PARENT_SPAN_ID_LEN = 16, 8 local default_span = { diff --git a/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua b/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua index aa65233e123a..b01e717fe6b1 100644 --- a/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua +++ b/spec/03-plugins/37-opentelemetry/04-exporter_spec.lua @@ -26,9 +26,14 @@ local function sort_by_key(tbl) end) end -local HTTP_SERVER_PORT = helpers.get_available_port() +local HTTP_SERVER_PORT_TRACES = helpers.get_available_port() +local HTTP_SERVER_PORT_LOGS = helpers.get_available_port() local PROXY_PORT = 9000 +local post_function_access_body = + [[kong.log.info("this is a log from kong.log"); + ngx.log(ngx.INFO, "this is a log from ngx.log")]] + for _, strategy in helpers.each_strategy() do describe("opentelemetry exporter #" .. strategy, function() local bp @@ -63,6 +68,14 @@ for _, strategy in helpers.each_strategy() do protocols = { "http" }, paths = { "/" }})) + local logs_route = assert(bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/logs" }})) + + local logs_traces_route = assert(bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/traces_logs" }})) + assert(bp.routes:insert({ service = http_srv2, protocols = { "http" }, paths = { "/no_plugin" }})) @@ -72,16 +85,57 @@ for _, strategy in helpers.each_strategy() do route = router_scoped and route, service = service_scoped and http_srv, config = table_merge({ - endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT, + traces_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES, batch_flush_delay = 0, -- report immediately }, config) })) + assert(bp.plugins:insert({ + name = "opentelemetry", + route = logs_traces_route, + config = table_merge({ + traces_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES, + logs_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_LOGS, + queue = { + max_batch_size = 1000, + max_coalescing_delay = 2, + }, + }, config) + })) + + assert(bp.plugins:insert({ + name = "opentelemetry", + route = logs_route, + config = table_merge({ + logs_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_LOGS, + queue = { + max_batch_size = 1000, + max_coalescing_delay = 2, + }, + }, config) + })) + + assert(bp.plugins:insert({ + name = "post-function", + route = logs_traces_route, + config = { + access = { post_function_access_body }, + }, + })) + + assert(bp.plugins:insert({ + name = "post-function", + route = logs_route, + config = { + access = { post_function_access_body }, + }, + })) + if another_global then assert(bp.plugins:insert({ name = "opentelemetry", config = table_merge({ - endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT, + traces_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES, batch_flush_delay = 0, -- report immediately }, config) })) @@ -91,14 +145,14 @@ for _, strategy in helpers.each_strategy() do proxy_listen = "0.0.0.0:" .. PROXY_PORT, database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", - plugins = "opentelemetry", + plugins = "opentelemetry,post-function", tracing_instrumentations = types, tracing_sampling_rate = global_sampling_rate or 1, }, nil, nil, fixtures)) end describe("valid #http request", function () - local mock + local mock_traces, mock_logs lazy_setup(function() bp, _ = assert(helpers.get_db_utils(strategy, { "services", @@ -111,17 +165,21 @@ for _, strategy in helpers.each_strategy() do ["X-Access-Token"] = "token", }, }) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock_traces = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) + mock_logs = helpers.http_mock(HTTP_SERVER_PORT_LOGS, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() helpers.stop_kong() - if mock then - mock("close", true) + if mock_traces then + mock_traces("close", true) + end + if mock_logs then + mock_logs("close", true) end end) - it("works", function () + it("exports valid traces", function () local headers, body helpers.wait_until(function() local cli = helpers.proxy_client(7000, PROXY_PORT) @@ -134,7 +192,7 @@ for _, strategy in helpers.each_strategy() do cli:close() local lines - lines, body, headers = mock() + lines, body, headers = mock_traces() return lines end, 10) @@ -162,6 +220,127 @@ for _, strategy in helpers.each_strategy() do local scope_spans = decoded.resource_spans[1].scope_spans assert.is_true(#scope_spans > 0, scope_spans) end) + + local function assert_find_valid_logs(body, request_id, trace_id) + local decoded = assert(pb.decode("opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest", body)) + assert.not_nil(decoded) + + -- array is unstable + local res_attr = decoded.resource_logs[1].resource.attributes + sort_by_key(res_attr) + -- default resource attributes + assert.same("service.instance.id", res_attr[1].key) + assert.same("service.name", res_attr[2].key) + assert.same({string_value = "kong", value = "string_value"}, res_attr[2].value) + assert.same("service.version", res_attr[3].key) + assert.same({string_value = kong.version, value = "string_value"}, res_attr[3].value) + + local scope_logs = decoded.resource_logs[1].scope_logs + assert.is_true(#scope_logs > 0, scope_logs) + + local found = 0 + for _, scope_log in ipairs(scope_logs) do + local log_records = scope_log.log_records + for _, log_record in ipairs(log_records) do + local logline = log_record.body.string_value + + -- filter the right log lines + if string.find(logline, "this is a log") then + assert(logline:sub(-7) == "ngx.log" or logline:sub(-8) == "kong.log", logline) + + assert.is_table(log_record.attributes) + local found_attrs = {} + for _, attr in ipairs(log_record.attributes) do + found_attrs[attr.key] = attr.value[attr.value.value] + end + + -- ensure the log is from the current request + if found_attrs["request.id"] == request_id then + local expected_line + if logline:sub(-8) == "kong.log" then + expected_line = 1 + else + expected_line = 2 + end + + assert.is_number(log_record.time_unix_nano) + assert.is_number(log_record.observed_time_unix_nano) + assert.equals(post_function_access_body, found_attrs["introspection.source"]) + assert.equals(expected_line, found_attrs["introspection.current.line"]) + assert.equals(log_record.severity_number, 9) + assert.equals(log_record.severity_text, "INFO") + if trace_id then + assert.equals(trace_id, to_hex(log_record.trace_id)) + assert.is_string(log_record.span_id) + assert.is_number(log_record.flags) + end + + found = found + 1 + if found == 2 then + break + end + end + end + end + end + assert.equals(2, found) + end + + it("exports valid logs with tracing", function () + local trace_id = gen_trace_id() + + local headers, body, request_id + + local cli = helpers.proxy_client(7000, PROXY_PORT) + local res = assert(cli:send { + method = "GET", + path = "/traces_logs", + headers = { + traceparent = fmt("00-%s-0123456789abcdef-01", trace_id), + }, + }) + assert.res_status(200, res) + cli:close() + + request_id = res.headers["X-Kong-Request-Id"] + + helpers.wait_until(function() + local lines + lines, body, headers = mock_logs() + + return lines + end, 10) + + assert.is_string(body) + assert.equals(headers["Content-Type"], "application/x-protobuf") + assert_find_valid_logs(body, request_id, trace_id) + end) + + it("exports valid logs without tracing", function () + local headers, body, request_id + + local cli = helpers.proxy_client(7000, PROXY_PORT) + local res = assert(cli:send { + method = "GET", + path = "/logs", + }) + assert.res_status(200, res) + cli:close() + + request_id = res.headers["X-Kong-Request-Id"] + + helpers.wait_until(function() + local lines + lines, body, headers = mock_logs() + + return lines + end, 10) + + assert.is_string(body) + assert.equals(headers["Content-Type"], "application/x-protobuf") + + assert_find_valid_logs(body, request_id) + end) end) -- this test is not meant to check that the sampling rate is applied @@ -187,7 +366,7 @@ for _, strategy in helpers.each_strategy() do setup_instrumentations("all", { sampling_rate = sampling_rate, }, nil, nil, nil, nil, global_sampling_rate) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -260,7 +439,7 @@ for _, strategy in helpers.each_strategy() do }, { "opentelemetry" })) setup_instrumentations("all", {}, nil, nil, nil, nil, sampling_rate) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -342,7 +521,7 @@ for _, strategy in helpers.each_strategy() do ["X-Access-Token"] = "token", }, }, nil, case[1], case[2], case[3]) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -390,7 +569,7 @@ for _, strategy in helpers.each_strategy() do ["os.version"] = "debian", } }) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -459,7 +638,7 @@ for _, strategy in helpers.each_strategy() do fixtures.http_mock.my_server_block = [[ server { server_name myserver; - listen ]] .. HTTP_SERVER_PORT .. [[; + listen ]] .. HTTP_SERVER_PORT_TRACES .. [[; client_body_buffer_size 1024k; location / { @@ -550,7 +729,7 @@ for _, strategy in helpers.each_strategy() do }, { "opentelemetry" })) setup_instrumentations("request") - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() @@ -618,7 +797,7 @@ for _, strategy in helpers.each_strategy() do describe("#referenceable fields", function () local mock lazy_setup(function() - helpers.setenv("TEST_OTEL_ENDPOINT", "http://127.0.0.1:" .. HTTP_SERVER_PORT) + helpers.setenv("TEST_OTEL_ENDPOINT", "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES) helpers.setenv("TEST_OTEL_ACCESS_KEY", "secret-1") helpers.setenv("TEST_OTEL_ACCESS_SECRET", "secret-2") @@ -635,7 +814,7 @@ for _, strategy in helpers.each_strategy() do ["X-Access-Secret"] = "{vault://env/test_otel_access_secret}", }, }) - mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT }) + mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT }) end) lazy_teardown(function() diff --git a/spec/03-plugins/37-opentelemetry/05-otelcol_spec.lua b/spec/03-plugins/37-opentelemetry/05-otelcol_spec.lua index 6e15019aba25..41e36f8a180b 100644 --- a/spec/03-plugins/37-opentelemetry/05-otelcol_spec.lua +++ b/spec/03-plugins/37-opentelemetry/05-otelcol_spec.lua @@ -3,7 +3,7 @@ local helpers = require "spec.helpers" local kong_table = require "kong.tools.table" local ngx_re = require "ngx.re" local http = require "resty.http" - +local cjson = require "cjson.safe" local fmt = string.format local table_merge = kong_table.table_merge @@ -32,9 +32,13 @@ for _, strategy in helpers.each_strategy() do port = helpers.mock_upstream_port, }) - bp.routes:insert({ service = http_srv, - protocols = { "http" }, - paths = { "/" }}) + local traces_route = bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/traces" }}) + + local logs_route = bp.routes:insert({ service = http_srv, + protocols = { "http" }, + paths = { "/logs" }}) local route_traceid = bp.routes:insert({ service = http_srv, protocols = { "http" }, @@ -42,12 +46,35 @@ for _, strategy in helpers.each_strategy() do bp.plugins:insert({ name = "opentelemetry", + route = { id = traces_route.id }, config = table_merge({ traces_endpoint = fmt("http://%s:%s/v1/traces", OTELCOL_HOST, OTELCOL_HTTP_PORT), batch_flush_delay = 0, -- report immediately }, config) }) + bp.plugins:insert({ + name = "opentelemetry", + route = { id = logs_route.id }, + config = table_merge({ + logs_endpoint = fmt("http://%s:%s/v1/logs", OTELCOL_HOST, OTELCOL_HTTP_PORT), + queue = { + max_batch_size = 1000, + max_coalescing_delay = 2, + }, + }, config) + }) + + bp.plugins:insert({ + name = "post-function", + route = logs_route, + config = { + access = {[[ + ngx.log(ngx.WARN, "this is a log") + ]]}, + }, + }) + bp.plugins:insert({ name = "opentelemetry", route = { id = route_traceid.id }, @@ -61,7 +88,8 @@ for _, strategy in helpers.each_strategy() do assert(helpers.start_kong { database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", - plugins = "opentelemetry", + plugins = "opentelemetry, post-function", + log_level = "warn", tracing_instrumentations = types, tracing_sampling_rate = 1, }) @@ -88,7 +116,7 @@ for _, strategy in helpers.each_strategy() do it("send traces", function() local httpc = http.new() for i = 1, LIMIT do - local res, err = httpc:request_uri(proxy_url) + local res, err = httpc:request_uri(proxy_url .. "/traces") assert.is_nil(err) assert.same(200, res.status) end @@ -125,7 +153,7 @@ for _, strategy in helpers.each_strategy() do assert(helpers.restart_kong { database = strategy, nginx_conf = "spec/fixtures/custom_nginx.template", - plugins = "opentelemetry", + plugins = "opentelemetry, post-function", tracing_instrumentations = "all", tracing_sampling_rate = 0.00005, }) @@ -147,8 +175,82 @@ for _, strategy in helpers.each_strategy() do end httpc:close() end) - end) + describe("otelcol receives logs #http", function() + local REQUESTS = 100 + + lazy_setup(function() + -- clear file + local shell = require "resty.shell" + shell.run("mkdir -p $(dirname " .. OTELCOL_FILE_EXPORTER_PATH .. ")", nil, 0) + shell.run("cat /dev/null > " .. OTELCOL_FILE_EXPORTER_PATH, nil, 0) + setup_instrumentations("all") + end) + + lazy_teardown(function() + helpers.stop_kong() + end) + + it("send valid logs", function() + local httpc = http.new() + for i = 1, REQUESTS do + local res, err = httpc:request_uri(proxy_url .. "/logs") + assert.is_nil(err) + assert.same(200, res.status) + end + httpc:close() + + local parts + helpers.wait_until(function() + local f = assert(io.open(OTELCOL_FILE_EXPORTER_PATH, "rb")) + local raw_content = f:read("*all") + f:close() + + parts = split(raw_content, "\n", "jo") + return #parts > 0 + end, 10) + + local contents = {} + for _, p in ipairs(parts) do + -- after the file is truncated the collector + -- may continue exporting partial json objects + local trimmed = string.match(p, "({.*)") + local decoded = cjson.decode(trimmed) + if decoded then + table.insert(contents, decoded) + end + end + + local count = 0 + for _, content in ipairs(contents) do + if not content.resourceLogs then + goto continue + end + + local scope_logs = content.resourceLogs[1].scopeLogs + assert.is_true(#scope_logs > 0, scope_logs) + + for _, scope_log in ipairs(scope_logs) do + local log_records = scope_log.logRecords + for _, log_record in ipairs(log_records) do + if log_record.body.stringValue == "this is a log" then + count = count + 1 + + assert.not_nil(log_record.observedTimeUnixNano) + assert.not_nil(log_record.timeUnixNano) + assert.equals("SEVERITY_NUMBER_WARN", log_record.severityNumber) + assert.equals("WARN", log_record.severityText) + assert.not_nil(log_record.attributes) + end + end + end + + ::continue:: + end + + assert.equals(REQUESTS, count) + end) + end) end) end diff --git a/spec/fixtures/opentelemetry/otelcol.yaml b/spec/fixtures/opentelemetry/otelcol.yaml index 9cd430d1fc93..a15acde86bfe 100644 --- a/spec/fixtures/opentelemetry/otelcol.yaml +++ b/spec/fixtures/opentelemetry/otelcol.yaml @@ -26,3 +26,7 @@ service: receivers: [otlp] processors: [batch] exporters: [logging, file] + logs: + receivers: [otlp] + processors: [batch] + exporters: [logging, file]