Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate vector metrics that expose a pod_name label #419

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions vector/node/namespaced/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ configMapGenerator:
- metrics.yaml=resources/metrics.yaml
- pods.yaml=resources/pods.yaml
- systemd.yaml=resources/systemd.yaml
- resources/metrics.lua
137 changes: 137 additions & 0 deletions vector/node/namespaced/resources/metrics.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
--- Vector `init` hook: (re)creates the global `Data` accumulator.
-- For every tracked metric name:
--   * `Data.inactive[name]` is a running number, starting at 0;
--   * `Data.active[name]` is a table, starting empty.
function init()
  local metric_names = {
    "component_received_events_total",
    "component_received_event_bytes_total",
  }

  local inactive_totals, active_values = {}, {}
  for _, metric_name in ipairs(metric_names) do
    inactive_totals[metric_name] = 0
    active_values[metric_name] = {}
  end

  -- Intentionally global: the other hooks in this file read and update `Data`.
  Data = { inactive = inactive_totals, active = active_values }
end

--- Vector `process` hook: stores the incoming metric event in `Data`.
-- On failure an error log event is emitted (carrying the offending event as
-- payload) and the error is re-raised so that Vector accounts for it.
-- @param event the event handed over by Vector
-- @param emit  callback used to emit events downstream
function on_event(event, emit)
  local ok, err = pcall(upsert_metric, event)
  if ok then
    return
  end

  emit(generate_log("ERROR on upsert_metric", event))

  -- Re-raise the original cause instead of a bare `nil`, so the reason shows
  -- up in Vector's own error reporting. Level 0 keeps the message untouched.
  if err == nil then
    err = "upsert_metric failed"
  end
  error(err, 0) -- delegates on vector generating and increasing the error metric
end

--- Timer / shutdown hook: emits the aggregated metrics.
-- On failure an error log event is emitted (carrying the whole `Data` table
-- as payload) and the error is re-raised so that Vector accounts for it.
-- @param emit callback used to emit events downstream
function on_timer(emit)
  local ok, err = pcall(emit_metrics, emit)
  if ok then
    return
  end

  emit(generate_log("ERROR on emit_metrics", Data))

  -- Re-raise the original cause instead of a bare `nil`, so the reason shows
  -- up in Vector's own error reporting. Level 0 keeps the message untouched.
  if err == nil then
    err = "emit_metrics failed"
  end
  error(err, 0) -- delegates on vector generating and increasing the error metric
end

--- Stores the latest counter value reported for one pod.
-- The value is written to `Data.active[<metric name>]["<namespace>__<pod>"]`,
-- replacing any previous value for that pod.
-- Raises a descriptive error when the event does not have the expected shape;
-- callers are expected to wrap this in `pcall`.
-- @param event metric event; reads `metric.name`, `metric.kind`,
--              `metric.tags.{component_id,pod_namespace,pod_name}` and
--              `metric.counter.value`
function upsert_metric(event)
  local metric = event.metric
  local tags = metric.tags or {}

  -- ensure that we don't mess with custom kube sources like "kubernetes_events"
  if tags.component_id ~= "kubernetes_logs" then
    error("unexpected component_id: " .. tostring(tags.component_id))
  end

  -- ensure that the metric type hasn't changed
  if metric.kind ~= "absolute" then
    error("unexpected metric kind: " .. tostring(metric.kind))
  end

  local per_pod = Data["active"][metric.name]
  if per_pod == nil then
    error("untracked metric: " .. tostring(metric.name))
  end

  local ns, pod = tags.pod_namespace, tags.pod_name
  if ns == nil or pod == nil then
    error("metric is missing the pod_namespace/pod_name tags")
  end

  local counter = metric.counter
  if counter == nil or counter.value == nil then
    error("metric does not carry a counter value")
  end

  per_pod[ns .. "__" .. pod] = counter.value
end

--- Prunes pods that are no longer active, then emits one aggregated metric
-- per tracked metric name (events first, bytes second).
-- @param emit callback used to emit events downstream
function emit_metrics(emit)
  cleanup_inactive_pods()

  local metric_names = {
    "component_received_events_total",
    "component_received_event_bytes_total",
  }
  for _, metric_name in ipairs(metric_names) do
    emit(generate_metric(metric_name))
  end
end

--- Moves the values of pods that are no longer active into the inactive totals.
-- For every entry of `Data.active[metric]` whose key is not reported by
-- `active_pods()`, the stored value is added to `Data.inactive[metric]` and the
-- entry is removed.
function cleanup_inactive_pods()
  -- Kept local: a bare assignment here would leak a global named `active`.
  local live_pods = active_pods()

  for metric, pods in pairs(Data.active) do
    -- Clearing existing keys while traversing with `pairs` is allowed in Lua.
    for pod, value in pairs(pods) do
      if not live_pods[pod] then
        Data["inactive"][metric] = Data["inactive"][metric] + value
        pods[pod] = nil
      end
    end
  end
end

--- Lists the pods that currently have entries under /var/log/containers.
-- Each directory entry is split on its first two underscore-separated fields,
-- which are taken as the pod name and the namespace (presumably the kubelet's
-- `<pod>_<namespace>_<container>-<id>.log` naming; verify against the node).
-- Entries that do not have two such fields are skipped.
-- @return a set-like table whose keys are "<namespace>__<pod>" and values `true`
-- Raises an error when the directory listing cannot be started.
function active_pods()
  local ls_handle, open_err = io.popen("ls /var/log/containers")
  if not ls_handle then
    -- Fail loudly rather than return an empty set: cleanup_inactive_pods()
    -- would otherwise treat every known pod as inactive.
    error("unable to list /var/log/containers: " .. tostring(open_err))
  end

  local listing = ls_handle:read("*a") or ""
  ls_handle:close()

  local pods = {}
  for container in string.gmatch(listing, "[^\n]+") do
    local pod, ns = string.match(container, "([^_]+)_([^_]+)")
    -- Skip names that do not follow the expected layout instead of crashing
    -- on a nil concatenation and aborting the whole timer tick.
    if pod and ns then
      pods[ns .. "__" .. pod] = true
    end
  end
  return pods
