Skip to content

Commit

Permalink
#59 async sending of batches
Browse files Browse the repository at this point in the history
  • Loading branch information
GreenRover committed Dec 15, 2023
1 parent 4922d02 commit 742b762
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 18 deletions.
100 changes: 100 additions & 0 deletions exporter/asyncFetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package exporter

import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"sync"
"time"
)

const (
// Capacity for the channel to collect metrics and descriptors.
capMetricChan = 1000
)

func NewAsyncFetcher(urlPath string, dataSource []DataSource, conf Config, logger log.Logger, version float64) *AsyncFetcher {

var fetcher = &AsyncFetcher{
dataSource: dataSource,
conf: conf,
logger: logger,
metrics: make([]prometheus.Metric, 0, capMetricChan),
exporter: NewExporter(logger, &conf, &dataSource, version),
}

collectWorker := func() {
for {
_ = level.Debug(logger).Log("msg", "Fetching for handler", "handler", "/"+urlPath)

readMetrics(fetcher)

// _ = level.Debug(logger).Log("msg", "Finished fetching for handler", "handler", "/"+urlPath)
time.Sleep(conf.PrefetchInterval)
}
}

go collectWorker()

return fetcher
}

type AsyncFetcher struct {
mutex sync.Mutex
dataSource []DataSource
conf Config
logger log.Logger
metrics []prometheus.Metric
exporter *Exporter
}

func readMetrics(f *AsyncFetcher) {
var metricsChan = make(chan prometheus.Metric, capMetricChan)
var wg sync.WaitGroup
wg.Add(1)

collectWorker := func() {
f.exporter.Collect(metricsChan)
wg.Done()
}
go collectWorker()

go func() {
wg.Wait()
close(metricsChan)
}()

// Drain checkedMetricChan and uncheckedMetricChan in case of premature return.
defer func() {
if metricsChan != nil {
for range metricsChan {
}
}
}()

// read from chanel until the channel is closed
metrics := make([]prometheus.Metric, 0, capMetricChan)
for {
metric, ok := <-metricsChan
if !ok {
break
}
metrics = append(metrics, metric)
}

f.mutex.Lock()
f.metrics = metrics
f.mutex.Unlock()
}

func (f *AsyncFetcher) Describe(descs chan<- *prometheus.Desc) {
f.exporter.Describe(descs)
}

