Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate vector metrics that expose a pod_name label #419

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
6 changes: 4 additions & 2 deletions vector/node/namespaced/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ spec:
app.kubernetes.io/instance: vector-node
annotations:
prometheus.io/scrape: "true"
prometheus.io/path: /metrics
prometheus.io/port: "8080"
spec:
tolerations:
- key: node-role.kubernetes.io/master
Expand Down Expand Up @@ -56,6 +54,10 @@ spec:
- name: var-log
mountPath: /var/log
readOnly: true
ports:
- name: metrics
containerPort: 8080
protocol: TCP
resources:
requests:
cpu: 0m
Expand Down
2 changes: 2 additions & 0 deletions vector/node/namespaced/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ configMapGenerator:
- metrics.yaml=resources/metrics.yaml
- pods.yaml=resources/pods.yaml
- systemd.yaml=resources/systemd.yaml
- resources/metrics.lua
- resources/global.yaml
149 changes: 149 additions & 0 deletions vector/node/namespaced/resources/metrics.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
-- Hook called once by vector's lua transform before any event is processed.
-- Initializes the global state shared by the other hooks.
function init()
  -- Per-metric map of "<namespace>__<pod>" -> { value, updatedAt } holding
  -- the last absolute counter value seen for each pod.
  LastValue = {
    component_received_events_total = {},
    component_received_event_bytes_total = {},
  }
  -- Vector keeps these internal metrics by default, so the global config
  -- flag `expire_metrics_secs` must be set; this script's own expiry
  -- interval should be higher than `expire_metrics_secs`.
  ExpireMetricSecs = 600
end

-- Hook called by vector for every routed metric event.
-- Wraps the real work in pcall so a scripting bug is surfaced as an error
-- log plus a re-raise, instead of silently breaking the transform.
function on_event(event, emit)
  local ok, err = pcall(process_event, event, emit)
  if ok then
    return
  end
  emit(generate_log("ERROR on process_event" .. err, event))
  -- Re-raise so vector generates and increases its own error metric.
  error()
end

-- Hook called by vector on the configured timer interval.
-- Runs the stale-entry cleanup under pcall; failures are logged (with the
-- whole LastValue state as payload) and re-raised.
function on_timer(emit)
  local ok, err = pcall(cleanup_inactive_metrics, emit)
  if ok then
    return
  end
  emit(generate_log("ERROR on cleanup_inactive_metrics" .. err, LastValue))
  -- Re-raise so vector generates and increases its own error metric.
  error()
end

-- Turn one absolute counter sample into an incremental metric with the
-- pod_name label dropped (values are aggregated per namespace).
--
-- event: vector metric event; must have kind "absolute", a counter value,
--        and non-empty pod_namespace / pod_name tags.
-- emit:  callback used to emit generated metrics and error log events.
--
-- Raises (via error()) on any validation failure; the caller's pcall turns
-- that into vector's error metric.
function process_event(event, emit)
  -- Only absolute counters can be converted to increments by diffing.
  if event.metric.kind ~= "absolute" then
    emit(generate_log("ERROR only absolute events can be aggregated", event))
    error()
  end

  local name = event.metric.name
  local ns = event.metric.tags.pod_namespace or ""
  local pod = event.metric.tags.pod_name or ""
  local newValue = event.metric.counter.value
  local key = ns .. "__" .. pod

  -- FIX: the original indexed LastValue[name] unguarded; a metric name not
  -- registered in init() crashed with an opaque "attempt to index a nil
  -- value" error instead of a readable log line.
  if LastValue[name] == nil then
    emit(generate_log("ERROR unexpected metric name: " .. name, event))
    error()
  end

  if ns == "" then
    emit(generate_log("ERROR empty namespace not allowed", event))
    error()
  end

  if pod == "" then
    emit(generate_log("ERROR empty pod name not allowed", event))
    error()
  end

  if LastValue[name][key] == nil then
    LastValue[name][key] = { value = 0, updatedAt = os.time() }
  end

  local inc = newValue - LastValue[name][key].value

  if inc > 0 then
    emit(generate_metric(name, ns, inc))
  elseif inc < 0 then
    emit(generate_log("ERROR adjusting negative diff inc:" .. inc .. ", old:" .. table_to_json(LastValue[name][key]),
      event))
    -- Counters only go up; a lower value means vector already expired the
    -- series (expire_metrics_secs), so newValue is taken as a fresh
    -- initial value.
    emit(generate_metric(name, ns, newValue))
  end

  -- Vector expires stale series based on last-updated time; the script
  -- keeps its own timestamp so cleanup_inactive_metrics can drop entries
  -- on the same schedule.
  if LastValue[name][key].value ~= newValue then
    LastValue[name][key].value = newValue
    LastValue[name][key].updatedAt = os.time()
  end
