From 5158928b9e3c6726e50675a374d9c3b37bbaddfb Mon Sep 17 00:00:00 2001 From: Ilya Mashchenko Date: Tue, 24 Dec 2024 13:29:21 +0200 Subject: [PATCH] improvement(go.d/nats): add leafz metrics (#19282) --- src/go/plugin/go.d/collector/nats/cache.go | 78 +++++++++---- src/go/plugin/go.d/collector/nats/charts.go | 109 ++++++++++++++++++ src/go/plugin/go.d/collector/nats/collect.go | 40 ++++++- .../go.d/collector/nats/collector_test.go | 13 ++- .../plugin/go.d/collector/nats/metadata.yaml | 38 ++++++ src/go/plugin/go.d/collector/nats/restapi.go | 21 ++++ .../nats/testdata/v2.10.24/leafz.json | 21 ++++ 7 files changed, 289 insertions(+), 31 deletions(-) create mode 100644 src/go/plugin/go.d/collector/nats/testdata/v2.10.24/leafz.json diff --git a/src/go/plugin/go.d/collector/nats/cache.go b/src/go/plugin/go.d/collector/nats/cache.go index 93ddbdef3ab07c..96c6ed048cd19a 100644 --- a/src/go/plugin/go.d/collector/nats/cache.go +++ b/src/go/plugin/go.d/collector/nats/cache.go @@ -2,12 +2,17 @@ package nats +import ( + "fmt" +) + func newCache() *cache { return &cache{ accounts: make(accCache), routes: make(routeCache), inGateways: make(gwCache), outGateways: make(gwCache), + leafs: make(leafCache), } } @@ -16,6 +21,7 @@ type cache struct { routes routeCache inGateways gwCache outGateways gwCache + leafs leafCache } func (c *cache) resetUpdated() { @@ -36,17 +42,18 @@ func (c *cache) resetUpdated() { type ( accCache map[string]*accCacheEntry accCacheEntry struct { - accName string hasCharts bool updated bool + + accName string } ) -func (c *accCache) put(name string) { - acc, ok := (*c)[name] +func (c *accCache) put(ai accountInfo) { + acc, ok := (*c)[ai.Account] if !ok { - acc = &accCacheEntry{accName: name} - (*c)[name] = acc + acc = &accCacheEntry{accName: ai.Account} + (*c)[ai.Account] = acc } acc.updated = true } @@ -54,18 +61,19 @@ func (c *accCache) put(name string) { type ( routeCache map[uint64]*routeCacheEntry routeCacheEntry struct { - rid uint64 - remoteId string 
hasCharts bool updated bool + + rid uint64 + remoteId string } ) -func (c *routeCache) put(rid uint64, remoteId string) { - route, ok := (*c)[rid] +func (c *routeCache) put(ri routeInfo) { + route, ok := (*c)[ri.Rid] if !ok { - route = &routeCacheEntry{rid: rid, remoteId: remoteId} - (*c)[rid] = route + route = &routeCacheEntry{rid: ri.Rid, remoteId: ri.RemoteID} + (*c)[ri.Rid] = route } route.updated = true } @@ -73,36 +81,58 @@ func (c *routeCache) put(rid uint64, remoteId string) { type ( gwCache map[string]*gwCacheEntry gwCacheEntry struct { - gwName string - rgwName string hasCharts bool updated bool - conns map[uint64]*gwConnCacheEntry + + gwName string + rgwName string + conns map[uint64]*gwConnCacheEntry } gwConnCacheEntry struct { - gwName string - rgwName string - cid uint64 hasCharts bool updated bool + + gwName string + rgwName string + cid uint64 } ) -func (c *gwCache) put(gwName, rgwName string) { +func (c *gwCache) put(gwName, rgwName string, rgi *remoteGatewayInfo) { gw, ok := (*c)[gwName] if !ok { gw = &gwCacheEntry{gwName: gwName, rgwName: rgwName, conns: make(map[uint64]*gwConnCacheEntry)} (*c)[gwName] = gw } gw.updated = true -} -func (c *gwCache) putConn(gwName, rgwName string, cid uint64) { - c.put(gwName, rgwName) - conn, ok := (*c)[gwName].conns[cid] + conn, ok := gw.conns[rgi.Connection.Cid] if !ok { - conn = &gwConnCacheEntry{gwName: gwName, rgwName: rgwName, cid: cid} - (*c)[gwName].conns[cid] = conn + conn = &gwConnCacheEntry{gwName: gwName, rgwName: rgwName, cid: rgi.Connection.Cid} + gw.conns[rgi.Connection.Cid] = conn } conn.updated = true } + +type ( + leafCache map[string]*leafCacheEntry + leafCacheEntry struct { + hasCharts bool + updated bool + + leafName string + account string + ip string + port int + } +) + +func (c *leafCache) put(li leafInfo) { + key := fmt.Sprintf("%s_%s_%s_%d", li.Name, li.Account, li.IP, li.Port) + leaf, ok := (*c)[key] + if !ok { + leaf = &leafCacheEntry{leafName: li.Name, account: li.Account, ip: li.IP, 
port: li.Port} + (*c)[key] = leaf + } + leaf.updated = true +} diff --git a/src/go/plugin/go.d/collector/nats/charts.go b/src/go/plugin/go.d/collector/nats/charts.go index 50767f1cea285c..5772dab90377c1 100644 --- a/src/go/plugin/go.d/collector/nats/charts.go +++ b/src/go/plugin/go.d/collector/nats/charts.go @@ -41,6 +41,11 @@ const ( prioGatewayConnMessages prioGatewayConnSubscriptions prioGatewayConnUptime + + prioLeafConnTraffic + prioLeafConnMessages + prioLeafConnSubscriptions + prioLeafRTT ) var serverCharts = func() module.Charts { @@ -391,6 +396,65 @@ var ( } ) +var leafConnChartsTmpl = module.Charts{ + leafConnTrafficTmpl.Copy(), + leafConnMessagesTmpl.Copy(), + leafConnSubscriptionsTmpl.Copy(), + leafConnRTT.Copy(), +} + +var ( + leafConnTrafficTmpl = module.Chart{ + ID: "leaf_node_conn_%s_%s_%s_%d_traffic", + Title: "Leaf Node Connection Traffic", + Units: "bytes/s", + Fam: "leaf traffic", + Ctx: "nats.leaf_node_conn_traffic", + Priority: prioLeafConnTraffic, + Type: module.Area, + Dims: module.Dims{ + {ID: "leafz_leaf_%s_%s_%s_%d_in_bytes", Name: "in", Algo: module.Incremental}, + {ID: "leafz_leaf_%s_%s_%s_%d_out_bytes", Name: "out", Mul: -1, Algo: module.Incremental}, + }, + } + leafConnMessagesTmpl = module.Chart{ + ID: "leaf_node_conn_%s_%s_%s_%d_messages", + Title: "Leaf Node Connection Messages", + Units: "messages/s", + Fam: "leaf traffic", + Ctx: "nats.leaf_node_conn_messages", + Priority: prioLeafConnMessages, + Type: module.Line, + Dims: module.Dims{ + {ID: "leafz_leaf_%s_%s_%s_%d_in_msgs", Name: "in", Algo: module.Incremental}, + {ID: "leafz_leaf_%s_%s_%s_%d_out_msgs", Name: "out", Mul: -1, Algo: module.Incremental}, + }, + } + leafConnSubscriptionsTmpl = module.Chart{ + ID: "leaf_node_conn_%s_%s_%s_%d_subscriptions", + Title: "Leaf Node Connection Active Subscriptions", + Units: "subscriptions", + Fam: "leaf subscriptions", + Ctx: "nats.leaf_node_conn_subscriptions", + Priority: prioLeafConnSubscriptions, + Type: module.Line, + Dims: 
module.Dims{ + {ID: "leafz_leaf_%s_%s_%s_%d_num_subs", Name: "active"}, + }, + } + leafConnRTT = module.Chart{ + ID: "leaf_node_conn_%s_%s_%s_%d_rtt", + Title: "Leaf Node Connection RTT", + Units: "microseconds", + Fam: "leaf rtt", + Ctx: "nats.leaf_node_conn_rtt", + Priority: prioLeafRTT, + Dims: module.Dims{ + {ID: "leafz_leaf_%s_%s_%s_%d_rtt", Name: "rtt"}, + }, + } +) + func (c *Collector) updateCharts() { c.onceAddSrvCharts.Do(c.addServerCharts) @@ -444,6 +508,17 @@ func (c *Collector) updateCharts() { }) return false }) + maps.DeleteFunc(c.cache.leafs, func(_ string, leaf *leafCacheEntry) bool { + if !leaf.updated { + c.removeLeafCharts(leaf) + return true + } + if !leaf.hasCharts { + leaf.hasCharts = true + c.addLeafCharts(leaf) + } + return false + }) } func (c *Collector) addServerCharts() { @@ -545,6 +620,35 @@ func (c *Collector) removeGatewayConnCharts(gwConn *gwConnCacheEntry, isInbound c.removeCharts(px) } +func (c *Collector) addLeafCharts(leaf *leafCacheEntry) { + charts := leafConnChartsTmpl.Copy() + + for _, chart := range *charts { + chart.ID = fmt.Sprintf(chart.ID, leaf.leafName, leaf.account, leaf.ip, leaf.port) + chart.ID = cleanChartID(chart.ID) + chart.Labels = []module.Label{ + {Key: "server_id", Value: c.srvMeta.id}, + {Key: "remote_name", Value: leaf.leafName}, + {Key: "account", Value: leaf.account}, + {Key: "ip", Value: leaf.ip}, + {Key: "port", Value: strconv.Itoa(leaf.port)}, + } + for _, dim := range chart.Dims { + dim.ID = fmt.Sprintf(dim.ID, leaf.leafName, leaf.account, leaf.ip, leaf.port) + } + } + + if err := c.Charts().Add(*charts...); err != nil { + c.Warningf("failed to add charts for leaf %s: %s", leaf.leafName, err) + } +} + +func (c *Collector) removeLeafCharts(leaf *leafCacheEntry) { + px := fmt.Sprintf("leaf_node_conn_%s_%s_%s_%d_", leaf.leafName, leaf.account, leaf.ip, leaf.port) + px = cleanChartID(px) // chart IDs are stored cleaned by addLeafCharts, so the removal prefix must be cleaned too + c.removeCharts(px) +} + func (c *Collector) removeCharts(prefix string) { for _, chart := range *c.Charts() { if
strings.HasPrefix(chart.ID, prefix) { @@ -553,3 +657,8 @@ func (c *Collector) removeCharts(prefix string) { } } } + +func cleanChartID(id string) string { + r := strings.NewReplacer(".", "_", " ", "_") + return strings.ToLower(r.Replace(id)) +} diff --git a/src/go/plugin/go.d/collector/nats/collect.go b/src/go/plugin/go.d/collector/nats/collect.go index 0ae46da810815d..b287911691c2f5 100644 --- a/src/go/plugin/go.d/collector/nats/collect.go +++ b/src/go/plugin/go.d/collector/nats/collect.go @@ -42,6 +42,9 @@ func (c *Collector) collect() (map[string]int64, error) { if err := c.collectGatewayz(mx); err != nil { return mx, err } + if err := c.collectLeafz(mx); err != nil { + return mx, err + } c.updateCharts() @@ -142,7 +145,7 @@ func (c *Collector) collectAccstatz(mx map[string]int64) error { } for _, acc := range resp.AccStats { - c.cache.accounts.put(acc.Account) + c.cache.accounts.put(acc) px := fmt.Sprintf("accstatz_acc_%s_", acc.Account) @@ -172,7 +175,7 @@ func (c *Collector) collectRoutez(mx map[string]int64) error { } for _, route := range resp.Routes { - c.cache.routes.put(route.Rid, route.RemoteID) + c.cache.routes.put(route) px := fmt.Sprintf("routez_route_id_%d_", route.Rid) @@ -198,8 +201,7 @@ func (c *Collector) collectGatewayz(mx map[string]int64) error { } for name, ogw := range resp.OutboundGateways { - c.cache.outGateways.put(resp.Name, name) - c.cache.outGateways.putConn(resp.Name, name, ogw.Connection.Cid) + c.cache.outGateways.put(resp.Name, name, ogw) px := fmt.Sprintf("gatewayz_outbound_gw_%s_cid_%d_", name, ogw.Connection.Cid) @@ -213,9 +215,8 @@ func (c *Collector) collectGatewayz(mx map[string]int64) error { } for name, igws := range resp.InboundGateways { - c.cache.inGateways.put(resp.Name, name) for _, igw := range igws { - c.cache.inGateways.putConn(resp.Name, name, igw.Connection.Cid) + c.cache.inGateways.put(resp.Name, name, igw) px := fmt.Sprintf("gatewayz_inbound_gw_%s_cid_%d_", name, igw.Connection.Cid) @@ -232,6 +233,33 @@ func (c 
*Collector) collectGatewayz(mx map[string]int64) error { return nil } +func (c *Collector) collectLeafz(mx map[string]int64) error { + req, err := web.NewHTTPRequestWithPath(c.RequestConfig, urlPathLeafz) + if err != nil { + return err + } + + var resp leafzResponse + if err := web.DoHTTP(c.httpClient).RequestJSON(req, &resp); err != nil { + return err + } + + for _, leaf := range resp.Leafs { + c.cache.leafs.put(leaf) + px := fmt.Sprintf("leafz_leaf_%s_%s_%s_%d_", leaf.Name, leaf.Account, leaf.IP, leaf.Port) + + mx[px+"in_bytes"] = leaf.InBytes + mx[px+"out_bytes"] = leaf.OutBytes + mx[px+"in_msgs"] = leaf.InMsgs + mx[px+"out_msgs"] = leaf.OutMsgs + mx[px+"num_subs"] = int64(leaf.NumSubs) + rtt, _ := time.ParseDuration(leaf.RTT) + mx[px+"rtt"] = rtt.Microseconds() + } + + return nil +} + func parseUptime(uptime string) (time.Duration, error) { // https://github.com/nats-io/nats-server/blob/v2.10.24/server/monitor.go#L1354 diff --git a/src/go/plugin/go.d/collector/nats/collector_test.go b/src/go/plugin/go.d/collector/nats/collector_test.go index c073dcade26988..ec5047e7cbff72 100644 --- a/src/go/plugin/go.d/collector/nats/collector_test.go +++ b/src/go/plugin/go.d/collector/nats/collector_test.go @@ -25,6 +25,7 @@ var ( dataVer210Accstatz, _ = os.ReadFile("testdata/v2.10.24/accstatz.json") dataVer210Routez, _ = os.ReadFile("testdata/v2.10.24/routez.json") dataVer210Gatewayz, _ = os.ReadFile("testdata/v2.10.24/gatewayz.json") + dataVer210Leafz, _ = os.ReadFile("testdata/v2.10.24/leafz.json") ) func Test_testDataIsValid(t *testing.T) { @@ -36,6 +37,7 @@ func Test_testDataIsValid(t *testing.T) { "dataVer210Accstatz": dataVer210Accstatz, "dataVer210Routez": dataVer210Routez, "dataVer210Gatewayz": dataVer210Gatewayz, + "dataVer210Leafz": dataVer210Leafz, } { require.NotNil(t, data, name) } @@ -134,7 +136,8 @@ func TestCollector_Collect(t *testing.T) { wantNumOfCharts: len(serverCharts) + len(accountChartsTmpl)*3 + len(routeChartsTmpl)*1 + - len(gatewayConnChartsTmpl)*5, 
+ len(gatewayConnChartsTmpl)*5 + + len(leafConnChartsTmpl)*1, wantMetrics: map[string]int64{ "accstatz_acc_$G_conns": 0, "accstatz_acc_$G_leaf_nodes": 0, @@ -193,6 +196,12 @@ func TestCollector_Collect(t *testing.T) { "gatewayz_outbound_gw_region3_cid_5_out_bytes": 0, "gatewayz_outbound_gw_region3_cid_5_out_msgs": 0, "gatewayz_outbound_gw_region3_cid_5_uptime": 6, + "leafz_leaf__$G_127.0.0.1_6223_in_bytes": 0, + "leafz_leaf__$G_127.0.0.1_6223_in_msgs": 0, + "leafz_leaf__$G_127.0.0.1_6223_num_subs": 1, + "leafz_leaf__$G_127.0.0.1_6223_out_bytes": 1280000, + "leafz_leaf__$G_127.0.0.1_6223_out_msgs": 10000, + "leafz_leaf__$G_127.0.0.1_6223_rtt": 0, "routez_route_id_1_in_bytes": 4, "routez_route_id_1_in_msgs": 1, "routez_route_id_1_num_subs": 1, @@ -284,6 +293,8 @@ func caseOk(t *testing.T) (*Collector, func()) { _, _ = w.Write(dataVer210Routez) case urlPathGatewayz: _, _ = w.Write(dataVer210Gatewayz) + case urlPathLeafz: + _, _ = w.Write(dataVer210Leafz) default: w.WriteHeader(http.StatusNotFound) } diff --git a/src/go/plugin/go.d/collector/nats/metadata.yaml b/src/go/plugin/go.d/collector/nats/metadata.yaml index 39476e8ca439f0..ac040ac5295a61 100644 --- a/src/go/plugin/go.d/collector/nats/metadata.yaml +++ b/src/go/plugin/go.d/collector/nats/metadata.yaml @@ -412,3 +412,41 @@ modules: chart_type: line dimensions: - name: uptime + - name: leaf node connection + description: These metrics refer to [Leaf Node Connections](https://docs.nats.io/running-a-nats-service/nats_admin/monitoring#leaf-node-information). + labels: + - name: remote_name + description: "Unique identifier of the remote leaf node server, either its configured name or automatically assigned ID." + - name: account + description: "Name of the associated account." + - name: ip + description: "IP address of the remote server." + - name: port + description: "Port used for the connection to the remote server." 
+ metrics: + - name: nats.leaf_node_conn_traffic + description: Leaf Node Connection Traffic + unit: bytes/s + chart_type: area + dimensions: + - name: in + - name: out + - name: nats.leaf_node_conn_messages + description: Leaf Node Connection Messages + unit: messages/s + chart_type: line + dimensions: + - name: in + - name: out + - name: nats.leaf_node_conn_subscriptions + description: Leaf Node Connection Active Subscriptions + unit: subscriptions + chart_type: line + dimensions: + - name: active + - name: nats.leaf_node_conn_rtt + description: Leaf Node Connection RTT + unit: microseconds + chart_type: line + dimensions: + - name: rtt diff --git a/src/go/plugin/go.d/collector/nats/restapi.go b/src/go/plugin/go.d/collector/nats/restapi.go index 7de48ea1409788..c2a07cf8469c4c 100644 --- a/src/go/plugin/go.d/collector/nats/restapi.go +++ b/src/go/plugin/go.d/collector/nats/restapi.go @@ -19,6 +19,8 @@ const ( urlPathRoutez = "/routez" // https://docs.nats.io/running-a-nats-service/nats_admin/monitoring#gateway-information urlPathGatewayz = "/gatewayz" + // https://docs.nats.io/running-a-nats-service/nats_admin/monitoring#leaf-node-information + urlPathLeafz = "/leafz" ) var ( @@ -138,3 +140,22 @@ type ( NumSubs uint32 `json:"subscriptions"` } ) + +type ( + // https://github.com/nats-io/nats-server/blob/v2.10.24/server/monitor.go#L2163 + leafzResponse struct { + Leafs []leafInfo `json:"leafs"` + } + leafInfo struct { + Name string `json:"name"` // remote server name or id + Account string `json:"account"` + IP string `json:"ip"` + Port int `json:"port"` + RTT string `json:"rtt,omitempty"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + NumSubs uint32 `json:"subscriptions"` + } +) diff --git a/src/go/plugin/go.d/collector/nats/testdata/v2.10.24/leafz.json b/src/go/plugin/go.d/collector/nats/testdata/v2.10.24/leafz.json new file mode 100644 index 00000000000000..7d438072fc183b 
--- /dev/null +++ b/src/go/plugin/go.d/collector/nats/testdata/v2.10.24/leafz.json @@ -0,0 +1,21 @@ +{ + "server_id": "NC2FJCRMPBE5RI5OSRN7TKUCWQONCKNXHKJXCJIDVSAZ6727M7MQFVT3", + "now": "2019-08-27T09:07:05.841132-06:00", + "leafnodes": 1, + "leafs": [ + { + "account": "$G", + "ip": "127.0.0.1", + "port": 6223, + "rtt": "200µs", + "in_msgs": 0, + "out_msgs": 10000, + "in_bytes": 0, + "out_bytes": 1280000, + "subscriptions": 1, + "subscriptions_list": [ + "foo" + ] + } + ] +}