Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Merge pull request #36 from ag0n1k/DMP-3004-queues
Browse files Browse the repository at this point in the history
Added ability to track storage-engine metrics
  • Loading branch information
alicebob authored Dec 29, 2019
2 parents 46ba280 + ec56f34 commit ef82cb8
Showing 1 changed file with 86 additions and 7 deletions.
93 changes: 86 additions & 7 deletions namespaces.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"strings"

as "github.com/aerospike/aerospike-client-go"
Expand Down Expand Up @@ -90,8 +91,6 @@ var (
counter("client_write_error", "client write error"),
counter("client_write_success", "client write success"),
counter("client_write_timeout", "client write timeout"),
counter("defrag_reads", "defrag reads"),
counter("defrag_writes", "defrag writes"),
counter("evicted_objects", "evicted objects"),
counter("expired_objects", "expired objects"),
counter("fail_generation", "fail generation"),
Expand Down Expand Up @@ -166,7 +165,6 @@ var (
counter("xdr_write_success", "xdr write success"),
counter("xdr_write_timeout", "xdr write timeout"),
gauge("available_bin_names", "available bin names"),
// broken gauge("defrag_q", "defrag queue"),
gauge("device_available_pct", "device available pct"),
gauge("device_compression_ratio", "device compression ratio"),
gauge("device_free_pct", "device free pct"),
Expand Down Expand Up @@ -216,7 +214,6 @@ var (
gauge("prole_objects", "prole objects"),
gauge("prole_tombstones", "prole tombstones"),
gauge("replication-factor", "replication factor"),
// broken gauge("shadow_write_q", "shadow write queue"),
gauge("stop_writes", "stop writes"),
gauge("stop-writes-pct", "stop writes pct"),
gauge("tombstones", "tombstones"),
Expand All @@ -225,10 +222,16 @@ var (
gauge("dead_partitions", "dead partitions"),
gauge("unavailable_partitions", "unavailable partitions"),
gauge("rack-id", "rack id"),
// gauge("write_q", "write queue"),
// device-level stats don't appear to work
// and this plugin thinks "storage-engine.device[0].write_q" is malformed.
}
NamespaceStorageMetrics = []metric{
counter("defrag_reads", "defrag reads"),
counter("defrag_writes", "defrag writes"),
gauge("shadow_write_q", "shadow write queue"),
gauge("defrag_q", "defrag queue"),
gauge("write_q", "write queue"),
}
)

type nsCollector cmetrics
Expand All @@ -246,6 +249,18 @@ func newNSCollector() nsCollector {
),
}
}
for _, m := range NamespaceStorageMetrics {
ns[m.aeroName] = cmetric{
typ: m.typ,
desc: prometheus.NewDesc(
promkey(systemNamespace, m.aeroName),
m.desc,
[]string{"namespace", "mount"},
nil,
),
}
}

return ns
}

Expand All @@ -255,21 +270,85 @@ func (nc nsCollector) describe(ch chan<- *prometheus.Desc) {
}
}

func (nc nsCollector) parseStorage(s string, d string) (string, error) {
// the function remove the storage prefix metrics for each device:
// d is storage-engine.device[ix]
// s is all storage metrics that has been scraped
// storage-engine.device[ix].age -> age
// https://www.aerospike.com/docs/reference/metrics/#storage-engine.device[ix].age
buf := bytes.Buffer{}
for _, l := range strings.Split(s, ";") {
for _, v := range strings.Split(l, ":") {
kv := strings.SplitN(v, "=", 2)
if len(kv) > 1 {
if strings.HasPrefix(kv[0], d) {
//todo: optimize
kv[0] = strings.Replace(kv[0]+".", d, "", 1)
kv[0] = strings.Replace(kv[0], ".", "", -1)
}
buf.WriteString(kv[0] + "=" + kv[1] + ";")
}
}
}
r := buf.String()
return r, nil
}

func (nc nsCollector) splitInfo(s string) (string, string, map[string]string) {
nsStorageMounts := map[string]string{}

bufStandardMetrics := bytes.Buffer{}
bufStorageMetrics := bytes.Buffer{}

for _, l := range strings.Split(s, ";") {
for _, v := range strings.Split(l, ":") {
kv := strings.SplitN(v, "=", 2)
if strings.HasPrefix(kv[0], "storage-engine") {
bufStorageMetrics.WriteString(v + ";")
if strings.HasSuffix(kv[0], "]") {
nsStorageMounts[kv[1]] = kv[0]
}
} else {
bufStandardMetrics.WriteString(v + ";")
}
}
}
nsStandardMetrics := bufStandardMetrics.String()
nsStorageMetrics := bufStorageMetrics.String()
return nsStorageMetrics, nsStandardMetrics, nsStorageMounts
}

func (nc nsCollector) collect(conn *as.Connection) ([]prometheus.Metric, error) {
info, err := as.RequestInfo(conn, "namespaces")
if err != nil {
return nil, err
}
var metrics []prometheus.Metric
for _, ns := range strings.Split(info["namespaces"], ";") {
nsinfo, err := as.RequestInfo(conn, "namespace/"+ns)
nsInfo, err := as.RequestInfo(conn, "namespace/"+ns)
if err != nil {
return nil, err
}

nsInfoStorage, nsInfoStandard, nsInfoStorageDevices := nc.splitInfo(nsInfo["namespace/"+ns])

metrics = append(
metrics,
infoCollect(cmetrics(nc), nsinfo["namespace/"+ns], ns)...,
infoCollect(cmetrics(nc), nsInfoStandard, ns)...,
)

for mountName, metricName := range nsInfoStorageDevices {
nsInfoStorage, err = nc.parseStorage(nsInfoStorage, metricName)

if err != nil {
return nil, err
}

metrics = append(
metrics,
infoCollect(cmetrics(nc), nsInfoStorage, ns, mountName)...,
)
}
}
return metrics, nil
}

0 comments on commit ef82cb8

Please sign in to comment.