Skip to content

Commit

Permalink
fixup! tests(opentelemetry): OTel formatted logs
Browse files Browse the repository at this point in the history
  • Loading branch information
samugi committed Jul 5, 2024
1 parent 648e787 commit 1ff59b7
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 50 deletions.
21 changes: 7 additions & 14 deletions spec/01-unit/26-observability/05-logs_spec.lua
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
-- This software is copyright Kong Inc. and its licensors.
-- Use of the software is subject to the agreement between your organization
-- and Kong Inc. If there is no such agreement, use is governed by and
-- subject to the terms of the Kong Master Software License Agreement found
-- at https://konghq.com/enterprisesoftwarelicense/.
-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ]

require "kong.tools.utils"


Expand Down Expand Up @@ -74,19 +67,19 @@ describe("Observability/Logs unit tests", function()
local log_level = ngx.WARN
local body = "Careful! I'm a warning!"

maybe_push(1, log_level, body)
maybe_push(1, log_level, body, true, 123, ngx.null, nil, function()end, { foo = "bar" })
local worker_logs = get_worker_logs()
assert.equals(1, #worker_logs)

local logged_entry = worker_logs[1]
assert.same(log_level, logged_entry.log_level)
assert.same(body, logged_entry.body)
assert.matches(body .. "true123nilnilfunction:%s0x%x+table:%s0x%x+", logged_entry.body)
assert.is_table(logged_entry.attributes)
assert.is_number(logged_entry.attributes.introspection_current_line)
assert.is_string(logged_entry.attributes.introspection_name)
assert.is_string(logged_entry.attributes.introspection_namewhat)
assert.is_string(logged_entry.attributes.introspection_source)
assert.is_string(logged_entry.attributes.introspection_what)
assert.is_number(logged_entry.attributes["introspection.current.line"])
assert.is_string(logged_entry.attributes["introspection.name"])
assert.is_string(logged_entry.attributes["introspection.namewhat"])
assert.is_string(logged_entry.attributes["introspection.source"])
assert.is_string(logged_entry.attributes["introspection.what"])
assert.is_number(logged_entry.observed_time_unix_nano)
assert.is_number(logged_entry.time_unix_nano)
end)
Expand Down
86 changes: 86 additions & 0 deletions spec/02-integration/14-observability/05-logs_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
local helpers = require "spec.helpers"

for _, strategy in helpers.each_strategy() do
describe("Observability Logs", function ()
describe("ngx.log patch", function()
local proxy_client
local post_function_access = [[
local threads = {}
local n_threads = 100
for i = 1, n_threads do
threads[i] = ngx.thread.spawn(function()
ngx.log(ngx.INFO, "thread_" .. i .. " logged")
end)
end
for i = 1, n_threads do
ngx.thread.wait(threads[i])
end
]]

lazy_setup(function()
local bp = helpers.get_db_utils(strategy, {
"routes",
"services",
"plugins",
})

local http_srv = assert(bp.services:insert {
name = "mock-service",
host = helpers.mock_upstream_host,
port = helpers.mock_upstream_port,
})

local logs_route = assert(bp.routes:insert({ service = http_srv,
protocols = { "http" },
paths = { "/logs" }}))

assert(bp.plugins:insert({
name = "post-function",
route = logs_route,
config = {
access = { post_function_access },
},
}))

-- only needed to enable the log collection hook
assert(bp.plugins:insert({
name = "opentelemetry",
route = logs_route,
config = {
logs_endpoint = "http://" .. helpers.mock_upstream_host .. ":" .. helpers.mock_upstream_port,
}
}))

helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
plugins = "opentelemetry,post-function",
})
proxy_client = helpers.proxy_client()
end)

lazy_teardown(function()
if proxy_client then
proxy_client:close()
end
helpers.stop_kong()
end)

it("does not produce yielding and concurrent executions", function ()
local res = assert(proxy_client:send {
method = "GET",
path = "/logs",
})
assert.res_status(200, res)

-- plugin produced logs:
assert.logfile().has.line("thread_1 logged", true, 10)
assert.logfile().has.line("thread_100 logged", true, 10)
-- plugin did not produce concurrent accesses to ngx.log:
assert.logfile().has.no.line("[error]", true)
end)
end)
end)
end
127 changes: 91 additions & 36 deletions spec/03-plugins/37-opentelemetry/04-exporter_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ for _, strategy in helpers.each_strategy() do
protocols = { "http" },
paths = { "/logs" }}))

local logs_traces_route = assert(bp.routes:insert({ service = http_srv,
protocols = { "http" },
paths = { "/traces_logs" }}))

assert(bp.routes:insert({ service = http_srv2,
protocols = { "http" },
paths = { "/no_plugin" }}))
Expand All @@ -88,7 +92,7 @@ for _, strategy in helpers.each_strategy() do

