-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Aggregate vector metrics that expose a pod_name label
- Loading branch information
1 parent
2b353e1
commit 60d221e
Showing
3 changed files
with
185 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
--- Vector `init` hook: set up the global aggregation state.
-- Data.active maps metric name -> { "<ns>__<pod>" -> latest counter value }
-- for pods that still exist; Data.inactive accumulates the totals of pods
-- that have disappeared, so the exported counters stay monotonic.
function init()
  local tracked = {
    "component_received_events_total",
    "component_received_event_bytes_total",
  }

  Data = { inactive = {}, active = {} }
  for _, metric in ipairs(tracked) do
    Data.inactive[metric] = 0
    Data.active[metric] = {}
  end
end
|
||
--- Vector `process` hook: fold one incoming metric event into Data.
-- On failure, emits a structured error log and re-raises so vector's own
-- error metric gets incremented.
function on_event(event, emit)
  local ok = pcall(upsert_metric, event)
  if ok then
    return
  end
  emit(generate_log("ERROR on upsert_metric", event))
  error() -- delegates on vector generating and increasing the error metric
end
|
||
--- Vector timer/shutdown hook: emit the aggregated metrics.
-- On failure, emits a structured error log (with the whole Data state as
-- payload) and re-raises so vector's own error metric gets incremented.
function on_timer(emit)
  local ok = pcall(emit_metrics, emit)
  if ok then
    return
  end
  emit(generate_log("ERROR on emit_metrics", Data))
  error() -- delegates on vector generating and increasing the error metric
end
|
||
--- Record the latest absolute counter value for a (namespace, pod) pair.
-- Raises for any event that is not an absolute counter coming from the
-- "kubernetes_logs" source, so the caller can surface it as an error.
function upsert_metric(event)
  local metric = event.metric

  -- ensure that we don't mess with custom kube sources like "kubernetes_events"
  if metric.tags.component_id ~= "kubernetes_logs" then
    error()
  end

  -- ensure that the metric type hasn't changed
  if metric.kind ~= "absolute" then
    error()
  end

  local key = metric.tags.pod_namespace .. "__" .. metric.tags.pod_name
  Data.active[metric.name][key] = metric.counter.value
end
|
||
--- Prune vanished pods, then emit one aggregated counter per tracked metric.
-- Called from the timer and shutdown hooks via on_timer.
function emit_metrics(emit)
  cleanup_inactive_pods()
  local tracked = {
    "component_received_events_total",
    "component_received_event_bytes_total",
  }
  for _, name in ipairs(tracked) do
    emit(generate_metric(name))
  end
end
|
||
--- Fold the counters of pods that no longer exist into Data.inactive.
-- A pod is considered gone when its log file is no longer present under
-- /var/log/containers (see active_pods). Its last-seen value is added to the
-- inactive bucket so the exported counter never decreases.
-- Note: setting existing keys to nil while iterating with pairs() is allowed.
function cleanup_inactive_pods()
  -- BUGFIX: `active` was an accidental global; keep it scoped to this function.
  local active = active_pods()

  for metric, pods in pairs(Data.active) do
    for pod, value in pairs(pods) do
      if not active[pod] then
        Data.inactive[metric] = Data.inactive[metric] + value
        Data.active[metric][pod] = nil
      end
    end
  end
end
|
||
--- Build the set of currently-running pods by listing container log files.
-- Returns a set keyed by "<namespace>__<pod>" (kubelet log file names look
-- like "<pod>_<namespace>_<container>-<id>.log" — assumed; confirm on node).
function active_pods()
  local containers = ""
  local ls_handle = io.popen("ls /var/log/containers")
  if ls_handle then
    -- BUGFIX: read() can return nil; keep `containers` a string so the
    -- gmatch below never receives nil and raises.
    containers = ls_handle:read("*a") or ""
    ls_handle:close()
  end

  local pods = {}
  for container in string.gmatch(containers, "[^\n]+") do
    local pod, ns = string.match(container, "([^_]+)_([^_]+)")
    -- BUGFIX: skip entries that don't follow the expected naming scheme
    -- instead of crashing on a nil concatenation.
    if pod and ns then
      pods[ns .. "__" .. pod] = true
    end
  end
  return pods
