Skip to content

Commit

Permalink
improvement(go.d/nats): add leafz metrics (netdata#19282)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyam8 authored Dec 24, 2024
1 parent 016b99d commit 5158928
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 31 deletions.
78 changes: 54 additions & 24 deletions src/go/plugin/go.d/collector/nats/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

package nats

import (
"fmt"
)

// newCache returns a cache whose per-entity maps (accounts, routes, inbound
// and outbound gateways, leaf connections) are all allocated and empty.
func newCache() *cache {
	c := new(cache)

	c.accounts = accCache{}
	c.routes = routeCache{}
	c.inGateways = gwCache{}
	c.outGateways = gwCache{}
	c.leafs = leafCache{}

	return c
}

Expand All @@ -16,6 +21,7 @@ type cache struct {
routes routeCache
inGateways gwCache
outGateways gwCache
leafs leafCache
}

func (c *cache) resetUpdated() {
Expand All @@ -36,73 +42,97 @@ func (c *cache) resetUpdated() {
type (
accCache map[string]*accCacheEntry
accCacheEntry struct {
accName string
hasCharts bool
updated bool

accName string
}
)

func (c *accCache) put(name string) {
acc, ok := (*c)[name]
func (c *accCache) put(ai accountInfo) {
acc, ok := (*c)[ai.Account]
if !ok {
acc = &accCacheEntry{accName: name}
(*c)[name] = acc
acc = &accCacheEntry{accName: ai.Account}
(*c)[ai.Account] = acc
}
acc.updated = true
}

type (
routeCache map[uint64]*routeCacheEntry
routeCacheEntry struct {
rid uint64
remoteId string
hasCharts bool
updated bool

rid uint64
remoteId string
}
)

func (c *routeCache) put(rid uint64, remoteId string) {
route, ok := (*c)[rid]
func (c *routeCache) put(ri routeInfo) {
route, ok := (*c)[ri.Rid]
if !ok {
route = &routeCacheEntry{rid: rid, remoteId: remoteId}
(*c)[rid] = route
route = &routeCacheEntry{rid: ri.Rid, remoteId: ri.RemoteID}
(*c)[ri.Rid] = route
}
route.updated = true
}

type (
gwCache map[string]*gwCacheEntry
gwCacheEntry struct {
gwName string
rgwName string
hasCharts bool
updated bool
conns map[uint64]*gwConnCacheEntry

gwName string
rgwName string
conns map[uint64]*gwConnCacheEntry
}
gwConnCacheEntry struct {
gwName string
rgwName string
cid uint64
hasCharts bool
updated bool

gwName string
rgwName string
cid uint64
}
)

func (c *gwCache) put(gwName, rgwName string) {
func (c *gwCache) put(gwName, rgwName string, rgi *remoteGatewayInfo) {
gw, ok := (*c)[gwName]
if !ok {
gw = &gwCacheEntry{gwName: gwName, rgwName: rgwName, conns: make(map[uint64]*gwConnCacheEntry)}
(*c)[gwName] = gw
}
gw.updated = true
}

func (c *gwCache) putConn(gwName, rgwName string, cid uint64) {
c.put(gwName, rgwName)
conn, ok := (*c)[gwName].conns[cid]
conn, ok := gw.conns[rgi.Connection.Cid]
if !ok {
conn = &gwConnCacheEntry{gwName: gwName, rgwName: rgwName, cid: cid}
(*c)[gwName].conns[cid] = conn
conn = &gwConnCacheEntry{gwName: gwName, rgwName: rgwName, cid: rgi.Connection.Cid}
gw.conns[rgi.Connection.Cid] = conn
}
conn.updated = true
}

// leafCache tracks the leaf node connections seen so far, keyed by a string
// built from the connection's name, account, IP and port (see put).
type leafCache map[string]*leafCacheEntry

// leafCacheEntry is the bookkeeping record for one leaf node connection.
type leafCacheEntry struct {
	// hasCharts is set once charts have been added for this connection.
	hasCharts bool
	// updated is set by put each time the connection is seen.
	updated bool

	// Identity of the connection; used to build chart IDs and labels.
	leafName string
	account  string
	ip       string
	port     int
}

// put records that the leaf connection described by li was seen: it creates
// the cache entry on first sight and marks the entry as updated every time.
// Entries are keyed by "<name>_<account>_<ip>_<port>".
func (c *leafCache) put(li leafInfo) {
	entries := *c
	id := fmt.Sprintf("%s_%s_%s_%d", li.Name, li.Account, li.IP, li.Port)

	entry, found := entries[id]
	if !found {
		entry = &leafCacheEntry{
			leafName: li.Name,
			account:  li.Account,
			ip:       li.IP,
			port:     li.Port,
		}
		entries[id] = entry
	}

	entry.updated = true
}
109 changes: 109 additions & 0 deletions src/go/plugin/go.d/collector/nats/charts.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const (
prioGatewayConnMessages
prioGatewayConnSubscriptions
prioGatewayConnUptime

prioLeafConnTraffic
prioLeafConnMessages
prioLeafConnSubscriptions
prioLeafRTT
)

var serverCharts = func() module.Charts {
Expand Down Expand Up @@ -391,6 +396,65 @@ var (
}
)

// leafConnChartsTmpl is the set of chart templates created for every leaf
// node connection. addLeafCharts copies it and substitutes the format verbs
// in the chart and dimension IDs with the connection's name, account, IP and
// port.
var leafConnChartsTmpl = module.Charts{
leafConnTrafficTmpl.Copy(),
leafConnMessagesTmpl.Copy(),
leafConnSubscriptionsTmpl.Copy(),
leafConnRTT.Copy(),
}

// Chart templates for a single leaf node connection.
//
// Every chart ID and dimension ID carries the verbs "%s_%s_%s_%d", which
// addLeafCharts fills with the leaf name, account, IP and port. The resulting
// dimension IDs match the metric keys written by collectLeafz
// ("leafz_leaf_<name>_<account>_<ip>_<port>_<metric>").
var (
// leafConnTrafficTmpl charts inbound/outbound bytes as rates; the "out"
// dimension carries a -1 multiplier.
leafConnTrafficTmpl = module.Chart{
ID: "leaf_node_conn_%s_%s_%s_%d_traffic",
Title: "Leaf Node Connection Traffic",
Units: "bytes/s",
Fam: "leaf traffic",
Ctx: "nats.leaf_node_conn_traffic",
Priority: prioLeafConnTraffic,
Type: module.Area,
Dims: module.Dims{
{ID: "leafz_leaf_%s_%s_%s_%d_in_bytes", Name: "in", Algo: module.Incremental},
{ID: "leafz_leaf_%s_%s_%s_%d_out_bytes", Name: "out", Mul: -1, Algo: module.Incremental},
},
}
// leafConnMessagesTmpl charts inbound/outbound messages as rates; the "out"
// dimension carries a -1 multiplier.
leafConnMessagesTmpl = module.Chart{
ID: "leaf_node_conn_%s_%s_%s_%d_messages",
Title: "Leaf Node Connection Messages",
Units: "messages/s",
Fam: "leaf traffic",
Ctx: "nats.leaf_node_conn_messages",
Priority: prioLeafConnMessages,
Type: module.Line,
Dims: module.Dims{
{ID: "leafz_leaf_%s_%s_%s_%d_in_msgs", Name: "in", Algo: module.Incremental},
{ID: "leafz_leaf_%s_%s_%s_%d_out_msgs", Name: "out", Mul: -1, Algo: module.Incremental},
},
}
// leafConnSubscriptionsTmpl charts the number of subscriptions on the
// connection (no Algo is set on the dimension).
leafConnSubscriptionsTmpl = module.Chart{
ID: "leaf_node_conn_%s_%s_%s_%d_subscriptions",
Title: "Leaf Node Connection Active Subscriptions",
Units: "subscriptions",
Fam: "leaf subscriptions",
Ctx: "nats.leaf_node_conn_subscriptions",
Priority: prioLeafConnSubscriptions,
Type: module.Line,
Dims: module.Dims{
{ID: "leafz_leaf_%s_%s_%s_%d_num_subs", Name: "active"},
},
}
// leafConnRTT charts the connection round-trip time; collectLeafz stores the
// value in microseconds. Unlike the other templates it sets no Type.
// NOTE(review): presumably the module's default chart type applies — confirm.
leafConnRTT = module.Chart{
ID: "leaf_node_conn_%s_%s_%s_%d_rtt",
Title: "Leaf Node Connection RTT",
Units: "microseconds",
Fam: "leaf rtt",
Ctx: "nats.leaf_node_conn_rtt",
Priority: prioLeafRTT,
Dims: module.Dims{
{ID: "leafz_leaf_%s_%s_%s_%d_rtt", Name: "rtt"},
},
}
)

func (c *Collector) updateCharts() {
c.onceAddSrvCharts.Do(c.addServerCharts)

Expand Down Expand Up @@ -444,6 +508,17 @@ func (c *Collector) updateCharts() {
})
return false
})
maps.DeleteFunc(c.cache.leafs, func(_ string, leaf *leafCacheEntry) bool {
if !leaf.updated {
c.removeLeafCharts(leaf)
return true
}
if !leaf.hasCharts {
leaf.hasCharts = true
c.addLeafCharts(leaf)
}
return false
})
}

func (c *Collector) addServerCharts() {
Expand Down Expand Up @@ -545,6 +620,35 @@ func (c *Collector) removeGatewayConnCharts(gwConn *gwConnCacheEntry, isInbound
c.removeCharts(px)
}

// addLeafCharts instantiates the leaf connection chart templates for the
// given leaf and registers them with the collector.
//
// Chart IDs are formatted with the leaf's name, account, IP and port and then
// normalised with cleanChartID; dimension IDs are formatted the same way but
// left un-normalised. A failure to register the charts is logged as a warning.
func (c *Collector) addLeafCharts(leaf *leafCacheEntry) {
	name, account, ip, port := leaf.leafName, leaf.account, leaf.ip, leaf.port

	charts := leafConnChartsTmpl.Copy()

	for _, ch := range *charts {
		ch.ID = cleanChartID(fmt.Sprintf(ch.ID, name, account, ip, port))

		// Each chart gets its own label slice.
		ch.Labels = []module.Label{
			{Key: "server_id", Value: c.srvMeta.id},
			{Key: "remote_name", Value: name},
			{Key: "account", Value: account},
			{Key: "ip", Value: ip},
			{Key: "port", Value: strconv.Itoa(port)},
		}

		for _, d := range ch.Dims {
			d.ID = fmt.Sprintf(d.ID, name, account, ip, port)
		}
	}

	err := c.Charts().Add(*charts...)
	if err != nil {
		c.Warningf("failed to add charts for leaf %s: %s", leaf.leafName, err)
	}
}

// removeLeafCharts removes every chart that addLeafCharts created for the
// given leaf connection.
//
// Charts are located by ID prefix. addLeafCharts passes each chart ID through
// cleanChartID (lower-casing, "." and " " replaced by "_"), so the prefix has
// to be normalised the same way or it will never match — the leaf IP alone
// always contains dots.
func (c *Collector) removeLeafCharts(leaf *leafCacheEntry) {
	px := fmt.Sprintf("leaf_node_conn_%s_%s_%s_%d_", leaf.leafName, leaf.account, leaf.ip, leaf.port)
	// cleanChartID returns a new string; its result must be assigned back.
	px = cleanChartID(px)
	c.removeCharts(px)
}

func (c *Collector) removeCharts(prefix string) {
for _, chart := range *c.Charts() {
if strings.HasPrefix(chart.ID, prefix) {
Expand All @@ -553,3 +657,8 @@ func (c *Collector) removeCharts(prefix string) {
}
}
}

// cleanChartID normalises a chart ID: every "." and " " becomes "_" and the
// result is lower-cased.
func cleanChartID(id string) string {
	cleaned := strings.ReplaceAll(id, ".", "_")
	cleaned = strings.ReplaceAll(cleaned, " ", "_")
	return strings.ToLower(cleaned)
}
40 changes: 34 additions & 6 deletions src/go/plugin/go.d/collector/nats/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (c *Collector) collect() (map[string]int64, error) {
if err := c.collectGatewayz(mx); err != nil {
return mx, err
}
if err := c.collectLeafz(mx); err != nil {
return mx, err
}

c.updateCharts()

Expand Down Expand Up @@ -142,7 +145,7 @@ func (c *Collector) collectAccstatz(mx map[string]int64) error {
}

for _, acc := range resp.AccStats {
c.cache.accounts.put(acc.Account)
c.cache.accounts.put(acc)

px := fmt.Sprintf("accstatz_acc_%s_", acc.Account)

Expand Down Expand Up @@ -172,7 +175,7 @@ func (c *Collector) collectRoutez(mx map[string]int64) error {
}

for _, route := range resp.Routes {
c.cache.routes.put(route.Rid, route.RemoteID)
c.cache.routes.put(route)

px := fmt.Sprintf("routez_route_id_%d_", route.Rid)

Expand All @@ -198,8 +201,7 @@ func (c *Collector) collectGatewayz(mx map[string]int64) error {
}

for name, ogw := range resp.OutboundGateways {
c.cache.outGateways.put(resp.Name, name)
c.cache.outGateways.putConn(resp.Name, name, ogw.Connection.Cid)
c.cache.outGateways.put(resp.Name, name, ogw)

px := fmt.Sprintf("gatewayz_outbound_gw_%s_cid_%d_", name, ogw.Connection.Cid)

Expand All @@ -213,9 +215,8 @@ func (c *Collector) collectGatewayz(mx map[string]int64) error {
}

for name, igws := range resp.InboundGateways {
c.cache.inGateways.put(resp.Name, name)
for _, igw := range igws {
c.cache.inGateways.putConn(resp.Name, name, igw.Connection.Cid)
c.cache.inGateways.put(resp.Name, name, igw)

px := fmt.Sprintf("gatewayz_inbound_gw_%s_cid_%d_", name, igw.Connection.Cid)

Expand All @@ -232,6 +233,33 @@ func (c *Collector) collectGatewayz(mx map[string]int64) error {
return nil
}

// collectLeafz queries the leafz endpoint and, for every leaf connection in
// the response, registers it in the leaf cache and writes its metrics into mx
// under the prefix "leafz_leaf_<name>_<account>_<ip>_<port>_".
//
// It returns an error if the request cannot be built or the HTTP/JSON
// round-trip fails.
func (c *Collector) collectLeafz(mx map[string]int64) error {
	req, err := web.NewHTTPRequestWithPath(c.RequestConfig, urlPathLeafz)
	if err != nil {
		return err
	}

	var leafz leafzResponse
	err = web.DoHTTP(c.httpClient).RequestJSON(req, &leafz)
	if err != nil {
		return err
	}

	for _, leaf := range leafz.Leafs {
		c.cache.leafs.put(leaf)

		// The parse error is ignored on purpose: an unparsable RTT is
		// reported as 0, the value ParseDuration returns on failure.
		rtt, _ := time.ParseDuration(leaf.RTT)

		prefix := fmt.Sprintf("leafz_leaf_%s_%s_%s_%d_", leaf.Name, leaf.Account, leaf.IP, leaf.Port)

		mx[prefix+"in_bytes"] = leaf.InBytes
		mx[prefix+"out_bytes"] = leaf.OutBytes
		mx[prefix+"in_msgs"] = leaf.InMsgs
		mx[prefix+"out_msgs"] = leaf.OutMsgs
		mx[prefix+"num_subs"] = int64(leaf.NumSubs)
		mx[prefix+"rtt"] = rtt.Microseconds()
	}

	return nil
}

func parseUptime(uptime string) (time.Duration, error) {
// https://github.com/nats-io/nats-server/blob/v2.10.24/server/monitor.go#L1354

Expand Down
Loading

0 comments on commit 5158928

Please sign in to comment.