assert(bp.plugins:insert({
name = "opentelemetry",
route = logs_route,
route = logs_traces_route,
config = table_merge({
traces_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_TRACES,
logs_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_LOGS,
Expand All @@ -99,6 +103,26 @@ for _, strategy in helpers.each_strategy() do
}, config)
}))

assert(bp.plugins:insert({
name = "opentelemetry",
route = logs_route,
config = table_merge({
logs_endpoint = "http://127.0.0.1:" .. HTTP_SERVER_PORT_LOGS,
queue = {
max_batch_size = 1000,
max_coalescing_delay = 2,
},
}, config)
}))

assert(bp.plugins:insert({
name = "post-function",
route = logs_traces_route,
config = {
access = { post_function_access_body },
},
}))

assert(bp.plugins:insert({
name = "post-function",
route = logs_route,
Expand Down Expand Up @@ -197,34 +221,7 @@ for _, strategy in helpers.each_strategy() do
assert.is_true(#scope_spans > 0, scope_spans)
end)

it("exports valid logs", function ()
local trace_id = gen_trace_id()

local headers, body, request_id

local cli = helpers.proxy_client(7000, PROXY_PORT)
local res = assert(cli:send {
method = "GET",
path = "/logs",
headers = {
traceparent = fmt("00-%s-0123456789abcdef-01", trace_id),
},
})
assert.res_status(200, res)
cli:close()

request_id = res.headers["X-Kong-Request-Id"]

helpers.wait_until(function()
local lines
lines, body, headers = mock_logs()

return lines
end, 10)

assert.is_string(body)
assert.equals(headers["Content-Type"], "application/x-protobuf")

local function assert_find_valid_logs(body, request_id, trace_id)
local decoded = assert(pb.decode("opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest", body))
assert.not_nil(decoded)

Expand Down Expand Up @@ -258,7 +255,7 @@ for _, strategy in helpers.each_strategy() do
end

-- ensure the log is from the current request
if found_attrs.request_id == request_id then
if found_attrs["request.id"] == request_id then
local expected_line
if logline:sub(-8) == "kong.log" then
expected_line = 1
Expand All @@ -268,13 +265,15 @@ for _, strategy in helpers.each_strategy() do

assert.is_number(log_record.time_unix_nano)
assert.is_number(log_record.observed_time_unix_nano)
assert.is_number(log_record.flags)
assert.equals(post_function_access_body, found_attrs.introspection_source)
assert.equals(expected_line, found_attrs.introspection_current_line)
assert.equals(post_function_access_body, found_attrs["introspection.source"])
assert.equals(expected_line, found_attrs["introspection.current.line"])
assert.equals(log_record.severity_number, 9)
assert.equals(log_record.severity_text, "INFO")
assert.equals(trace_id, to_hex(log_record.trace_id))
assert.is_string(log_record.span_id)
if trace_id then
assert.equals(trace_id, to_hex(log_record.trace_id))
assert.is_string(log_record.span_id)
assert.is_number(log_record.flags)
end

found = found + 1
if found == 2 then
Expand All @@ -285,6 +284,62 @@ for _, strategy in helpers.each_strategy() do
end
end
assert.equals(2, found)
end

it("exports valid logs with tracing", function ()
local trace_id = gen_trace_id()

local headers, body, request_id

local cli = helpers.proxy_client(7000, PROXY_PORT)
local res = assert(cli:send {
method = "GET",
path = "/traces_logs",
headers = {
traceparent = fmt("00-%s-0123456789abcdef-01", trace_id),
},
})
assert.res_status(200, res)
cli:close()

request_id = res.headers["X-Kong-Request-Id"]

helpers.wait_until(function()
local lines
lines, body, headers = mock_logs()

return lines
end, 10)

assert.is_string(body)
assert.equals(headers["Content-Type"], "application/x-protobuf")
assert_find_valid_logs(body, request_id, trace_id)
end)

it("exports valid logs without tracing", function ()
local headers, body, request_id

local cli = helpers.proxy_client(7000, PROXY_PORT)
local res = assert(cli:send {
method = "GET",
path = "/logs",
})
assert.res_status(200, res)
cli:close()

request_id = res.headers["X-Kong-Request-Id"]

helpers.wait_until(function()
local lines
lines, body, headers = mock_logs()

return lines
end, 10)

assert.is_string(body)
assert.equals(headers["Content-Type"], "application/x-protobuf")

assert_find_valid_logs(body, request_id)
end)
end)

Expand Down Expand Up @@ -384,7 +439,7 @@ for _, strategy in helpers.each_strategy() do
}, { "opentelemetry" }))

setup_instrumentations("all", {}, nil, nil, nil, nil, sampling_rate)
mock = helpers.http_mock(HTTP_SERVER_PORT, { timeout = HTTP_MOCK_TIMEOUT })
mock = helpers.http_mock(HTTP_SERVER_PORT_TRACES, { timeout = HTTP_MOCK_TIMEOUT })
end)

lazy_teardown(function()
Expand Down

0 comments on commit 1ff59b7

Please sign in to comment.