From d241e684628a64701e4f8da65cda424bb571c1b5 Mon Sep 17 00:00:00 2001 From: Caio Ramos Casimiro Date: Mon, 16 Sep 2024 16:50:44 +0100 Subject: [PATCH] [WIP] feat(prometheus) wasmx metrics --- .requirements | 2 +- .../kong/prometheus-wasmx-metrics.yml | 3 + kong-3.9.0-0.rockspec | 1 + kong/plugins/prometheus/exporter.lua | 8 +- kong/plugins/prometheus/wasmx.lua | 187 +++++++++++++++++ kong/runloop/wasm.lua | 17 ++ .../26-prometheus/09-wasmx_spec.lua | 193 ++++++++++++++++++ .../response_transformer/src/filter.rs | 86 +++++++- .../proxy_wasm_filters/tests/src/filter.rs | 40 +++- 9 files changed, 532 insertions(+), 5 deletions(-) create mode 100644 changelog/unreleased/kong/prometheus-wasmx-metrics.yml create mode 100644 kong/plugins/prometheus/wasmx.lua create mode 100644 spec/03-plugins/26-prometheus/09-wasmx_spec.lua diff --git a/.requirements b/.requirements index a68a68eca89f..a3bcfe440eb4 100644 --- a/.requirements +++ b/.requirements @@ -22,7 +22,7 @@ ATC_ROUTER=ffd11db657115769bf94f0c4f915f98300bc26b6 # 1.6.2 SNAPPY=23b3286820105438c5dbb9bc22f1bb85c5812c8a # 1.2.0 KONG_MANAGER=nightly -NGX_WASM_MODULE=96b4e27e10c63b07ed40ea88a91c22f23981db35 +NGX_WASM_MODULE=55f0dc2325545e6f571d4dc491874771043895fa WASMER=3.1.1 WASMTIME=23.0.2 V8=12.0.267.17 diff --git a/changelog/unreleased/kong/prometheus-wasmx-metrics.yml b/changelog/unreleased/kong/prometheus-wasmx-metrics.yml new file mode 100644 index 000000000000..422fdb16d535 --- /dev/null +++ b/changelog/unreleased/kong/prometheus-wasmx-metrics.yml @@ -0,0 +1,3 @@ +message: Expose WasmX metrics as part of the Prometheus plugin +type: feature +scope: Plugin diff --git a/kong-3.9.0-0.rockspec b/kong-3.9.0-0.rockspec index cc4a204fee05..24552d39e24f 100644 --- a/kong-3.9.0-0.rockspec +++ b/kong-3.9.0-0.rockspec @@ -532,6 +532,7 @@ build = { ["kong.plugins.prometheus.prometheus"] = "kong/plugins/prometheus/prometheus.lua", ["kong.plugins.prometheus.serve"] = "kong/plugins/prometheus/serve.lua", ["kong.plugins.prometheus.schema"] = "kong/plugins/prometheus/schema.lua", + ["kong.plugins.prometheus.wasmx"] = "kong/plugins/prometheus/wasmx.lua", ["kong.plugins.session.handler"] = "kong/plugins/session/handler.lua", ["kong.plugins.session.schema"] = "kong/plugins/session/schema.lua", diff --git a/kong/plugins/prometheus/exporter.lua b/kong/plugins/prometheus/exporter.lua index bdc5eeafcbce..33486c949e54 100644 --- a/kong/plugins/prometheus/exporter.lua +++ b/kong/plugins/prometheus/exporter.lua @@ -1,11 +1,14 @@ +local balancer = require "kong.runloop.balancer" +local yield = require("kong.tools.yield").yield +local wasm = require "kong.plugins.prometheus.wasmx" + + local kong = kong local ngx = ngx local get_phase = ngx.get_phase local lower = string.lower local ngx_timer_pending_count = ngx.timer.pending_count local ngx_timer_running_count = ngx.timer.running_count -local balancer = require("kong.runloop.balancer") -local yield = require("kong.tools.yield").yield local get_all_upstreams = balancer.get_all_upstreams if not balancer.get_all_upstreams then -- API changed since after Kong 2.5 get_all_upstreams = require("kong.runloop.balancer.upstreams").get_all_upstreams @@ -517,6 +520,7 @@ local function metric_data(write_fn) -- notify the function if prometheus plugin is enabled, -- so that it can avoid exporting unnecessary metrics if not prometheus:metric_data(write_fn, not IS_PROMETHEUS_ENABLED) + wasm.metric_data() end local function collect() diff --git a/kong/plugins/prometheus/wasmx.lua b/kong/plugins/prometheus/wasmx.lua new file mode 100644 index 000000000000..02e79786be90 --- /dev/null +++ b/kong/plugins/prometheus/wasmx.lua @@ -0,0 +1,187 @@ +local buffer = require "string.buffer" +local wasm = require "kong.runloop.wasm" +local wasmx_shm = require "resty.wasmx.shm" + + +local fmt = string.format +local str_find = string.find +local str_match = string.match +local str_sub = string.sub +local table_insert = table.insert +local table_sort = table.sort +local buf_new = buffer.new +local ngx_say = ngx.say + + +local _M = {} + + +local function sorted_iter(ctx) + local v = ctx.t[ctx.keys[ctx.i]] + ctx.i = ctx.i + 1 + + return v +end + + +local function sorted_pairs(t) + local sorted_keys = {} + + for k, _ in pairs(t) do + table_insert(sorted_keys, k) + end + + table_sort(sorted_keys) + + return sorted_iter, { t = t, keys = sorted_keys, i = 1 } +end + + +local function parse_pw_key(key) + local name = key + local labels = {} + local header_size = 3 -- pw. + local first_match = #key + + local second_dot_pos, _ = str_find(key, "%.", header_size + 1) + local filter_name = str_sub(key, header_size + 1, second_dot_pos - 1) + + local filter_config = wasm.filters_by_name[filter_name].config or {} + local patterns = filter_config.pw_metrics + and filter_config.pw_metrics.label_patterns or {} + + for _, pair in ipairs(patterns) do + local lkv, lv = str_match(key, pair.pattern) + if lkv then + local lk = str_sub(lkv, 0, str_find(lkv, "=")) + local lk_start, _ = str_find(key, lk) + + first_match = (lk_start < first_match) and lk_start or first_match + + table_insert(labels, { pair.label, lv }) + end + end + + if first_match ~= #key then + name = str_sub(key, 0, first_match - 1) + end + + return name, labels +end + + +local function parse_key(key) + local header = { pw = "pw." } + + local name = key + local labels = {} + + local is_pw = #key > #header.pw and key:sub(0, #header.pw) == header.pw + + if is_pw then + name, labels = parse_pw_key(key) + end + + name = name:gsub("%.", "_") + + return name, labels +end + + +local function serialize_labels(labels) + local buf = buf_new() + + for _, pair in ipairs(labels) do + buf:put(fmt('%s="%s",', pair[1], pair[2])) + end + + local slabels = buf:get() + + if #slabels > 0 then + return slabels:sub(0, #slabels - 1) -- discard trailing comma + end + + return slabels +end + + +local function serialize_metric(m) + local buf = buf_new() + + buf:put(fmt("# HELP %s\n# TYPE %s %s", m.name, m.name, m.type)) + + for _, pair in ipairs(m.labels) do + local labels = pair[1] + local labeled_m = pair[2] + local slabels = serialize_labels(labels) + + if m.type == "counter" or m.type == "gauge" then + if #slabels > 0 then + buf:put(fmt("\n%s{%s} %s", m.name, slabels, labeled_m.value)) + else + buf:put(fmt("\n%s %s", m.name, labeled_m.value)) + end + + elseif m.type == "histogram" then + local c = 0 + + for _, bin in ipairs(labeled_m.value) do + local ub = (bin.ub ~= 4294967295) and bin.ub or "+Inf" + local ubl = fmt('le="%s"', ub) + local llabels = (#slabels > 0) and (slabels .. "," .. ubl) or ubl + + c = c + bin.count + + buf:put(fmt("\n%s{%s} %s", m.name, llabels, c)) + end + end + end + + buf:put("\n") + + return buf:get() +end + + +_M.metric_data = function() + local i = 0 + local flush_after = 50 + local metrics = {} + local parsed = {} + local buf = buf_new() + + wasmx_shm.metrics:lock() + + for key in wasmx_shm.metrics:iterate_keys() do + table_insert(metrics, { key, wasmx_shm.metrics:get_by_name(key, { prefix = false })}) + end + + wasmx_shm.metrics:unlock() + + -- in wasmx the different labels of a metric are stored as separate metrics + -- aggregate those separate metrics into a single one + for _, pair in ipairs(metrics) do + local key = pair[1] + local m = pair[2] + local name, labels = parse_key(key) + + parsed[name] = parsed[name] or { name = name, type = m.type, labels = {} } + + table_insert(parsed[name].labels, { labels, m }) + end + + for metric_by_label in sorted_pairs(parsed) do + buf:put(serialize_metric(metric_by_label)) + + i = i + 1 + + if i % flush_after == 0 then + ngx_say(buf:get()) + end + end + + ngx_say(buf:get()) +end + + +return _M diff --git a/kong/runloop/wasm.lua b/kong/runloop/wasm.lua index 5833660c6297..74becc6d91a6 100644 --- a/kong/runloop/wasm.lua +++ b/kong/runloop/wasm.lua @@ -402,6 +402,8 @@ local function rebuild_state(db, version, old_state) for _, filter in ipairs(chain.filters) do if filter.enabled then + _M.filters_by_name[filter.name].config = cjson_decode(filter.config) or filter.config + -- Serialize all JSON configurations up front -- -- NOTE: there is a subtle difference between a raw, non-JSON filter @@ -415,6 +417,7 @@ local function rebuild_state(db, version, old_state) if filter.config ~= nil and type(filter.config) ~= "string" then filter.config = cjson_encode(filter.config) end + end end @@ -778,6 +781,13 @@ local function register_property_handlers() return ok, value, const end) + properties.add_getter("kong.route_name", function(_, _, ctx) + local value = ctx.route and ctx.route.name + local ok = value ~= nil + local const = ok + return ok, value, const + end) + properties.add_getter("kong.service.response.status", function(kong) return true, kong.service.response.get_status(), false end) @@ -789,6 +799,13 @@ local function register_property_handlers() return ok, value, const end) + properties.add_getter("kong.service_name", function(_, _, ctx) + local value = ctx.service and ctx.service.name + local ok = value ~= nil + local const = ok + return ok, value, const + end) + properties.add_getter("kong.version", function(kong) return true, kong.version, true end) diff --git a/spec/03-plugins/26-prometheus/09-wasmx_spec.lua b/spec/03-plugins/26-prometheus/09-wasmx_spec.lua new file mode 100644 index 000000000000..d9c28491e440 --- /dev/null +++ b/spec/03-plugins/26-prometheus/09-wasmx_spec.lua @@ -0,0 +1,193 @@ +local helpers = require "spec.helpers" +local cjson = require "cjson" + + +local json_encode = cjson.encode + + +local status_api_port = helpers.get_available_port() +local fixtures = { + dns_mock = helpers.dns_mock.new({ + mocks_only = true + }), + http_mock = {}, + stream_mock = {} +} + +fixtures.dns_mock:A({ + name = "mock.io", + address = "127.0.0.1" +}) + +fixtures.dns_mock:A({ + name = "status.io", + address = "127.0.0.1" +}) + +local rt_config = json_encode({ + append = { + headers = { + "X-Added-Header: true", + }, + }, + pw_metrics = { + label_patterns = { + { label = "service", pattern = "(_s_id=([0-9a-z%-]+))" }, + { label = "route", pattern = "(_r_id=([0-9a-z%-]+))" }, + } + } +}) + + +for _, strategy in helpers.each_strategy() do + describe("Plugin: prometheus (metrics) [#" .. strategy .. "]", function() + local admin_client + local proxy_client + + setup(function() + require("kong.runloop.wasm").enable({ + { name = "tests", + path = helpers.test_conf.wasm_filters_path .. "/tests.wasm", + }, + { name = "response_transformer", + path = helpers.test_conf.wasm_filters_path .. "/response_transformer.wasm", + }, + }) + + local bp = helpers.get_db_utils(strategy, { + "services", + "routes", + "plugins", + "filter_chains", + }) + + local function service_and_route(name, path) + local service = assert(bp.services:insert({ + name = name, + url = helpers.mock_upstream_url, + })) + + local route = assert(bp.routes:insert({ + name = name .. "-route", + service = { id = service.id }, + paths = { path }, + hosts = { name }, + protocols = { "https" }, + })) + + return service, route + end + + local service, _ = service_and_route("mock", "/") + local service2, _ = service_and_route("mock2", "/v2") + service_and_route("status.io", "/metrics") + + local filters = { + { name = "tests", enabled = true, config = "metrics=c1,g1,h1" }, + { name = "response_transformer", enabled = true, config = rt_config }, + } + + assert(bp.filter_chains:insert({ + service = { id = service.id }, + filters = filters, + })) + + assert(bp.filter_chains:insert({ + service = { id = service2.id }, + filters = filters, + })) + + bp.plugins:insert({ + name = "prometheus", + config = { + status_code_metrics = true, + latency_metrics = true, + bandwidth_metrics = true, + upstream_health_metrics = true, + }, + }) + + assert(helpers.start_kong({ + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = true, + plugins = "bundled,prometheus", + status_listen = '127.0.0.1:' .. status_api_port .. ' ssl', -- status api does not support h2 + status_access_log = "logs/status_access.log", + status_error_log = "logs/status_error.log" + }, nil, nil, fixtures)) + end) + + teardown(function() + if admin_client then + admin_client:close() + end + if proxy_client then + proxy_client:close() + end + + helpers.stop_kong() + end) + + before_each(function() + admin_client = helpers.admin_client() + proxy_client = helpers.proxy_ssl_client() + end) + + after_each(function() + if admin_client then + admin_client:close() + end + if proxy_client then + proxy_client:close() + end + end) + + it("expose WasmX metrics by admin API #a1.1", function() + local res = proxy_client:get("/", { + headers = { host = "mock" }, + }) + assert.res_status(200, res) + + res = proxy_client:get("/v2", { + headers = { host = "mock2" }, + }) + assert.res_status(200, res) + + res = assert(admin_client:send{ + method = "GET", + path = "/metrics" + }) + local body = assert.res_status(200, res) + + local expected_c = '# HELP pw_tests_c1\n' + .. '# TYPE pw_tests_c1 counter\n' + .. 'pw_tests_c1 0' + + local expected_g = '# HELP pw_tests_g1\n' + .. '# TYPE pw_tests_g1 gauge\n' + .. 'pw_tests_g1 0' + + local expected_h = '# HELP pw_tests_h1\n' + .. '# TYPE pw_tests_h1 histogram\n' + .. 'pw_tests_h1{le="+Inf"} 0' + + local expected_labeled = '# HELP pw_response_transformer_append\n' + .. '# TYPE pw_response_transformer_append counter\n' + .. 'pw_response_transformer_append{service="mock",route="mock-route"} 1\n' + .. 'pw_response_transformer_append{service="mock2",route="mock2-route"} 1' + + local expected_labeled_histogram = '# HELP pw_response_transformer_processing_time\n' + .. '# TYPE pw_response_transformer_processing_time histogram\n' + .. 'pw_response_transformer_processing_time{service="mock",route="mock-route",le="1"} 1\n' + .. 'pw_response_transformer_processing_time{service="mock",route="mock-route",le="+Inf"} 1\n' + .. 'pw_response_transformer_processing_time{service="mock2",route="mock2-route",le="1"} 1\n' + .. 'pw_response_transformer_processing_time{service="mock2",route="mock2-route",le="+Inf"} 1' + + assert.matches(expected_c, body, nil, true) + assert.matches(expected_g, body, nil, true) + assert.matches(expected_h, body, nil, true) + assert.matches(expected_labeled, body, nil, true) + assert.matches(expected_labeled_histogram, body, nil, true) + end) + end) +end diff --git a/spec/fixtures/proxy_wasm_filters/response_transformer/src/filter.rs b/spec/fixtures/proxy_wasm_filters/response_transformer/src/filter.rs index fb23189b3ee2..b12c6553a503 100644 --- a/spec/fixtures/proxy_wasm_filters/response_transformer/src/filter.rs +++ b/spec/fixtures/proxy_wasm_filters/response_transformer/src/filter.rs @@ -1,7 +1,10 @@ mod types; +use std::collections::HashMap; +use std::cell::RefCell; +use proxy_wasm::hostcalls::{define_metric, increment_metric, record_metric, get_current_time}; use proxy_wasm::traits::{Context, RootContext, HttpContext}; -use proxy_wasm::types::{Action, LogLevel, ContextType}; +use proxy_wasm::types::{Action, LogLevel, ContextType, MetricType}; use crate::types::*; use serde_json; use log::*; @@ -13,12 +16,75 @@ proxy_wasm::main! {{ }); }} +thread_local! { + static METRICS: Metrics = Metrics::new(); +} + +struct Metrics { + metrics: RefCell>, +} + +impl Metrics { + fn new() -> Metrics { + Metrics { + metrics: RefCell::new(HashMap::new()), + } + } + + fn get_counter(&self, name: &str, s_id: &str, r_id: &str) ->u32 { + self.get_metric(MetricType::Counter, name, s_id, r_id) + } + + fn get_histogram(&self, name: &str, s_id: &str, r_id: &str) ->u32 { + self.get_metric(MetricType::Histogram, name, s_id, r_id) + } + + fn get_metric(&self, metric_type: MetricType, name: &str, s_id: &str, r_id: &str) -> u32 { + let key = format!("{}_s_id={}_r_id={}", name, s_id, r_id); + let mut map = self.metrics.borrow_mut(); + + match map.get(&key) { + Some(m_id) => *m_id, + None => { + match define_metric(metric_type, &key) { + Ok(m_id) => { + map.insert(key, m_id); + + m_id + }, + Err(_) => 0 + } + } + } + } +} + struct ResponseTransformerContext { config: Config, } impl ResponseTransformerContext { + fn get_prop(&self, ns: &str, prop: &str) -> String { + if let Some(addr) = self.get_property(vec![ns, prop]) { + match std::str::from_utf8(&addr) { + Ok(value) => value.to_string(), + Err(_) => "".to_string(), + } + } else { + "".to_string() + } + } + + fn increment_metric(&self, name: &str, s_id: &str, r_id: &str) { + let m_id = METRICS.with(|metrics| metrics.get_counter(name, s_id, r_id)); + increment_metric(m_id, 1).unwrap(); + } + + fn record_histogram(&self, name: &str, s_id: &str, r_id: &str, value: u64) { + let m_id = METRICS.with(|metrics| metrics.get_histogram(name, s_id, r_id)); + record_metric(m_id, value).unwrap(); + } } impl RootContext for ResponseTransformerContext { @@ -55,9 +121,15 @@ impl Context for ResponseTransformerContext { impl HttpContext for ResponseTransformerContext { fn on_http_response_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action { + let s_id = self.get_prop("kong", "service_name"); + let r_id = self.get_prop("kong", "route_name"); + let t0 = get_current_time().unwrap(); + self.config.remove.headers.iter().for_each(|name| { info!("[response-transformer] removing header: {}", name); self.set_http_response_header(&name, None); + + self.increment_metric("remove", &s_id, &r_id); }); self.config.rename.headers.iter().for_each(|KeyValuePair(from, to)| { @@ -65,12 +137,15 @@ impl HttpContext for ResponseTransformerContext { let value = self.get_http_response_header(&from); self.set_http_response_header(&from, None); self.set_http_response_header(&to, value.as_deref()); + + self.increment_metric("rename", &s_id, &r_id); }); self.config.replace.headers.iter().for_each(|KeyValuePair(name, value)| { if self.get_http_response_header(&name).is_some() { info!("[response-transformer] updating header {} value to {}", name, value); self.set_http_response_header(&name, Some(&value)); + self.increment_metric("replace", &s_id, &r_id); } }); @@ -78,14 +153,23 @@ impl HttpContext for ResponseTransformerContext { if self.get_http_response_header(&name).is_none() { info!("[response-transformer] adding header {} => {}", name, value); self.set_http_response_header(&name, Some(&value)); + + self.increment_metric("add", &s_id, &r_id); } }); self.config.append.headers.iter().for_each(|KeyValuePair(name, value)| { info!("[response-transformer] appending header {} => {}", name, value); self.add_http_response_header(&name, &value); + + self.increment_metric("append", &s_id, &r_id); }); + let t1 = get_current_time().unwrap(); + let diff = t1.duration_since(t0).unwrap().as_millis(); + + self.record_histogram("processing_time", &s_id, &r_id, diff as u64); + Action::Continue } diff --git a/spec/fixtures/proxy_wasm_filters/tests/src/filter.rs b/spec/fixtures/proxy_wasm_filters/tests/src/filter.rs index 9251987e6966..f9abc5eeb8b4 100644 --- a/spec/fixtures/proxy_wasm_filters/tests/src/filter.rs +++ b/spec/fixtures/proxy_wasm_filters/tests/src/filter.rs @@ -7,20 +7,56 @@ use crate::test_http::*; use crate::types::*; use http::StatusCode; use log::*; +use proxy_wasm::hostcalls::*; use proxy_wasm::traits::*; use proxy_wasm::types::*; +use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; proxy_wasm::main! {{ proxy_wasm::set_log_level(LogLevel::Info); proxy_wasm::set_root_context(|_| -> Box { - Box::new(TestRoot { config: None }) + Box::new(TestRoot { config: None, metrics: HashMap::new() }) }); }} struct TestRoot { config: Option, + metrics: HashMap, +} + +impl TestRoot { + fn get_config(&self, name: &str) -> Option<&str> { + match &self.config { + Some(config) => config.map.get(name).map(|s| s.as_str()), + None => None, + } + } + + fn define_metrics(&mut self) { + let config = self.get_config("metrics").map_or("c1,g1,h1".to_string(), |x| x.to_string()); + + for metric in config.split(",") { + let metric_char = metric.chars().nth(0).unwrap(); + let metric_type = match metric_char { + 'c' => MetricType::Counter, + 'g' => MetricType::Gauge, + 'h' => MetricType::Histogram, + _ => panic!("unexpected metric type"), + }; + let n = metric[1..].parse::().expect("bad metrics value"); + + for i in 1..(n + 1) { + let name = format!("{}{}", metric_char, i); + let m_id = define_metric(metric_type, &name).expect("cannot define new metric"); + + info!("defined metric {} as {:?}", &name, m_id); + + self.metrics.insert(name, m_id); + } + } + } } impl Context for TestRoot {} @@ -44,6 +80,8 @@ impl RootContext for TestRoot { self.set_tick_period(Duration::from_millis(ms)); } + + self.define_metrics(); } true