Skip to content

Commit

Permalink
#59 add semp v2 for queue stats
Browse files Browse the repository at this point in the history
  • Loading branch information
GreenRover committed Dec 10, 2023
1 parent e8b1054 commit 0c3dd2c
Show file tree
Hide file tree
Showing 11 changed files with 441 additions and 79 deletions.
137 changes: 88 additions & 49 deletions README.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions exporter/config.struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
ScrapeURI string
Username string
Password string
DefaultVpn string
SslVerify bool
useSystemProxy bool
Timeout time.Duration
Expand Down Expand Up @@ -82,6 +83,10 @@ func ParseConfig(configFile string) (map[string][]DataSource, *Config, error) {
if err != nil {
return nil, nil, err
}
conf.DefaultVpn, err = parseConfigString(cfg, "solace", "defaultVpn", "SOLACE_DEFAULT_VPN")
if err != nil {
return nil, nil, err
}
conf.Timeout, err = parseConfigDuration(cfg, "solace", "timeout", "SOLACE_TIMEOUT")
if err != nil {
return nil, nil, err
Expand Down
14 changes: 9 additions & 5 deletions exporter/dataSource.struct.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package exporter

import "fmt"
import (
"fmt"
"strings"
)

type DataSource struct {
Name string
VpnFilter string
ItemFilter string
Name string
VpnFilter string
ItemFilter string
MetricFilter []string
}

func (dataSource DataSource) String() string {
return fmt.Sprintf("%s=%s|%s", dataSource.Name, dataSource.VpnFilter, dataSource.ItemFilter)
return fmt.Sprintf("%s=%s|%s|%s", dataSource.Name, dataSource.VpnFilter, dataSource.ItemFilter, strings.Join(dataSource.MetricFilter, ","))
}
22 changes: 20 additions & 2 deletions exporter/exporter.collect.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package exporter

import (
"solace_exporter/semp"

"errors"
"github.com/prometheus/client_golang/prometheus"
"solace_exporter/semp"
"strings"
)

// Collect fetches the stats from configured Solace location and delivers them
// as Prometheus metrics. It implements prometheus.Collector.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
var up float64 = 1
var err error = nil
var vpnName = ""

for _, dataSource := range *e.dataSource {
if up < 1 {
Expand Down Expand Up @@ -76,6 +78,11 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
up, err = e.semp.GetQueueRatesSemp1(ch, dataSource.VpnFilter, dataSource.ItemFilter)
case "QueueStats":
up, err = e.semp.GetQueueStatsSemp1(ch, dataSource.VpnFilter, dataSource.ItemFilter)
case "QueueStatsV2":
vpnName, err = e.getVpnName(dataSource.VpnFilter)
if err == nil {
up, err = e.semp.GetQueueStatsSemp2(ch, vpnName, dataSource.ItemFilter, dataSource.MetricFilter)
}
case "QueueDetails":
up, err = e.semp.GetQueueDetailsSemp1(ch, dataSource.VpnFilter, dataSource.ItemFilter)
case "TopicEndpointRates":
Expand All @@ -88,3 +95,14 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
}
ch <- prometheus.MustNewConstMetric(semp.MetricDesc["Global"]["up"], prometheus.GaugeValue, 1, "")
}

func (e *Exporter) getVpnName(vpnFilter string) (vpnName string, err error) {
if vpnFilter == "*" {
if len(strings.TrimSpace(e.config.DefaultVpn)) == 0 {
return "", errors.New("Can't scrape Semp2 As vpnFilter was an * given and the defaultVpn is not set in configuration")
}
return e.config.DefaultVpn, nil
}

return vpnFilter, nil
}
13 changes: 11 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjHpqDjYY=
github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE=
github.com/alecthomas/kingpin/v2 v2.3.2 h1:H0aULhgmSzN8xQ3nX1uxtdlTHYoPLu5AhHxWrKI6ocU=
github.com/alecthomas/kingpin/v2 v2.3.2/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs=
Expand All @@ -10,6 +10,7 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-kit/kit v0.13.0 h1:OoneCcHKHQ03LfBpoQCUfCluwd2Vt3ohz+kvbJneZAU=
github.com/go-kit/kit v0.13.0/go.mod h1:phqEHMMUbyrCFCTgH48JueqrM3md2HcAZ8N3XE4FKDg=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
Expand All @@ -19,8 +20,15 @@ github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KE
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
Expand Down Expand Up @@ -49,4 +57,5 @@ gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
129 changes: 129 additions & 0 deletions semp/getQueueStatsSemp2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package semp

import (
"encoding/json"
"errors"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"strings"
)

// Get rates for each individual queue of all vpn's
// This can result in heavy system load for lots of queues
func (e *Semp) GetQueueStatsSemp2(ch chan<- prometheus.Metric, vpnName string, itemFilter string, metricFilter []string) (ok float64, err error) {
type Response struct {
Queue []struct {
QueueName string `json:"queueName"`
MsgVpnName string `json:""`
TotalByteSpooled float64 `json:"spooledByteCount"`
TotalMsgSpooled float64 `json:"spooledMsgCount"`
MsgRedelivered float64 `json:"redeliveredMsgCount"`
MsgRetransmit float64 `json:"transportRetransmitMsgCount"`
SpoolUsageExceeded float64 `json:"maxMsgSpoolUsageExceededDiscardedMsgCount"`
MsgSizeExceeded float64 `json:"maxMsgSizeExceededDiscardedMsgCount"`
SpoolShutdownDiscard float64 `json:"disabledDiscardedMsgCount"`
DestinationGroupError float64 `json:"destinationGroupErrorDiscardedMsgCount"`
LowPrioMsgDiscard float64 `json:"lowPriorityMsgCongestionDiscardedMsgCount"`
Deleted float64 `json:"deletedMsgCount"`
TtlDiscarded float64 `json:"maxTtlExpiredDiscardedMsgCount"`
TtlDmq float64 `json:"maxTtlExpiredToDmqMsgCount"`
TtlDmqFailed float64 `json:"maxTtlExpiredToDmqFailedMsgCount"`
MaxRedeliveryDiscarded float64 `json:"maxRedeliveryExceededDiscardedMsgCount"`
MaxRedeliveryDmq float64 `json:"maxRedeliveryExceededToDmqMsgCount"`
MaxRedeliveryDmqFailed float64 `json:"maxRedeliveryExceededToDmqFailedMsgCount"`
TxUnackedMsg float64 `json:"txUnackedMsgCount"`
TransactionNotSupportedDiscardedMsg float64 `json:"xaTransactionNotSupportedDiscardedMsgCount"`
} `json:"data"`
Meta struct {
Count int64 `json:"count"`
ResponseCode int `json:"responseCode"`
Paging struct {
CursorQuery string `json:"cursorQuery"`
NextPageUri string `json:"nextPageUri"`
} `json:",paging"`
Error struct {
Code int `json:"code"`
Description string `json:"description"`
Status string `json:"status"`
} `json:",error"`
} `json:"meta"`
}

var getParameter = "count=100"
if len(strings.TrimSpace(itemFilter)) > 0 && itemFilter != "*" {
if strings.Contains(itemFilter, "=") {
getParameter += "&where=" + itemFilter
} else {
getParameter += "&where=queueName==" + itemFilter
}
}

var fieldsToSelect []string
if len(metricFilter) > 0 {
fieldsToSelect, err = getSempV2FieldsToSelect(
metricFilter,
[]string{"queueName", "msgVpnName"},
QueueStatsSempV2,
)

if err != nil {
_ = level.Error(e.logger).Log("msg", "Unable to map metric filter", "err", err, "broker", e.brokerURI)
return 0, err
}
getParameter += "&select=" + strings.Join(fieldsToSelect, ",")
}

var lastQueueName = ""
for nextUrl := e.brokerURI + "/SEMP/v2/monitor/msgVpns/" + vpnName + "/queues?" + getParameter; nextUrl != ""; {
body, err := e.getHTTPbytes(nextUrl, "application/json ")
if err != nil {
_ = level.Error(e.logger).Log("msg", "Can't scrape QueueStatsSemp2", "command", nextUrl, "err", err, "broker", e.brokerURI)
return 0, err
}

var response Response
err = json.Unmarshal(body, &response)
if err != nil {
_ = level.Error(e.logger).Log("msg", "Can't decode QueueStatsSemp2", "err", err, "broker", e.brokerURI)
return 0, err
}
if response.Meta.ResponseCode != 200 {
_ = level.Error(e.logger).Log("msg", "unexpected result", "command", nextUrl, "remoteError", response.Meta.Error.Description, "broker", e.brokerURI)
return 0, errors.New("unexpected result: see log")
}

//fmt.Printf("Next request: %v\n", response.Meta.Paging.NextPageUri)
nextUrl = response.Meta.Paging.NextPageUri
for _, queue := range response.Queue {
queueKey := queue.MsgVpnName + "___" + queue.QueueName
if queueKey == lastQueueName {
continue
}
lastQueueName = queueKey

var values = []SempV2Result{
{v2Desc: QueueStatsSempV2["total_bytes_spooled"], value: queue.TotalByteSpooled},
{v2Desc: QueueStatsSempV2["messages_redelivered"], value: queue.MsgRedelivered},
{v2Desc: QueueStatsSempV2["messages_transport_retransmited"], value: queue.MsgRetransmit},
{v2Desc: QueueStatsSempV2["spool_usage_exceeded"], value: queue.SpoolUsageExceeded},
{v2Desc: QueueStatsSempV2["max_message_size_exceeded"], value: queue.MsgSizeExceeded},
{v2Desc: QueueStatsSempV2["total_deleted_messages"], value: queue.Deleted},
{v2Desc: QueueStatsSempV2["messages_shutdown_discarded"], value: queue.SpoolShutdownDiscard},
{v2Desc: QueueStatsSempV2["messages_ttl_discarded"], value: queue.TtlDiscarded},
{v2Desc: QueueStatsSempV2["messages_ttl_dmq"], value: queue.TtlDmq},
{v2Desc: QueueStatsSempV2["messages_ttl_dmq_failed"], value: queue.TtlDmqFailed},
{v2Desc: QueueStatsSempV2["messages_max_redelivered_discarded"], value: queue.MaxRedeliveryDiscarded},
{v2Desc: QueueStatsSempV2["messages_max_redelivered_dmq"], value: queue.MaxRedeliveryDmq},
{v2Desc: QueueStatsSempV2["messages_max_redelivered_dmq_failed"], value: queue.MaxRedeliveryDmqFailed},
}

for _, v := range values {
if v.v2Desc.isSelected(fieldsToSelect) {
ch <- prometheus.MustNewConstMetric(v.v2Desc.NewPrometheusDesc(), prometheus.GaugeValue, v.value, queue.MsgVpnName, queue.QueueName)
}
}
}
}

return 1, nil
}
45 changes: 45 additions & 0 deletions semp/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package semp

import (
"fmt"
"strings"
)

func mapItems(items []string, translateMap map[string]string) ([]string, error) {
validRawItems := make(map[string]bool, len(translateMap))
translated := make([]string, 0, len(items))
validItems := make([]string, 0, len(translateMap)*2)

for key, validRawItem := range translateMap {
validRawItems[validRawItem] = true

validItems = append(validItems, key)
validItems = append(validItems, validRawItem)
}

for _, item := range items {
if translatedItem, ok := translateMap[item]; ok {
translated = append(translated, translatedItem)
} else if _, ok := validRawItems[item]; ok {
translated = append(translated, item)
} else {
return nil, fmt.Errorf(
"Item \"%s\" is not valid. Pleaee choose from: %s",
item,
strings.Join(validItems, ","),
)
}
}

return translated, nil
}

func sliceContains(slice []string, lookUp string) bool {
for _, selectedField := range slice {
if selectedField == lookUp {
return true
}
}

return false
}
26 changes: 26 additions & 0 deletions semp/postHttp.go → semp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,29 @@ func (s *Semp) postHTTP(uri string, _ string, body string) (io.ReadCloser, error
}
return resp.Body, nil
}

func (s *Semp) getHTTPbytes(uri string, _ string) ([]byte, error) {
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, err
}

s.httpRequestVisitor(req)

resp, err := s.httpClient.Do(req)
if err != nil {
return nil, err
}

if !(resp.StatusCode >= 200 && resp.StatusCode < 500) {
_ = resp.Body.Close()
return nil, fmt.Errorf("HTTP status %d (%s)", resp.StatusCode, http.StatusText(resp.StatusCode))
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

return body, nil
}
Loading

0 comments on commit 0c3dd2c

Please sign in to comment.