From 60d221ea8d15e3d382baac6aaa5d2a482426159a Mon Sep 17 00:00:00 2001 From: Hector Huertas Date: Thu, 5 Dec 2024 11:17:53 +0100 Subject: [PATCH 01/12] Aggregate vector metrics that expose a pod_name label --- vector/node/namespaced/kustomization.yaml | 1 + vector/node/namespaced/resources/metrics.lua | 137 ++++++++++++++++++ vector/node/namespaced/resources/metrics.yaml | 48 +++++- 3 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 vector/node/namespaced/resources/metrics.lua diff --git a/vector/node/namespaced/kustomization.yaml b/vector/node/namespaced/kustomization.yaml index 1e93dd34..ffd2c4a4 100644 --- a/vector/node/namespaced/kustomization.yaml +++ b/vector/node/namespaced/kustomization.yaml @@ -11,3 +11,4 @@ configMapGenerator: - metrics.yaml=resources/metrics.yaml - pods.yaml=resources/pods.yaml - systemd.yaml=resources/systemd.yaml + - resources/metrics.lua diff --git a/vector/node/namespaced/resources/metrics.lua b/vector/node/namespaced/resources/metrics.lua new file mode 100644 index 00000000..5492971d --- /dev/null +++ b/vector/node/namespaced/resources/metrics.lua @@ -0,0 +1,137 @@ +function init() + -- Initialize the global Data table + Data = { + inactive = { + component_received_events_total = 0, + component_received_event_bytes_total = 0, + }, + active = { + component_received_events_total = {}, + component_received_event_bytes_total = {}, + }, + } +end + +function on_event(event, emit) + if not pcall(upsert_metric, event) then + emit(generate_log("ERROR on upsert_metric", event)) + error() -- delegates on vector generating and increasing the error metric + end +end + +function on_timer(emit) + if not pcall(emit_metrics, emit) then + emit(generate_log("ERROR on emit_metrics", Data)) + error() -- delegates on vector generating and increasing the error metric + end +end + +function upsert_metric(event) + -- ensure that we don't mess with custom kube sources like "kubernetes_events" + if event.metric.tags.component_id ~= "kubernetes_logs" then + error() + end + + local name = event.metric.name + local ns = event.metric.tags.pod_namespace + local pod = event.metric.tags.pod_name + local value = event.metric.counter.value + + -- ensure that the metric type hasn't changed + if event.metric.kind ~= "absolute" then + error() + end + + Data["active"][name][ns .. "__" .. pod] = value +end + +function emit_metrics(emit) + cleanup_inactive_pods() + emit(generate_metric("component_received_events_total")) + emit(generate_metric("component_received_event_bytes_total")) +end + +function cleanup_inactive_pods() + active = active_pods() + + for metric, pods in pairs(Data.active) do + for pod, value in pairs(pods) do + if not active[pod] then + Data["inactive"][metric] = Data["inactive"][metric] + value + Data["active"][metric][pod] = nil + end + end + end +end + +function active_pods() + local ls_handle = io.popen("ls /var/log/containers") + local containers + if ls_handle then + containers = ls_handle:read("*a") + ls_handle:close() + end + + local pods = {} + for container in string.gmatch(containers, "[^\n]+") do + local pod, ns = string.match(container, "([^_]+)_([^_]+)") + local id = ns .. "__" .. pod + pods[id] = true + end + return pods +end + +function generate_log(message, payload) + local json = '{"timestamp":"' + .. os.date("%Y-%m-%dT%H:%M:%S") + .. '","message":"' + .. message + .. '","payload":' + .. table_to_json(payload) + .. "}" + + return { + log = { + message = json, + timestamp = os.date("!*t"), + }, + } +end + +function generate_metric(name) + local active = 0 + for _, value in pairs(Data["active"][name]) do + active = active + tostring(value) + end + local total = active + Data["inactive"][name] + return { + metric = { + name = name, + namespace = "vector", + tags = { + component_id = "kubernetes_logs", + component_kind = "source", + component_type = "kubernetes_logs", + }, + kind = "absolute", + counter = { + value = total, + }, + timestamp = os.date("!*t"), + }, + } +end + +function table_to_json(t) + local contents = {} + for key, value in pairs(t) do + if type(value) == "table" then + table.insert(contents, '"' .. key .. '"' .. ":" .. table_to_json(value)) + elseif "number" == type(value) then + table.insert(contents, string.format('"%s":%s', key, value)) + elseif "string" == type(value) then + table.insert(contents, string.format('"%s":"%s"', key, value)) + end + end + return "{" .. table.concat(contents, ",") .. "}" +end diff --git a/vector/node/namespaced/resources/metrics.yaml b/vector/node/namespaced/resources/metrics.yaml index f31162f7..52daa7eb 100644 --- a/vector/node/namespaced/resources/metrics.yaml +++ b/vector/node/namespaced/resources/metrics.yaml @@ -1,9 +1,55 @@ sources: vector_metrics: type: internal_metrics + +transforms: + # Reroute the metrics that have pod_name labels, so they can be aggregated before being exported + expensive_metrics_router: + type: route + inputs: + - vector_metrics + route: + received_bytes: .tags.component_type == "kubernetes_logs" && .name == "component_received_event_bytes_total" + received_events: .tags.component_type == "kubernetes_logs" && .name == "component_received_events_total" + + # Agreggate metrics by removing pod_name and pod_namespace labels + expensive_metrics_aggregator: + type: lua + version: "2" + inputs: + - expensive_metrics_router.received_bytes + - expensive_metrics_router.received_events + source: "require('metrics')" # sources the file `metrics.lua` + hooks: + init: init + process: on_event + shutdown: on_timer + timers: + - handler: on_timer + interval_seconds: 5 + + # Split the generated metrics and the potential error logs, since they go to different sinks + expensive_metrics_aggregator_router: + type: route + inputs: + - expensive_metrics_aggregator + route: + logs: + type: is_log + metrics: + type: is_metric + sinks: prometheus: type: prometheus_exporter address: 0.0.0.0:8080 inputs: - - vector_metrics + - expensive_metrics_router._unmatched + - expensive_metrics_aggregator_router.metrics + + expensive_metrics_aggregator_logger: + type: console + encoding: + codec: text + inputs: + - expensive_metrics_aggregator_router.logs From 460c1b9de702842f440820edc915ca9a934abc2f Mon Sep 17 00:00:00 2001 From: Hector Huertas Date: Fri, 13 Dec 2024 08:47:46 +0100 Subject: [PATCH 02/12] Emit increments instead of absolutes --- vector/node/namespaced/resources/metrics.lua | 55 ++++++++------------ 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/vector/node/namespaced/resources/metrics.lua b/vector/node/namespaced/resources/metrics.lua index 5492971d..8a1923cc 100644 --- a/vector/node/namespaced/resources/metrics.lua +++ b/vector/node/namespaced/resources/metrics.lua @@ -1,32 +1,26 @@ function init() - -- Initialize the global Data table - Data = { - inactive = { - component_received_events_total = 0, - component_received_event_bytes_total = 0, - }, - active = { - component_received_events_total = {}, - component_received_event_bytes_total = {}, - }, + -- Initialize the global LastValue table + LastValue = { + component_received_events_total = {}, + component_received_event_bytes_total = {}, } end function on_event(event, emit) - if not pcall(upsert_metric, event) then - emit(generate_log("ERROR on upsert_metric", event)) + if not pcall(process_event, event, emit) then + emit(generate_log("ERROR on process_event", event)) error() -- delegates on vector generating and increasing the error metric end end function on_timer(emit) - if not pcall(emit_metrics, emit) then - emit(generate_log("ERROR on emit_metrics", Data)) + if not pcall(cleanup_inactive_pods) then + emit(generate_log("ERROR on cleanup_inactive_pods", LastValue)) error() -- delegates on vector generating and increasing the error metric end end -function upsert_metric(event) +function process_event(event, emit) -- ensure that we don't mess with custom kube sources like "kubernetes_events" if event.metric.tags.component_id ~= "kubernetes_logs" then error() @@ -42,23 +36,21 @@ function upsert_metric(event) error() end - Data["active"][name][ns .. "__" .. pod] = value -end + local last_value = LastValue[name][ns .. "__" .. pod] or 0 + local inc = value - last_value -function emit_metrics(emit) - cleanup_inactive_pods() - emit(generate_metric("component_received_events_total")) - emit(generate_metric("component_received_event_bytes_total")) + emit(generate_metric(name, ns, inc)) + + LastValue[name][ns .. "__" .. pod] = value end function cleanup_inactive_pods() active = active_pods() - for metric, pods in pairs(Data.active) do - for pod, value in pairs(pods) do + for metric, pods in pairs(LastValue) do + for pod, _ in pairs(pods) do if not active[pod] then - Data["inactive"][metric] = Data["inactive"][metric] + value - Data["active"][metric][pod] = nil + LastValue[metric][pod] = nil end end end @@ -82,6 +74,7 @@ function active_pods() end function generate_log(message, payload) + payload = payload or {} local json = '{"timestamp":"' .. os.date("%Y-%m-%dT%H:%M:%S") .. '","message":"' @@ -98,12 +91,7 @@ function generate_log(message, payload) } end -function generate_metric(name) - local active = 0 - for _, value in pairs(Data["active"][name]) do - active = active + tostring(value) - end - local total = active + Data["inactive"][name] +function generate_metric(name, namespace, value) return { metric = { name = name, @@ -112,10 +100,11 @@ function generate_metric(name) component_id = "kubernetes_logs", component_kind = "source", component_type = "kubernetes_logs", + pod_namespace = namespace, }, - kind = "absolute", + kind = "incremental", counter = { - value = total, + value = value, }, timestamp = os.date("!*t"), }, From 95a83540c7b6e18ff6baeae6e9ac3819334375e9 Mon Sep 17 00:00:00 2001 From: Hector Huertas Date: Fri, 13 Dec 2024 08:52:17 +0100 Subject: [PATCH 03/12] Remove (broken) metrics expiration --- vector/node/namespaced/resources/global.yaml | 1 - 1 file changed, 1 deletion(-) delete mode 100644 vector/node/namespaced/resources/global.yaml diff --git a/vector/node/namespaced/resources/global.yaml b/vector/node/namespaced/resources/global.yaml deleted file mode 100644 index 316097c9..00000000 --- a/vector/node/namespaced/resources/global.yaml +++ /dev/null @@ -1 +0,0 @@ -expire_metrics_secs: 600 From 1ab2a02a09fa9f445e61c39833f5b4c899ecd9ce Mon Sep 17 00:00:00 2001 From: Hector Huertas Date: Fri, 13 Dec 2024 09:18:04 +0100 Subject: [PATCH 04/12] Run on_timer every minute Because now it only does the cleanup --- vector/node/namespaced/resources/metrics.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vector/node/namespaced/resources/metrics.yaml b/vector/node/namespaced/resources/metrics.yaml index 52daa7eb..41a29412 100644 --- a/vector/node/namespaced/resources/metrics.yaml +++ b/vector/node/namespaced/resources/metrics.yaml @@ -26,7 +26,7 @@ transforms: shutdown: on_timer timers: - handler: on_timer - interval_seconds: 5 + interval_seconds: 60 # Split the generated metrics and the potential error logs, since they go to different sinks expensive_metrics_aggregator_router: From 6e15afb855e2486dc47ab011a5dbb59d49825e9c Mon Sep 17 00:00:00 2001 From: Hector Huertas Date: Fri, 13 Dec 2024 16:05:32 +0100 Subject: [PATCH 05/12] Filter by component_id and skip the check --- vector/node/namespaced/resources/metrics.lua | 5 ----- vector/node/namespaced/resources/metrics.yaml | 4 ++-- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/vector/node/namespaced/resources/metrics.lua b/vector/node/namespaced/resources/metrics.lua index 8a1923cc..eb74824c 100644 --- a/vector/node/namespaced/resources/metrics.lua +++ b/vector/node/namespaced/resources/metrics.lua @@ -21,11 +21,6 @@ function on_timer(emit) end function process_event(event, emit) - -- ensure that we don't mess with custom kube sources like "kubernetes_events" - if event.metric.tags.component_id ~= "kubernetes_logs" then - error() - end - local name = event.metric.name local ns = event.metric.tags.pod_namespace local pod = event.metric.tags.pod_name diff --git a/vector/node/namespaced/resources/metrics.yaml b/vector/node/namespaced/resources/metrics.yaml index 41a29412..8d43d9bd 100644 --- a/vector/node/namespaced/resources/metrics.yaml +++ b/vector/node/namespaced/resources/metrics.yaml @@ -9,8 +9,8 @@ transforms: inputs: - vector_metrics route: - received_bytes: .tags.component_type == "kubernetes_logs" && .name == "component_received_event_bytes_total" - received_events: .tags.component_type == "kubernetes_logs" && .name == "component_received_events_total" + received_bytes: .tags.component_id == "kubernetes_logs" && .name == "component_received_event_bytes_total" + received_events: .tags.component_id == "kubernetes_logs" && .name == "component_received_events_total" # Agreggate metrics by removing pod_name and pod_namespace labels expensive_metrics_aggregator: From 416e5751fad36bab550eef27d0ffa1ca25a6f0c4 Mon Sep 17 00:00:00 2001 From: Hector Huertas Date: Mon, 16 Dec 2024 15:51:09 +0100 Subject: [PATCH 06/12] Commit the temporary test changes --- vector/node/namespaced/resources/metrics.lua | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/vector/node/namespaced/resources/metrics.lua b/vector/node/namespaced/resources/metrics.lua index eb74824c..02a123ab 100644 --- a/vector/node/namespaced/resources/metrics.lua +++ b/vector/node/namespaced/resources/metrics.lua @@ -7,6 +7,8 @@ function init() end function on_event(event, emit) + --TODO: remove + emit(event) if not pcall(process_event, event, emit) then emit(generate_log("ERROR on process_event", event)) error() -- delegates on vector generating and increasing the error metric @@ -90,7 +92,8 @@ function generate_metric(name, namespace, value) return { metric = { name = name, - namespace = "vector", + --TODO: change to vector + namespace = "hh", tags = { component_id = "kubernetes_logs", component_kind = "source", From 068677e4b7fd86c6d7c83bcb5b472463613ad1da Mon Sep 17 00:00:00 2001 From: Ashok Siyani Date: Mon, 23 Dec 2024 11:37:36 +0000 Subject: [PATCH 07/12] Revert "Remove (broken) metrics expiration" This reverts commit 95a83540c7b6e18ff6baeae6e9ac3819334375e9. --- vector/node/namespaced/resources/global.yaml | 1 + 1 file changed, 1 insertion(+) create mode 100644 vector/node/namespaced/resources/global.yaml diff --git a/vector/node/namespaced/resources/global.yaml b/vector/node/namespaced/resources/global.yaml new file mode 100644 index 00000000..316097c9 --- /dev/null +++ b/vector/node/namespaced/resources/global.yaml @@ -0,0 +1 @@ +expire_metrics_secs: 600 From a7457c6c9ebd8ea79d3afb07f8ac35a3d1269550 Mon Sep 17 00:00:00 2001 From: Ashok Siyani Date: Mon, 23 Dec 2024 11:44:21 +0000 Subject: [PATCH 08/12] fix global exp config --- vector/node/namespaced/kustomization.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/vector/node/namespaced/kustomization.yaml b/vector/node/namespaced/kustomization.yaml index ffd2c4a4..21505d87 100644 --- a/vector/node/namespaced/kustomization.yaml +++ b/vector/node/namespaced/kustomization.yaml @@ -12,3 +12,4 @@ configMapGenerator: - pods.yaml=resources/pods.yaml - systemd.yaml=resources/systemd.yaml - resources/metrics.lua + - resources/global.yaml From 030204e02f2d8de8787dd32d354dca0a5fa2543f Mon Sep 17 00:00:00 2001 From: Ashok Siyani Date: Mon, 23 Dec 2024 11:44:36 +0000 Subject: [PATCH 09/12] add debug log --- vector/node/namespaced/resources/metrics.lua | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vector/node/namespaced/resources/metrics.lua b/vector/node/namespaced/resources/metrics.lua index 02a123ab..8e22bc5e 100644 --- a/vector/node/namespaced/resources/metrics.lua +++ b/vector/node/namespaced/resources/metrics.lua @@ -33,6 +33,10 @@ function process_event(event, emit) error() end + if ns == "labs" then + emit(generate_log("ERROR received labs event", event)) + end + local last_value = LastValue[name][ns .. "__" .. pod] or 0 local inc = value - last_value From 1e5ca217b0e5e74e5845f72e0aaa2e83bc03fb9c Mon Sep 17 00:00:00 2001 From: Ashok Siyani Date: Mon, 23 Dec 2024 12:15:43 +0000 Subject: [PATCH 10/12] set corret value --- vector/node/namespaced/resources/metrics.yaml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vector/node/namespaced/resources/metrics.yaml b/vector/node/namespaced/resources/metrics.yaml index 8d43d9bd..acbe8d45 100644 --- a/vector/node/namespaced/resources/metrics.yaml +++ b/vector/node/namespaced/resources/metrics.yaml @@ -23,10 +23,12 @@ transforms: hooks: init: init process: on_event - shutdown: on_timer timers: - handler: on_timer - interval_seconds: 60 + # on_timer func cleans up inactive pod's metrics + # since vector by default keeps these metrics, global config flag `expire_metrics_secs` must be set + # and 'on_timer' interval should be higher then `expire_metrics_secs` + interval_seconds: 900 # Split the generated metrics and the potential error logs, since they go to different sinks expensive_metrics_aggregator_router: From 6cc404252a42d87d494cce01f520b8092598019e Mon Sep 17 00:00:00 2001 From: Ashok Siyani Date: Mon, 23 Dec 2024 13:55:29 +0000 Subject: [PATCH 11/12] expire metrics based on last udpated time --- vector/node/namespaced/resources/metrics.lua | 54 +++++++++++++++---- vector/node/namespaced/resources/metrics.yaml | 5 +- 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/vector/node/namespaced/resources/metrics.lua b/vector/node/namespaced/resources/metrics.lua index 8e22bc5e..5f904a77 100644 --- a/vector/node/namespaced/resources/metrics.lua +++ b/vector/node/namespaced/resources/metrics.lua @@ -4,6 +4,10 @@ function init() component_received_events_total = {}, component_received_event_bytes_total = {}, } + -- since vector by default keeps these metrics, global config flag ` + -- expire_metrics_secs` must be set + -- this interval should be higher then `expire_metrics_secs` + ExpireMetricSecs = 900 end function on_event(event, emit) @@ -16,33 +20,61 @@ function on_event(event, emit) end function on_timer(emit) - if not pcall(cleanup_inactive_pods) then - emit(generate_log("ERROR on cleanup_inactive_pods", LastValue)) + if not pcall(cleanup_inactive_metrics) then + emit(generate_log("ERROR on cleanup_inactive_metrics", LastValue)) error() -- delegates on vector generating and increasing the error metric end end function process_event(event, emit) - local name = event.metric.name - local ns = event.metric.tags.pod_namespace - local pod = event.metric.tags.pod_name - local value = event.metric.counter.value - -- ensure that the metric type hasn't changed if event.metric.kind ~= "absolute" then error() end + + local name = event.metric.name + local ns = event.metric.tags.pod_namespace + local pod = event.metric.tags.pod_name + local newValue = event.metric.counter.value + local key = ns .. "__" .. pod + if ns == "labs" then emit(generate_log("ERROR received labs event", event)) end - local last_value = LastValue[name][ns .. "__" .. pod] or 0 - local inc = value - last_value + if LastValue[name][key] == nil then + LastValue[name][key] = { value = 0, updatedAt = os.time() } + end + + local inc = newValue - LastValue[name][key].value - emit(generate_metric(name, ns, inc)) + if inc > 0 then + emit(generate_metric(name, ns, inc)) + elseif inc < 0 then + emit(generate_log("ERROR negative inc value inc:" .. inc .. ", oldValue:" .. LastValue[name][key].value, event)) + end + + -- since vector by default persists inactive metrics, global config flag + -- `expire_metrics_secs` must be set to expire stale metrics. + -- since vector will remove these metrics based on last updated time + -- script needs to maintain its own timestamp for clean up + if LastValue[name][key].value ~= newValue then + LastValue[name][key].value = newValue + LastValue[name][key].updatedAt = os.time() + end +end + +function cleanup_inactive_metrics() + local currentTime = os.time() - LastValue[name][ns .. "__" .. pod] = value + for metric, pods in pairs(LastValue) do + for pod, _ in pairs(pods) do + if (currentTime - pod.updatedAt) > ExpireMetricSecs then + LastValue[metric][pod] = nil + end + end + end end function cleanup_inactive_pods() diff --git a/vector/node/namespaced/resources/metrics.yaml b/vector/node/namespaced/resources/metrics.yaml index acbe8d45..18d53546 100644 --- a/vector/node/namespaced/resources/metrics.yaml +++ b/vector/node/namespaced/resources/metrics.yaml @@ -25,10 +25,7 @@ transforms: process: on_event timers: - handler: on_timer - # on_timer func cleans up inactive pod's metrics - # since vector by default keeps these metrics, global config flag `expire_metrics_secs` must be set - # and 'on_timer' interval should be higher then `expire_metrics_secs` - interval_seconds: 900 + interval_seconds: 300 # Split the generated metrics and the potential error logs, since they go to different sinks expensive_metrics_aggregator_router: From 030308eee74a69f5e8e520b7b04d8546726bf601 Mon Sep 17 00:00:00 2001 From: Ashok Siyani Date: Fri, 27 Dec 2024 12:54:07 +0000 Subject: [PATCH 12/12] create prom exporter for exp metrics --- vector/node/namespaced/daemonset.yaml | 6 +- vector/node/namespaced/resources/metrics.lua | 77 ++++++++----------- vector/node/namespaced/resources/metrics.yaml | 12 ++- 3 files changed, 47 insertions(+), 48 deletions(-) diff --git a/vector/node/namespaced/daemonset.yaml b/vector/node/namespaced/daemonset.yaml index 8e5e665b..a589b546 100644 --- a/vector/node/namespaced/daemonset.yaml +++ b/vector/node/namespaced/daemonset.yaml @@ -17,8 +17,6 @@ spec: app.kubernetes.io/instance: vector-node annotations: prometheus.io/scrape: "true" - prometheus.io/path: /metrics - prometheus.io/port: "8080" spec: tolerations: - key: node-role.kubernetes.io/master @@ -56,6 +54,10 @@ spec: - name: var-log mountPath: /var/log readOnly: true + ports: + - name: metrics + containerPort: 8080 + protocol: TCP resources: requests: cpu: 0m diff --git a/vector/node/namespaced/resources/metrics.lua b/vector/node/namespaced/resources/metrics.lua index 5f904a77..85922861 100644 --- a/vector/node/namespaced/resources/metrics.lua +++ b/vector/node/namespaced/resources/metrics.lua @@ -7,21 +7,24 @@ function init() -- since vector by default keeps these metrics, global config flag ` -- expire_metrics_secs` must be set -- this interval should be higher then `expire_metrics_secs` - ExpireMetricSecs = 900 + ExpireMetricSecs = 600 end function on_event(event, emit) --TODO: remove - emit(event) - if not pcall(process_event, event, emit) then - emit(generate_log("ERROR on process_event", event)) + -- emit(event) + + local status, err = pcall(process_event, event, emit) + if not status then + emit(generate_log("ERROR on process_event" .. err, event)) error() -- delegates on vector generating and increasing the error metric end end function on_timer(emit) - if not pcall(cleanup_inactive_metrics) then - emit(generate_log("ERROR on cleanup_inactive_metrics", LastValue)) + local status, err = pcall(cleanup_inactive_metrics, emit) + if not status then + emit(generate_log("ERROR on cleanup_inactive_metrics" .. err, LastValue)) error() -- delegates on vector generating and increasing the error metric end end @@ -29,18 +32,24 @@ end function process_event(event, emit) -- ensure that the metric type hasn't changed if event.metric.kind ~= "absolute" then + emit(generate_log("ERROR only absolute events can be aggregated", event)) error() end - local name = event.metric.name - local ns = event.metric.tags.pod_namespace - local pod = event.metric.tags.pod_name + local ns = event.metric.tags.pod_namespace or "" + local pod = event.metric.tags.pod_name or "" local newValue = event.metric.counter.value local key = ns .. "__" .. pod - if ns == "labs" then - emit(generate_log("ERROR received labs event", event)) + if ns == "" then + emit(generate_log("ERROR empty namespace not allowed", event)) + error() + end + + if pod == "" then + emit(generate_log("ERROR empty pod name not allowed", event)) + error() end if LastValue[name][key] == nil then @@ -52,7 +61,12 @@ function process_event(event, emit) if inc > 0 then emit(generate_metric(name, ns, inc)) elseif inc < 0 then - emit(generate_log("ERROR negative inc value inc:" .. inc .. ", oldValue:" .. LastValue[name][key].value, event)) + emit(generate_log("ERROR adjusting negative diff inc:" .. inc .. ", old:" .. table_to_json(LastValue[name][key]), + event)) + -- since metrics are counters if new value is < old value then we can + -- assume metrics has been expired on vector end. + -- hence we can take newValue as "new" initial value + emit(generate_metric(name, ns, newValue)) end -- since vector by default persists inactive metrics, global config flag @@ -65,52 +79,23 @@ function process_event(event, emit) end end -function cleanup_inactive_metrics() +function cleanup_inactive_metrics(emit) local currentTime = os.time() for metric, pods in pairs(LastValue) do for pod, _ in pairs(pods) do - if (currentTime - pod.updatedAt) > ExpireMetricSecs then - LastValue[metric][pod] = nil - end - end - end -end - -function cleanup_inactive_pods() - active = active_pods() - - for metric, pods in pairs(LastValue) do - for pod, _ in pairs(pods) do - if not active[pod] then + if (currentTime - LastValue[metric][pod].updatedAt) > ExpireMetricSecs then LastValue[metric][pod] = nil end end end end -function active_pods() - local ls_handle = io.popen("ls /var/log/containers") - local containers - if ls_handle then - containers = ls_handle:read("*a") - ls_handle:close() - end - - local pods = {} - for container in string.gmatch(containers, "[^\n]+") do - local pod, ns = string.match(container, "([^_]+)_([^_]+)") - local id = ns .. "__" .. pod - pods[id] = true - end - return pods -end - function generate_log(message, payload) payload = payload or {} local json = '{"timestamp":"' .. os.date("%Y-%m-%dT%H:%M:%S") - .. '","message":"' + .. '","message":" [metrics.lua] ' .. message .. '","payload":' .. table_to_json(payload) @@ -146,6 +131,10 @@ function generate_metric(name, namespace, value) end function table_to_json(t) + if t == nil then + return "null" + end + local contents = {} for key, value in pairs(t) do if type(value) == "table" then diff --git a/vector/node/namespaced/resources/metrics.yaml b/vector/node/namespaced/resources/metrics.yaml index 18d53546..9932cb22 100644 --- a/vector/node/namespaced/resources/metrics.yaml +++ b/vector/node/namespaced/resources/metrics.yaml @@ -25,7 +25,7 @@ transforms: process: on_event timers: - handler: on_timer - interval_seconds: 300 + interval_seconds: 30 # Split the generated metrics and the potential error logs, since they go to different sinks expensive_metrics_aggregator_router: @@ -43,9 +43,17 @@ sinks: type: prometheus_exporter address: 0.0.0.0:8080 inputs: - - expensive_metrics_router._unmatched + - vector_metrics + # - expensive_metrics_router._unmatched - expensive_metrics_aggregator_router.metrics + expensive_metrics_prometheus: + type: prometheus_exporter + address: 0.0.0.0:8090 + inputs: + - expensive_metrics_router.received_bytes + - expensive_metrics_router.received_events + expensive_metrics_aggregator_logger: type: console encoding: