Skip to content

Commit

Permalink
fixup! feat(observability): add OpenTelemetry logs
Browse files Browse the repository at this point in the history
  • Loading branch information
samugi committed Jul 5, 2024
1 parent a66c32d commit 648e787
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 61 deletions.
8 changes: 8 additions & 0 deletions kong/clustering/compat/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,14 @@ local function invalidate_keys_from_config(config_plugins, keys, log_suffix, dp_
end
end

-- Any dataplane older than 3.8.0
if dp_version_num < 3008000000 then
-- OSS
if name == "opentelemetry" then
has_update = rename_field(config, "traces_endpoint", "endpoint", has_update)
end
end

for _, key in ipairs(keys[name]) do
if delete_at(config, key) then
ngx_log(ngx_WARN, _log_prefix, name, " plugin contains configuration '", key,
Expand Down
13 changes: 12 additions & 1 deletion kong/globalpatches.lua
Original file line number Diff line number Diff line change
Expand Up @@ -593,17 +593,28 @@ return function(options)

-- OTel-formatted logs feature
local dynamic_hook = require "kong.dynamic_hook"
local hook_called = false
_G.ngx.log = function(...)
if hook_called then
-- detect recursive loops or yielding from the hook:
old_ngx_log(ngx.ERR, debug.traceback("concurrent execution detected for: ngx.log", 2))
return old_ngx_log(...)
end

-- stack level = 5:
-- 1: maybe_push
-- 2: dynamic_hook.pcall
-- 3: dynamic_hook.run_hook
-- 4: patched function
-- 5: caller
hook_called = true
dynamic_hook.run_hook("observability_logs", "push", 5, ...)

hook_called = false
return old_ngx_log(...)
end
-- export native ngx.log to be used where
-- the patched code must not be executed
_G.native_ngx_log = old_ngx_log

if not options.cli and not options.rbusted then
local timing = require "kong.timing"
Expand Down
81 changes: 45 additions & 36 deletions kong/observability/logs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,42 @@ end

local request_id_get = require "kong.observability.tracing.request_id".get
local time_ns = require "kong.tools.time".time_ns
local cycle_aware_deep_copy = require "kong.tools.utils".cycle_aware_deep_copy
local deep_copy = require "kong.tools.utils".deep_copy

local get_log_level = require "resty.kong.log".get_log_level
local constants_log_levels = require "kong.constants".LOG_LEVELS

local table_new = require "table.new"
local table_clear = require "table.clear"
local string_buffer = require "string.buffer"

local ngx = ngx
local kong = kong
local table = table
local tostring = tostring

local ngx_null = ngx.null
local table_pack = table.pack -- luacheck: ignore

local MAX_WORKER_LOGS = 1000
local MAX_REQUEST_LOGS = 1000
local INITIAL_SIZE_WORKER_LOGS = 100
local NGX_CTX_REQUEST_LOGS_KEY = "o11y_logs_request_scoped"

local worker_logs = table_new(100, 0)
local worker_logs = table_new(INITIAL_SIZE_WORKER_LOGS, 0)
local logline_buf = string_buffer.new()


-- WARNING: avoid using `ngx.log` in this function to prevent recursive loops
local function configured_log_level()
local ok, level = pcall(get_log_level)
if not ok then
level = constants_log_levels[kong.configuration.log_level]
-- This is unexpected outside of the context of unit tests
local level_str = kong.configuration.log_level
_G.native_ngx_log(ngx.WARN,
"[observability] OpenTelemetry logs failed reading dynamic log level. " ..
"Using log level: " .. level_str .. " from configuration."
)
level = constants_log_levels[level_str]
end

return level
Expand All @@ -45,18 +55,24 @@ end

-- needed because table.concat doesn't like booleans
local function concat_tostring(tab)
if #tab == 0 then
local tab_len = #tab
if tab_len == 0 then
return ""
end

for i = 1, #tab do
logline_buf:put(tostring(tab[i]))
end
for i = 1, tab_len do
local value = tab[i]

local retstr = logline_buf:tostring()
logline_buf:reset()
if value == ngx_null then
value = "nil"
else
value = tostring(value)
end

logline_buf:put(value)
end

return retstr
return logline_buf:get()
end


Expand All @@ -73,12 +89,12 @@ local function generate_log_entry(request_scoped, log_level, log_str, request_id
end

local attributes = {
request_id = request_id,
introspection_current_line = debug_info.currentline,
introspection_name = debug_info.name,
introspection_namewhat = debug_info.namewhat,
introspection_source = debug_info.source,
introspection_what = debug_info.what,
["request.id"] = request_id,
["introspection.current.line"] = debug_info.currentline,
["introspection.name"] = debug_info.name,
["introspection.namewhat"] = debug_info.namewhat,
["introspection.source"] = debug_info.source,
["introspection.what"] = debug_info.what,
}

local now_ns = time_ns()
Expand All @@ -104,22 +120,9 @@ end


function _M.maybe_push(stack_level, log_level, ...)
-- !WARNING! no logging here, to avoid infinite recursion.
--
-- Check if this log entry is eligible to go in the log buffer.
-- Early return cases:
-- WARNING: do not yield in this function, as it is called from ngx.log

-- no log line
local args = { ... }
if #args == 0 then
return
end

-- empty log line
local log_str = concat_tostring(args)
if log_str == "" then
return
end
-- Early return cases:

-- log level too low
if configured_log_level() < log_level then
Expand All @@ -145,6 +148,13 @@ function _M.maybe_push(stack_level, log_level, ...)
return
end

-- no (or empty) log line
local args = table_pack(...)
local log_str = concat_tostring(args)
if log_str == "" then
return
end

-- generate & push log entry
local debug_info = debug.getinfo(stack_level, "nSl")
local log_entry = generate_log_entry(request_scoped, log_level, log_str, request_id, debug_info)
Expand All @@ -153,16 +163,15 @@ end


function _M.get_worker_logs()
local wl = cycle_aware_deep_copy(worker_logs)

table_clear(worker_logs)
local wl = worker_logs
worker_logs = table_new(INITIAL_SIZE_WORKER_LOGS, 0)
return wl
end


function _M.get_request_logs()
local request_logs = get_request_log_buffer()
return cycle_aware_deep_copy(request_logs)
return deep_copy(request_logs)
end


Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/opentelemetry/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ function OpenTelemetryHandler:configure(configs)
for _, config in ipairs(configs) do
if config.logs_endpoint then
dynamic_hook.hook("observability_logs", "push", o11y_logs.maybe_push)
dynamic_hook.always_enable("observability_logs")
dynamic_hook.enable_by_default("observability_logs")
end
end
end
Expand Down
13 changes: 4 additions & 9 deletions kong/plugins/opentelemetry/logs.lua
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
-- This software is copyright Kong Inc. and its licensors.
-- Use of the software is subject to the agreement between your organization
-- and Kong Inc. If there is no such agreement, use is governed by and
-- subject to the terms of the Kong Master Software License Agreement found
-- at https://konghq.com/enterprisesoftwarelicense/.
-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ]

local Queue = require "kong.tools.queue"
local o11y_logs = require "kong.observability.logs"
local otlp = require "kong.plugins.opentelemetry.otlp"
Expand Down Expand Up @@ -53,10 +46,12 @@ local function log(conf)
local worker_logs = o11y_logs.get_worker_logs()
local request_logs = o11y_logs.get_request_logs()

local worker_logs_len = #worker_logs
local request_logs_len = #request_logs
ngx_log(ngx_DEBUG, _log_prefix, "total request_logs in current request: ",
#request_logs, " total worker_logs in current request: ", #worker_logs)
request_logs_len, " total worker_logs in current request: ", worker_logs_len)

if #request_logs + #worker_logs == 0 then
if request_logs_len + worker_logs_len == 0 then
return
end

Expand Down
20 changes: 10 additions & 10 deletions kong/plugins/opentelemetry/otlp.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ local pb = require "pb"
local new_tab = require "table.new"
local nkeys = require "table.nkeys"
local tablepool = require "tablepool"
local cycle_aware_deep_copy = require("kong.tools.table").cycle_aware_deep_copy
local deep_copy = require("kong.tools.table").deep_copy

local kong = kong
local insert = table.insert
Expand Down Expand Up @@ -169,7 +169,7 @@ do
encode_traces = function(spans, resource_attributes)
local tab = tablepool_fetch(POOL_OTLP, 0, 2)
if not tab.resource_spans then
tab.resource_spans = cycle_aware_deep_copy(pb_memo_trace.resource_spans)
tab.resource_spans = deep_copy(pb_memo_trace.resource_spans)
end

local resource = tab.resource_spans[1].resource
Expand Down Expand Up @@ -204,7 +204,7 @@ do
encode_logs = function(log_batch, resource_attributes)
local tab = tablepool_fetch(POOL_OTLP, 0, 3)
if not tab.resource_logs then
tab.resource_logs = cycle_aware_deep_copy(pb_memo_log.resource_logs)
tab.resource_logs = deep_copy(pb_memo_log.resource_logs)
end

local resource = tab.resource_logs[1].resource
Expand All @@ -225,14 +225,14 @@ do

-- see: kong/include/opentelemetry/proto/logs/v1/logs.proto
local map_severity = {
[ngx.DEBUG] = { 5, "DEBUG" },
[ngx.INFO] = { 9, "INFO" },
[ngx.DEBUG] = { 5, "DEBUG" },
[ngx.INFO] = { 9, "INFO" },
[ngx.NOTICE] = { 11, "NOTICE" },
[ngx.WARN] = { 13, "WARN" },
[ngx.ERR] = { 17, "ERR" },
[ngx.CRIT] = { 19, "CRIT" },
[ngx.ALERT] = { 21, "ALERT" },
[ngx.EMERG] = { 23, "EMERG" },
[ngx.WARN] = { 13, "WARN" },
[ngx.ERR] = { 17, "ERR" },
[ngx.CRIT] = { 19, "CRIT" },
[ngx.ALERT] = { 21, "ALERT" },
[ngx.EMERG] = { 23, "EMERG" },
}

prepare_logs = function(logs, trace_id, flags)
Expand Down
1 change: 0 additions & 1 deletion kong/plugins/opentelemetry/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ return {
{
endpoint = typedefs.url {
referenceable = true,
translate_backwards = {'traces_endpoint'},
deprecation = {
message = "OpenTelemetry: config.endpoint is deprecated, please use config.traces_endpoint instead",
removal_in_version = "4.0", },
Expand Down
4 changes: 2 additions & 2 deletions kong/plugins/opentelemetry/utils.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
local http = require "resty.http"
local clone = require "table.clone"
local null = ngx.null

local tostring = tostring
local null = ngx.null


local CONTENT_TYPE_HEADER_NAME = "Content-Type"
Expand Down Expand Up @@ -52,4 +52,4 @@ return {
http_export_request = http_export_request,
get_headers = get_headers,
_log_prefix = _log_prefix,
}
}
6 changes: 6 additions & 0 deletions kong/runloop/plugins_iterator.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
local workspaces = require "kong.workspaces"
local constants = require "kong.constants"
local tablepool = require "tablepool"
local req_dyn_hook = require "kong.dynamic_hook"


local kong = kong
Expand All @@ -17,6 +18,7 @@ local fetch_table = tablepool.fetch
local release_table = tablepool.release
local uuid = require("kong.tools.uuid").uuid
local get_updated_monotonic_ms = require("kong.tools.time").get_updated_monotonic_ms
local req_dyn_hook_disable_by_default = req_dyn_hook.disable_by_default


local TTL_ZERO = { ttl = 0 }
Expand Down Expand Up @@ -428,6 +430,10 @@ end


local function configure(configurable, ctx)
-- Disable hooks that are selectively enabled by plugins
-- in their :configure handler
req_dyn_hook_disable_by_default("observability_logs")

ctx = ctx or ngx.ctx
local kong_global = require "kong.global"
for _, plugin in ipairs(CONFIGURABLE_PLUGINS) do
Expand Down
4 changes: 3 additions & 1 deletion spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_otel_prior_35.config.propagation = nil
expected_otel_prior_35.config.traces_endpoint = nil
expected_otel_prior_35.config.logs_endpoint = nil

expected_otel_prior_35.config.endpoint = "http://1.1.1.1:12345/v1/trace"

do_assert(uuid(), "3.4.0", expected_otel_prior_35)

-- cleanup
Expand All @@ -279,6 +280,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_otel_prior_34.config.propagation = nil
expected_otel_prior_34.config.traces_endpoint = nil
expected_otel_prior_34.config.logs_endpoint = nil
expected_otel_prior_34.config.endpoint = "http://1.1.1.1:12345/v1/trace"
do_assert(uuid(), "3.3.0", expected_otel_prior_34)

-- cleanup
Expand Down

0 comments on commit 648e787

Please sign in to comment.