func (f *AsyncFetcher) Collect(metrics chan<- prometheus.Metric) {
f.mutex.Lock()
for _, metric := range f.metrics {
metrics <- metric
}
f.mutex.Unlock()
}
41 changes: 30 additions & 11 deletions exporter/config.struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ import (

// Collection of configs
type Config struct {
ListenAddr string
EnableTLS bool
Certificate string
PrivateKey string
ScrapeURI string
Username string
Password string
DefaultVpn string
SslVerify bool
useSystemProxy bool
Timeout time.Duration
ListenAddr string
EnableTLS bool
Certificate string
PrivateKey string
ScrapeURI string
Username string
Password string
DefaultVpn string
SslVerify bool
useSystemProxy bool
Timeout time.Duration
PrefetchInterval time.Duration
}

// getListenURI returns the `listenAddr` with proper protocol (http/https),
Expand Down Expand Up @@ -91,6 +92,10 @@ func ParseConfig(configFile string) (map[string][]DataSource, *Config, error) {
if err != nil {
return nil, nil, err
}
conf.PrefetchInterval, err = parseConfigDurationOptional(cfg, "solace", "prefetchInterval", "PREFETCH_INTERVAL")
if err != nil {
return nil, nil, err
}
conf.SslVerify, err = parseConfigBool(cfg, "solace", "sslVerify", "SOLACE_SSL_VERIFY")
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -146,6 +151,20 @@ func parseConfigBool(cfg *ini.File, iniSection string, iniKey string, envKey str
return val, nil
}

func parseConfigDurationOptional(cfg *ini.File, iniSection string, iniKey string, envKey string) (time.Duration, error) {
s, err := parseConfigString(cfg, iniSection, iniKey, envKey)
if err != nil {
return time.Duration(0), nil
}

val, err := time.ParseDuration(s)
if err != nil {
return 0, fmt.Errorf("config param %q and env param %q is mandetory. Both are missing: %w", iniKey, envKey, err)
}

return val, nil
}

func parseConfigDuration(cfg *ini.File, iniSection string, iniKey string, envKey string) (time.Duration, error) {
s, err := parseConfigString(cfg, iniSection, iniKey, envKey)
if err != nil {
Expand Down
24 changes: 22 additions & 2 deletions semp/getQueueStatsSemp1.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"io"
"time"
)

// Get rates for each individual queue of all vpn's
Expand Down Expand Up @@ -53,9 +55,11 @@ func (e *Semp) GetQueueStatsSemp1(ch chan<- prometheus.Metric, vpnFilter string,
} `xml:"execute-result"`
}

var page = 1
var lastQueueName = ""
for nextRequest := "<rpc><show><queue><name>" + itemFilter + "</name><vpn-name>" + vpnFilter + "</vpn-name><stats/><count/><num-elements>100</num-elements></queue></show></rpc>"; nextRequest != ""; {
body, err := e.postHTTP(e.brokerURI+"/SEMP", "application/xml", nextRequest)
body, err := postHTTP(e, nextRequest, &page)

if err != nil {
_ = level.Error(e.logger).Log("msg", "Can't scrape QueueStatsSemp1", "err", err, "broker", e.brokerURI)
return 0, err
Expand Down Expand Up @@ -96,8 +100,24 @@ func (e *Semp) GetQueueStatsSemp1(ch chan<- prometheus.Metric, vpnFilter string,
ch <- prometheus.MustNewConstMetric(MetricDesc["QueueStats"]["messages_max_redelivered_dmq"], prometheus.GaugeValue, queue.Stats.MessageSpoolStats.MaxRedeliveryDmq, queue.Info.MsgVpnName, queue.QueueName)
ch <- prometheus.MustNewConstMetric(MetricDesc["QueueStats"]["messages_max_redelivered_dmq_failed"], prometheus.GaugeValue, queue.Stats.MessageSpoolStats.MaxRedeliveryDmqFailed, queue.Info.MsgVpnName, queue.QueueName)
}
body.Close()
_ = body.Close()
}

return 1, nil
}

func postHTTP(e *Semp, nextRequest string, page *int) (io.ReadCloser, error) {
start := time.Now()
body, err := e.postHTTP(e.brokerURI+"/SEMP", "application/xml", nextRequest)

// 1sec
const longQuery time.Duration = 1 * 1000 * 1000 * 1000

var queryDuration = time.Since(start)
if queryDuration > longQuery {
_ = level.Warn(e.logger).Log("msg", "Scraped QueueStatsSemp1 but this took very long. Please add more cpu to your broker. Otherwise you are about to harm your broker.", "page", page, "duration", queryDuration)
}
_ = level.Debug(e.logger).Log("msg", "Scrape QueueStatsSemp1", "page", page, "duration", queryDuration)
(*page)++
return body, err
}
26 changes: 21 additions & 5 deletions solace_prometheus_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,18 @@ func main() {
})

declareHandlerFromConfig := func(urlPath string, dataSource []exporter.DataSource) {
level.Info(logger).Log("msg", "Register handler from config", "handler", "/"+urlPath, "dataSource", logDataSource(dataSource))
http.HandleFunc("/"+urlPath, func(w http.ResponseWriter, r *http.Request) {
doHandle(w, r, dataSource, *conf, logger)
})
_ = level.Info(logger).Log("msg", "Register handler from config", "handler", "/"+urlPath, "dataSource", logDataSource(dataSource))

if conf.PrefetchInterval.Seconds() > 0 {
var asyncFetcher = exporter.NewAsyncFetcher(urlPath, dataSource, *conf, logger, version)
http.HandleFunc("/"+urlPath, func(w http.ResponseWriter, r *http.Request) {
doHandleAsync(w, r, asyncFetcher, *conf, logger)
})
} else {
http.HandleFunc("/"+urlPath, func(w http.ResponseWriter, r *http.Request) {
doHandle(w, r, dataSource, *conf, logger)
})
}
}
for urlPath, dataSource := range endpoints {
declareHandlerFromConfig(urlPath, dataSource)
Expand Down Expand Up @@ -225,8 +233,16 @@ func main() {

}

func doHandle(w http.ResponseWriter, r *http.Request, dataSource []exporter.DataSource, conf exporter.Config, logger log.Logger) (resultCode string) {
func doHandleAsync(w http.ResponseWriter, r *http.Request, asyncFetcher *exporter.AsyncFetcher, conf exporter.Config, logger log.Logger) (resultCode string) {
registry := prometheus.NewRegistry()
registry.MustRegister(asyncFetcher)
handler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
handler.ServeHTTP(w, r)

return w.Header().Get("status")
}

func doHandle(w http.ResponseWriter, r *http.Request, dataSource []exporter.DataSource, conf exporter.Config, logger log.Logger) (resultCode string) {
if dataSource == nil {
handler := promhttp.Handler()
handler.ServeHTTP(w, r)
Expand Down

0 comments on commit 742b762

Please sign in to comment.