Aggregate vector metrics that expose a pod_name label #419

Open · wants to merge 12 commits into base: master
1 change: 1 addition & 0 deletions vector/node/namespaced/kustomization.yaml
@@ -11,3 +11,4 @@ configMapGenerator:
- metrics.yaml=resources/metrics.yaml
- pods.yaml=resources/pods.yaml
- systemd.yaml=resources/systemd.yaml
- resources/metrics.lua
1 change: 0 additions & 1 deletion vector/node/namespaced/resources/global.yaml

This file was deleted.

126 changes: 126 additions & 0 deletions vector/node/namespaced/resources/metrics.lua
@@ -0,0 +1,126 @@
function init()
  -- Initialize the global LastValue table
  LastValue = {
    component_received_events_total = {},
    component_received_event_bytes_total = {},
  }
end

function on_event(event, emit)
  if not pcall(process_event, event, emit) then
    emit(generate_log("ERROR on process_event", event))
    error() -- let Vector generate and increment its error metric
  end
end

function on_timer(emit)
  if not pcall(cleanup_inactive_pods) then
    emit(generate_log("ERROR on cleanup_inactive_pods", LastValue))
    error() -- let Vector generate and increment its error metric
  end
end

function process_event(event, emit)
  -- ensure that we don't mess with custom kube sources like "kubernetes_events"
  if event.metric.tags.component_id ~= "kubernetes_logs" then
    error()
  end

  local name = event.metric.name
  local ns = event.metric.tags.pod_namespace
  local pod = event.metric.tags.pod_name
  local value = event.metric.counter.value

  -- ensure that the metric type hasn't changed
  if event.metric.kind ~= "absolute" then
    error()
  end

  local last_value = LastValue[name][ns .. "__" .. pod] or 0
  local inc = value - last_value

  emit(generate_metric(name, ns, inc))

  LastValue[name][ns .. "__" .. pod] = value
end

function cleanup_inactive_pods()
  local active = active_pods()

  for metric, pods in pairs(LastValue) do
    for pod, _ in pairs(pods) do
      if not active[pod] then
        LastValue[metric][pod] = nil
      end
    end
  end
end

function active_pods()
  -- Container log files are named "<pod_name>_<namespace>_<container_name>-<container_id>.log"
  local ls_handle = io.popen("ls /var/log/containers")
  local containers
  if ls_handle then
    containers = ls_handle:read("*a")
    ls_handle:close()
  end

  local pods = {}
  for container in string.gmatch(containers, "[^\n]+") do
    local pod, ns = string.match(container, "([^_]+)_([^_]+)")
    local id = ns .. "__" .. pod
    pods[id] = true
  end
  return pods
end

function generate_log(message, payload)
  payload = payload or {}
  local json = '{"timestamp":"'
    .. os.date("%Y-%m-%dT%H:%M:%S")
    .. '","message":"'
    .. message
    .. '","payload":'
    .. table_to_json(payload)
    .. "}"

  return {
    log = {
      message = json,
      timestamp = os.date("!*t"),
    },
  }
end

function generate_metric(name, namespace, value)
  return {
    metric = {
      name = name,
      namespace = "vector",
      tags = {
        component_id = "kubernetes_logs",
        component_kind = "source",
        component_type = "kubernetes_logs",
        pod_namespace = namespace,
      },
      kind = "incremental",
      counter = {
        value = value,
      },
      timestamp = os.date("!*t"),
    },
  }
end

function table_to_json(t)
  local contents = {}
  for key, value in pairs(t) do
    if type(value) == "table" then
      table.insert(contents, '"' .. key .. '"' .. ":" .. table_to_json(value))
    elseif type(value) == "number" then
      table.insert(contents, string.format('"%s":%s', key, value))
    elseif type(value) == "string" then
      table.insert(contents, string.format('"%s":"%s"', key, value))
    end
  end
  return "{" .. table.concat(contents, ",") .. "}"
end
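
For reference, a minimal sketch of how the absolute-to-incremental conversion above could be exercised locally with a plain Lua interpreter; the dofile path, pod name, namespace, and counter values are made up for illustration and are not part of the change:

-- Illustrative harness (assumes metrics.lua sits in the working directory).
dofile("metrics.lua")

init()

-- Stand-in for Vector's emit callback: just print what would be forwarded.
local function emit(event)
  if event.metric then
    print(event.metric.name, event.metric.counter.value)
  else
    print(event.log.message)
  end
end

-- Build a synthetic absolute counter snapshot for a hypothetical pod.
local function snapshot(value)
  return {
    metric = {
      name = "component_received_events_total",
      kind = "absolute",
      tags = {
        component_id = "kubernetes_logs",
        pod_namespace = "default",
        pod_name = "web-0",
      },
      counter = { value = value },
    },
  }
end

on_event(snapshot(100), emit) -- first snapshot: emits an increment of 100
on_event(snapshot(150), emit) -- second snapshot: emits an increment of 150 - 100 = 50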
48 changes: 47 additions & 1 deletion vector/node/namespaced/resources/metrics.yaml
@@ -1,9 +1,55 @@
sources:
  vector_metrics:
    type: internal_metrics

transforms:
  # Reroute the metrics that have pod_name labels, so they can be aggregated before being exported
  expensive_metrics_router:
    type: route
    inputs:
      - vector_metrics
    route:
      received_bytes: .tags.component_type == "kubernetes_logs" && .name == "component_received_event_bytes_total"
      received_events: .tags.component_type == "kubernetes_logs" && .name == "component_received_events_total"

  # Aggregate metrics by removing the pod_name label, keeping per-namespace totals
  expensive_metrics_aggregator:
    type: lua
    version: "2"
    inputs:
      - expensive_metrics_router.received_bytes
      - expensive_metrics_router.received_events
    source: "require('metrics')" # sources the file `metrics.lua`
    hooks:
      init: init
      process: on_event
      shutdown: on_timer
    timers:
      - handler: on_timer
        interval_seconds: 60

  # Split the generated metrics and the potential error logs, since they go to different sinks
  expensive_metrics_aggregator_router:
    type: route
    inputs:
      - expensive_metrics_aggregator
    route:
      logs:
        type: is_log
      metrics:
        type: is_metric

sinks:
  prometheus:
    type: prometheus_exporter
    address: 0.0.0.0:8080
    inputs:
      - vector_metrics
      - expensive_metrics_router._unmatched
      - expensive_metrics_aggregator_router.metrics

  expensive_metrics_aggregator_logger:
    type: console
    encoding:
      codec: text
    inputs:
      - expensive_metrics_aggregator_router.logs
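
And a similarly hedged sketch of the cleanup that the timers block schedules every 60 seconds, stubbing active_pods so cleanup_inactive_pods can run without a /var/log/containers directory; the pod ids are hypothetical:

-- Illustrative cleanup check (assumes metrics.lua sits in the working directory).
dofile("metrics.lua")

init()
LastValue.component_received_events_total["default__web-0"] = 100
LastValue.component_received_events_total["default__web-1"] = 200

-- Pretend only web-0 still has log files on the node.
active_pods = function()
  return { ["default__web-0"] = true }
end

on_timer(print) -- print stands in for emit; it is only used if the cleanup fails

for id in pairs(LastValue.component_received_events_total) do
  print(id) -- expected: only default__web-0 remains
end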