end
|
||
--- Wrap an error message plus an arbitrary payload into a vector log event.
-- The log's message field carries a hand-built JSON document with a local
-- timestamp, the message, and the JSON-serialized payload.
function generate_log(message, payload)
  local body = string.format(
    '{"timestamp":"%s","message":"%s","payload":%s}',
    os.date("%Y-%m-%dT%H:%M:%S"),
    message,
    table_to_json(payload)
  )

  return {
    log = {
      message = body,
      -- vector expects an os.date("!*t")-style UTC table here
      timestamp = os.date("!*t"),
    },
  }
end
|
||
--- Aggregate one metric across all pods into a single absolute counter.
-- Total = sum of the active per-pod values + the accumulated value of pods
-- that have already been moved to the inactive bucket.
function generate_metric(name)
  local active = 0
  for _, value in pairs(Data.active[name]) do
    -- BUGFIX: values are numbers already; the old tostring() round-trip
    -- (tostring uses "%.14g") could lose precision and relied on implicit
    -- string->number coercion during the addition.
    active = active + value
  end

  local total = active + Data.inactive[name]
  return {
    metric = {
      name = name,
      namespace = "vector",
      -- mirror the tags of the original kubernetes_logs source metrics,
      -- minus the expensive pod_name/pod_namespace labels
      tags = {
        component_id = "kubernetes_logs",
        component_kind = "source",
        component_type = "kubernetes_logs",
      },
      kind = "absolute",
      counter = {
        value = total,
      },
      timestamp = os.date("!*t"),
    },
  }
end
|
||
--- Minimal JSON serializer for debug/error payloads.
-- Only table, number, and string values are emitted; any other type
-- (booleans, functions, userdata, ...) is silently dropped. String values
-- are NOT escaped, so embedded quotes would yield invalid JSON — acceptable
-- for best-effort diagnostic logs. Key order follows pairs(), i.e. unspecified.
function table_to_json(t)
  local parts = {}
  for key, value in pairs(t) do
    local kind = type(value)
    local encoded
    if kind == "table" then
      encoded = table_to_json(value)
    elseif kind == "number" then
      encoded = tostring(value)
    elseif kind == "string" then
      encoded = '"' .. value .. '"'
    end
    if encoded ~= nil then
      parts[#parts + 1] = '"' .. key .. '":' .. encoded
    end
  end
  return "{" .. table.concat(parts, ",") .. "}"
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,55 @@ | ||
sources:
  vector_metrics:
    type: internal_metrics

transforms:
  # Reroute the metrics that have pod_name labels, so they can be aggregated before being exported
  expensive_metrics_router:
    type: route
    inputs:
      - vector_metrics
    route:
      received_bytes: .tags.component_type == "kubernetes_logs" && .name == "component_received_event_bytes_total"
      received_events: .tags.component_type == "kubernetes_logs" && .name == "component_received_events_total"

  # Aggregate metrics by removing pod_name and pod_namespace labels
  expensive_metrics_aggregator:
    type: lua
    version: "2"
    inputs:
      - expensive_metrics_router.received_bytes
      - expensive_metrics_router.received_events
    source: "require('metrics')" # sources the file `metrics.lua`
    hooks:
      init: init
      process: on_event
      # emit one final aggregated snapshot on shutdown as well
      shutdown: on_timer
    timers:
      - handler: on_timer
        interval_seconds: 5

  # Split the generated metrics and the potential error logs, since they go to different sinks
  expensive_metrics_aggregator_router:
    type: route
    inputs:
      - expensive_metrics_aggregator
    route:
      logs:
        type: is_log
      metrics:
        type: is_metric

sinks:
  prometheus:
    type: prometheus_exporter
    address: 0.0.0.0:8080
    inputs:
      - vector_metrics
      - expensive_metrics_router._unmatched
      - expensive_metrics_aggregator_router.metrics

  expensive_metrics_aggregator_logger:
    type: console
    encoding:
      codec: text
    inputs:
      - expensive_metrics_aggregator_router.logs