end

--- Builds a log event whose message is a JSON document describing a failure.
-- The JSON document has three fields: `timestamp` (formatted with `os.date`),
-- `message` (inserted verbatim) and `payload` (rendered by `table_to_json`).
-- @param message short description of what went wrong
-- @param payload table serialized into the `payload` field
-- @return a table of the form `{ log = { message = <json>, timestamp = <date table> } }`
function generate_log(message, payload)
  local stamp = os.date("%Y-%m-%dT%H:%M:%S")
  local head = '{"timestamp":"' .. stamp .. '","message":"' .. message
  local encoded_payload = table_to_json(payload)
  local document = head .. '","payload":' .. encoded_payload .. "}"

  local record = {
    message = document,
    timestamp = os.date("!*t"),
  }
  return { log = record }
end

--- Builds the aggregated metric event for one metric name.
-- The counter value is the sum of every value in `Data.active[name]` plus
-- `Data.inactive[name]`.
-- @param name metric name; must be a key of both `Data.active` and `Data.inactive`
-- @return a table of the form `{ metric = { ... } }` with kind "absolute"
-- Raises an error when a stored value cannot be converted to a number.
function generate_metric(name)
  local active_total = 0
  for pod, value in pairs(Data["active"][name]) do
    -- Use tonumber() rather than a tostring() round trip: formatting a float
    -- as text keeps only a limited number of significant digits, which
    -- corrupts large counters when the text is coerced back to a number.
    local numeric = tonumber(value)
    if numeric == nil then
      error("non-numeric value stored for " .. tostring(name) .. " / " .. tostring(pod))
    end
    active_total = active_total + numeric
  end

  local total = active_total + Data["inactive"][name]
  return {
    metric = {
      name = name,
      namespace = "vector",
      tags = {
        component_id = "kubernetes_logs",
        component_kind = "source",
        component_type = "kubernetes_logs",
      },
      kind = "absolute",
      counter = {
        value = total,
      },
      timestamp = os.date("!*t"),
    },
  }
end

-- Short escape sequences for characters that must not appear raw in a JSON string.
local JSON_ESCAPES = {
  ['"'] = '\\"',
  ["\\"] = "\\\\",
  ["\b"] = "\\b",
  ["\f"] = "\\f",
  ["\n"] = "\\n",
  ["\r"] = "\\r",
  ["\t"] = "\\t",
}

-- Renders any value as a double-quoted JSON string, escaping quotes,
-- backslashes and control characters.
local function json_string(text)
  local escaped = tostring(text):gsub('[%c"\\]', function(ch)
    return JSON_ESCAPES[ch] or string.format("\\u%04x", ch:byte())
  end)
  return '"' .. escaped .. '"'
end

--- Serializes a table as a JSON object.
-- Nested tables are serialized recursively (always as objects); numbers and
-- booleans are written bare; strings are quoted and escaped. Values of any
-- other type are omitted. Key order follows `pairs` and is therefore unspecified.
-- @param t table to serialize
-- @return the JSON text
function table_to_json(t)
  local contents = {}
  for key, value in pairs(t) do
    local value_type = type(value)
    local encoded
    if value_type == "table" then
      encoded = table_to_json(value)
    elseif value_type == "number" or value_type == "boolean" then
      encoded = tostring(value)
    elseif value_type == "string" then
      encoded = json_string(value)
    end

    if encoded then
      contents[#contents + 1] = json_string(key) .. ":" .. encoded
    end
  end
  return "{" .. table.concat(contents, ",") .. "}"
end
48 changes: 47 additions & 1 deletion vector/node/namespaced/resources/metrics.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,55 @@
sources:
vector_metrics:
type: internal_metrics

transforms:
# Reroute the metrics that have pod_name labels, so they can be aggregated before being exported
expensive_metrics_router:
type: route
inputs:
- vector_metrics
route:
received_bytes: .tags.component_type == "kubernetes_logs" && .name == "component_received_event_bytes_total"
received_events: .tags.component_type == "kubernetes_logs" && .name == "component_received_events_total"

# Aggregate metrics by removing pod_name and pod_namespace labels
expensive_metrics_aggregator:
type: lua
version: "2"
inputs:
- expensive_metrics_router.received_bytes
- expensive_metrics_router.received_events
source: "require('metrics')" # sources the file `metrics.lua`
hooks:
init: init
process: on_event
shutdown: on_timer
timers:
- handler: on_timer
interval_seconds: 5

# Split the generated metrics and the potential error logs, since they go to different sinks
expensive_metrics_aggregator_router:
type: route
inputs:
- expensive_metrics_aggregator
route:
logs:
type: is_log
metrics:
type: is_metric

sinks:
prometheus:
type: prometheus_exporter
address: 0.0.0.0:8080
inputs:
- vector_metrics
- expensive_metrics_router._unmatched
- expensive_metrics_aggregator_router.metrics

expensive_metrics_aggregator_logger:
type: console
encoding:
codec: text
inputs:
- expensive_metrics_aggregator_router.logs
Loading