end

-- Timer hook body: drop LastValue entries not updated for longer than
-- ExpireMetricSecs, mirroring vector's own metric expiry.
-- (Setting existing keys to nil during pairs() traversal is legal Lua.)
-- The emit parameter is accepted for hook symmetry but unused here.
function cleanup_inactive_metrics(emit)
  local now = os.time()
  for _, pods in pairs(LastValue) do
    for pod, state in pairs(pods) do
      if (now - state.updatedAt) > ExpireMetricSecs then
        pods[pod] = nil
      end
    end
  end
end

-- Build a vector log event whose message is a JSON string carrying the
-- given error message plus a JSON dump of the offending payload.
-- payload defaults to {} when nil.
function generate_log(message, payload)
  local body = table.concat({
    '{"timestamp":"',
    os.date("%Y-%m-%dT%H:%M:%S"),
    '","message":" [metrics.lua] ',
    message,
    '","payload":',
    table_to_json(payload or {}),
    "}",
  })

  return {
    log = {
      message = body,
      timestamp = os.date("!*t"),
    },
  }
end

-- Build an incremental counter event named `name` with the pod_name label
-- removed; only the namespace survives as a tag. The other tags replicate
-- the original kubernetes_logs source so downstream queries keep working.
function generate_metric(name, namespace, value)
  local tags = {
    component_id = "kubernetes_logs",
    component_kind = "source",
    component_type = "kubernetes_logs",
    pod_namespace = namespace,
  }
  return {
    metric = {
      name = name,
      --TODO: change to vector
      namespace = "hh",
      tags = tags,
      kind = "incremental",
      counter = { value = value },
      timestamp = os.date("!*t"),
    },
  }
end

-- Serialize a Lua table into a JSON object string for debug logging.
-- Returns "null" for nil input. All tables (including array-like ones) are
-- rendered as objects keyed by their stringified keys; values of types
-- other than table/number/string/boolean are skipped. Key order follows
-- pairs() and is therefore unspecified.
function table_to_json(t)
  if t == nil then
    return "null"
  end

  -- Escape the characters that would break the surrounding JSON string.
  local function escape(s)
    s = string.gsub(s, "\\", "\\\\")
    s = string.gsub(s, '"', '\\"')
    return s
  end

  local contents = {}
  for key, value in pairs(t) do
    local k = '"' .. escape(tostring(key)) .. '"'
    local vt = type(value)
    if vt == "table" then
      table.insert(contents, k .. ":" .. table_to_json(value))
    elseif vt == "number" then
      table.insert(contents, string.format("%s:%s", k, value))
    elseif vt == "string" then
      -- FIX: quotes/backslashes in values previously produced invalid JSON.
      table.insert(contents, string.format('%s:"%s"', k, escape(value)))
    elseif vt == "boolean" then
      -- FIX: booleans were silently dropped by the original.
      table.insert(contents, k .. ":" .. tostring(value))
    end
  end
  return "{" .. table.concat(contents, ",") .. "}"
end
53 changes: 53 additions & 0 deletions vector/node/namespaced/resources/metrics.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,62 @@
sources:
vector_metrics:
type: internal_metrics

transforms:
# Reroute the metrics that have pod_name labels, so they can be aggregated before being exported
expensive_metrics_router:
type: route
inputs:
- vector_metrics
route:
received_bytes: .tags.component_id == "kubernetes_logs" && .name == "component_received_event_bytes_total"
received_events: .tags.component_id == "kubernetes_logs" && .name == "component_received_events_total"

# Aggregate metrics by removing the pod_name label (pod_namespace is kept as a tag)
expensive_metrics_aggregator:
type: lua
version: "2"
inputs:
- expensive_metrics_router.received_bytes
- expensive_metrics_router.received_events
source: "require('metrics')" # sources the file `metrics.lua`
hooks:
init: init
process: on_event
timers:
- handler: on_timer
interval_seconds: 30

# Split the generated metrics and the potential error logs, since they go to different sinks
expensive_metrics_aggregator_router:
type: route
inputs:
- expensive_metrics_aggregator
route:
logs:
type: is_log
metrics:
type: is_metric

sinks:
prometheus:
type: prometheus_exporter
address: 0.0.0.0:8080
inputs:
- vector_metrics
# - expensive_metrics_router._unmatched
- expensive_metrics_aggregator_router.metrics

expensive_metrics_prometheus:
type: prometheus_exporter
address: 0.0.0.0:8090
inputs:
- expensive_metrics_router.received_bytes
- expensive_metrics_router.received_events

expensive_metrics_aggregator_logger:
type: console
encoding:
codec: text
inputs:
- expensive_metrics_aggregator_router.logs
Loading