Skip to content

Commit

Permalink
Added prometheus http metrics: requests, errors and availability
Browse files Browse the repository at this point in the history
  • Loading branch information
tsvtitan committed Oct 11, 2023
1 parent 1adacc4 commit 41efa6c
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 28 deletions.
21 changes: 21 additions & 0 deletions plugins/inputs/prometheus_http/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package prometheus_http

import "sync/atomic"

// Counter is an int64 tally whose reads and writes all go through
// sync/atomic, so a single Counter can be shared between goroutines.
type Counter int64

// Incr atomically adds val to the counter; a negative val decrements it.
func (c *Counter) Incr(val int64) {
	raw := (*int64)(c)
	atomic.AddInt64(raw, val)
}

// Reset atomically sets the counter back to zero.
func (c *Counter) Reset() {
	raw := (*int64)(c)
	atomic.StoreInt64(raw, 0)
}

// Value atomically reads and returns the counter's current value.
func (c *Counter) Value() int64 {
	raw := (*int64)(c)
	return atomic.LoadInt64(raw)
}
111 changes: 83 additions & 28 deletions plugins/inputs/prometheus_http/prometheus_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,23 @@ type PrometheusHttp struct {
HttpPassword string `toml:"http_password"`
Metrics []*PrometheusHttpMetric `toml:"metric"`
Duration config.Duration `toml:"duration"`
Interval config.Duration `toml:"interval"`
From string `toml:"from"`
Timeout config.Duration `toml:"timeout"`
Version string `toml:"version"`
Step string `toml:"step"`
Params string `toml:"params"`
Prefix string `toml:"prefix"`
SkipEmptyTags bool `toml:"skip_empty_tags"`
Files []*PrometheusHttpFile `toml:"file"`
//Availability config.Duration `toml:"availability,omitempty"`
Files []*PrometheusHttpFile `toml:"file"`

Log telegraf.Logger `toml:"-"`
acc telegraf.Accumulator
cache map[uint64]map[string]interface{}
Log telegraf.Logger `toml:"-"`
acc telegraf.Accumulator

requests *RateCounter
errors *RateCounter
cache map[uint64]map[string]interface{}
}

// PrometheusHttpPushFunc is the signature of a callback that receives one
// float64 value together with its tag set and two timestamps (when, stamp).
// NOTE(review): the exact difference between when and stamp is not visible
// from this declaration - confirm against the callers.
type PrometheusHttpPushFunc = func(when time.Time, tags map[string]string, stamp time.Time, value float64)
Expand All @@ -97,6 +102,8 @@ type PrometheusHttpDatasource interface {
// description is the text returned by Description.
var description = "Collect data from Prometheus http api"
// globalFiles is a package-level map handed to readFiles from Init.
// NOTE(review): presumably a file cache shared by all plugin instances -
// confirm in readFiles.
var globalFiles = sync.Map{}

// pluginName is the name the input is registered under. It also serves as the
// default Prefix, as the measurement name of the requests/errors/availability
// point, and as the prefix of that point's "_name" and "_url" tags.
const pluginName = "prometheus_http"

// Description will return a short string to explain what the plugin does.
func (*PrometheusHttp) Description() string {
return description
Expand Down Expand Up @@ -210,17 +217,6 @@ func (p *PrometheusHttp) getTemplateValue(t *toolsRender.TextTemplate, value flo
return f, nil
}

/*func (p *PrometheusHttp) mergeMaps(maps ...map[string]interface{}) map[string]interface{} {
r := make(map[string]interface{})
for _, m := range maps {
for k, v := range m {
r[k] = v
}
}
return r
}*/

func (p *PrometheusHttp) fRenderMetricTag(template string, obj interface{}) interface{} {

t, err := toolsRender.NewTextTemplate(toolsRender.TemplateOptions{
Expand Down Expand Up @@ -441,7 +437,15 @@ func (p *PrometheusHttp) uniqueHash(pm *PrometheusHttpMetric, tgs map[string]str
return byteHash64(byteSha512([]byte(hash)))
}

func (p *PrometheusHttp) setMetrics(w *sync.WaitGroup, pm *PrometheusHttpMetric, ds PrometheusHttpDatasource) {
// addFields returns a freshly allocated field map holding the single entry
// name -> value. Callers may add further entries to the returned map.
func (p *PrometheusHttp) addFields(name string, value interface{}) map[string]interface{} {
	return map[string]interface{}{name: value}
}

func (p *PrometheusHttp) setMetrics(w *sync.WaitGroup, pm *PrometheusHttpMetric,
ds PrometheusHttpDatasource, callback func(err error)) {

gid := utils.GetRoutineID()
p.Log.Debugf("[%d] %s start gathering %s...", gid, p.Name, pm.Name)
Expand Down Expand Up @@ -479,9 +483,6 @@ func (p *PrometheusHttp) setMetrics(w *sync.WaitGroup, pm *PrometheusHttpMetric,
return
}

fields := make(map[string]interface{})
fields[pm.Name] = v

millis := when.UTC().UnixMilli()
tags := make(map[string]string)
tags["timestamp"] = strconv.Itoa(int(millis))
Expand All @@ -501,7 +502,7 @@ func (p *PrometheusHttp) setMetrics(w *sync.WaitGroup, pm *PrometheusHttpMetric,
p.Log.Debugf("[%d] %s skipped NaN/Inf value for: %v[%v]", gid, p.Name, pm.Name, string(bs))
return
}
p.acc.AddFields(p.Prefix, fields, tags, stamp)
p.acc.AddFields(p.Prefix, p.addFields(pm.Name, v), tags, stamp)
}

if ds == nil {
Expand All @@ -517,13 +518,18 @@ func (p *PrometheusHttp) setMetrics(w *sync.WaitGroup, pm *PrometheusHttpMetric,
if err != nil {
p.Log.Error(err)
}
callback(err)
}
}

func (p *PrometheusHttp) gatherMetrics(gid uint64, ds PrometheusHttpDatasource) error {

var wg sync.WaitGroup

tags := make(map[string]string)
tags[fmt.Sprintf("%s_name", pluginName)] = p.Name
tags[fmt.Sprintf("%s_url", pluginName)] = p.URL

for _, m := range p.Metrics {

if m.Name == "" {
Expand All @@ -533,9 +539,33 @@ func (p *PrometheusHttp) gatherMetrics(gid uint64, ds PrometheusHttpDatasource)
}

wg.Add(1)
go p.setMetrics(&wg, m, ds)

go p.setMetrics(&wg, m, ds, func(err error) {

p.requests.Incr(1)
if err != nil {
p.errors.Incr(1)
}
})
}
wg.Wait()

// availability = (requests - errors) / requests * 100
// availability = (100 - 0) / 100 * 100 = 100%
// availability = (100 - 1) / 100 * 100 = 99%
// availability = (100 - 10) / 100 * 100 = 90%
// availability = (100 - 100) / 100 * 100 = 0%

fields := p.addFields("requests", p.requests.counter.Value())
fields["errors"] = p.errors.counter.Value()

r1 := float64(p.requests.counter.Value())
r2 := float64(p.errors.counter.Value())
if r1 > 0 {
fields["availability"] = (r1 - r2) / r1 * 100
}
p.acc.AddFields(pluginName, fields, tags, time.Now())

return nil
}

Expand Down Expand Up @@ -766,11 +796,7 @@ func (p *PrometheusHttp) Gather(acc telegraf.Accumulator) error {
gid := utils.GetRoutineID()
// Gather data
err := p.gatherMetrics(gid, ds)
if err != nil {
return err
}

return nil
return err
}

func (p *PrometheusHttp) Printf(format string, v ...interface{}) {
Expand All @@ -794,7 +820,7 @@ func (p *PrometheusHttp) Init() error {
p.Step = "60"
}
if p.Prefix == "" {
p.Prefix = "prometheus_http"
p.Prefix = pluginName
}

if len(p.Metrics) == 0 {
Expand All @@ -814,11 +840,40 @@ func (p *PrometheusHttp) Init() error {
p.readFiles(gid, &globalFiles)
}

p.requests = NewRateCounter(time.Duration(p.Interval))
p.errors = NewRateCounter(time.Duration(p.Interval))

return nil
}

/*
type PrometheusHttpInput struct {
TagInclude []string `toml:"taginclude,omitempty"`
common.InputOptions
}
func migrate(tbl *ast.Table) ([]byte, string, error) {
var old PrometheusHttpInput
if err := toml.UnmarshalTable(tbl, &old); err != nil {
return nil, "", err
}
cfg := migrations.CreateTOMLStruct("inputs", "jolokia2_agent")
// Marshal the new configuration
buf, err := toml.Marshal(cfg)
if err != nil {
return nil, "", err
}
buf = append(buf, []byte("\n")...)
// Create the new content to output
return buf, "", nil
}
*/

func init() {
inputs.Add("prometheus_http", func() telegraf.Input {
//migrations.AddPluginMigration(pluginName, migrate)
inputs.Add(pluginName, func() telegraf.Input {
return &PrometheusHttp{}
})
}
126 changes: 126 additions & 0 deletions plugins/inputs/prometheus_http/ratecounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package prometheus_http

import (
"strconv"
"sync"
"sync/atomic"
"time"
)

// A RateCounter is a sliding-window counter: it reports the sum of the values
// passed to 'Incr' during roughly the last interval. The window is divided
// into 'resolution' partial buckets, and a background goroutine (see run)
// expires one bucket per tick.
type RateCounter struct {
// counter is the running total over all partials.
counter Counter
// interval is the length of the sliding window.
interval time.Duration
// resolution is the number of partial buckets the window is divided into.
resolution int
// partials holds one sub-total per bucket; WithResolution allocates it with
// exactly 'resolution' elements.
partials []Counter
// current is the index of the bucket Incr currently adds to; it is read and
// written through sync/atomic.
current int32
// running is 1 while the expiry goroutine is alive and 0 otherwise.
running int32
// onStop, when non-nil, is called by the expiry goroutine once the total
// has dropped to zero and the goroutine is about to exit.
onStop func(r *RateCounter)
// onStopLock guards onStop.
onStopLock sync.RWMutex
}

// NewRateCounter returns a RateCounter covering a sliding window of length
// intrvl, split into the default 20 partial buckets.
func NewRateCounter(intrvl time.Duration) *RateCounter {
	return (&RateCounter{interval: intrvl}).WithResolution(20)
}

// NewRateCounterWithResolution returns a RateCounter covering a sliding window
// of length intrvl, split into resolution partial buckets. It panics (via
// WithResolution) when resolution is less than 1.
func NewRateCounterWithResolution(intrvl time.Duration, resolution int) *RateCounter {
	rc := new(RateCounter)
	rc.interval = intrvl
	return rc.WithResolution(resolution)
}

// WithResolution sets the number of partial buckets the window is divided
// into (the constructors default to 20), allocates a fresh zeroed bucket slice
// of that size and rewinds the current bucket index to 0. It returns the
// receiver so calls can be chained.
//
// It panics when resolution is less than 1.
func (r *RateCounter) WithResolution(resolution int) *RateCounter {
	if resolution <= 0 {
		panic("RateCounter resolution cannot be less than 1")
	}

	r.resolution, r.partials, r.current = resolution, make([]Counter, resolution), 0
	return r
}

// OnStop registers f as the callback that the background goroutine invokes
// each time the counter's total reaches 0 and the goroutine shuts down; this
// is useful, for example, for removing an idle counter. Passing nil clears
// the callback.
func (r *RateCounter) OnStop(f func(*RateCounter)) {
	r.onStopLock.Lock()
	defer r.onStopLock.Unlock()
	r.onStop = f
}

// run lazily starts the background goroutine that expires old partials.
//
// It is called from every Incr; the compare-and-swap on r.running ensures that
// only one caller at a time gets to start the goroutine. On every tick
// (interval / resolution) the goroutine takes the bucket after the current
// one, subtracts its stale value from the total, zeroes it and makes it the
// current bucket. Once the total is 0 the goroutine clears r.running, stops
// the ticker, invokes the OnStop callback (if any) and exits; the next Incr
// starts a new goroutine.
//
// A non-positive tick (interval <= 0, or an interval shorter than
// 'resolution' nanoseconds) would make time.NewTicker panic inside the
// background goroutine and take the whole process down. In that case no
// goroutine is started and the counter simply accumulates without expiry.
func (r *RateCounter) run() {
	tick := time.Duration(float64(r.interval) / float64(r.resolution))
	if tick <= 0 {
		return
	}

	if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
		// Another goroutine is already expiring partials.
		return
	}

	go func() {
		ticker := time.NewTicker(tick)

		for range ticker.C {
			current := atomic.LoadInt32(&r.current)
			next := (int(current) + 1) % r.resolution
			// The bucket about to be reused still holds the value from one
			// full window ago: drop it from the total before clearing it.
			r.counter.Incr(-1 * r.partials[next].Value())
			r.partials[next].Reset()
			atomic.CompareAndSwapInt32(&r.current, current, int32(next))
			if r.counter.Value() != 0 {
				continue
			}

			// NOTE(review): an Incr landing between the zero check above and
			// the store below sees running == 1 and does not restart the
			// goroutine, so its value is only expired after a later Incr.
			atomic.StoreInt32(&r.running, 0)
			ticker.Stop()

			// Copy the callback under the lock but call it without the lock
			// held, so a callback that itself calls OnStop cannot deadlock.
			r.onStopLock.RLock()
			onStop := r.onStop
			r.onStopLock.RUnlock()
			if onStop != nil {
				onStop(r)
			}

			return
		}
	}()
}

// Incr records val: it is added to the running total and to the bucket that
// is current at the time of the call, after which the expiry goroutine is
// started if it is not already running.
func (r *RateCounter) Incr(val int64) {
	r.counter.Incr(val)
	bucket := &r.partials[atomic.LoadInt32(&r.current)]
	bucket.Incr(val)
	r.run()
}

// Rate returns the running total, i.e. the sum of the values passed to Incr
// that have not yet been expired from the window.
func (r *RateCounter) Rate() int64 {
	total := r.counter.Value()
	return total
}

// MaxRate returns the largest value currently held by any single partial
// bucket, or 0 when no bucket holds a positive value.
//
// Rate sums the whole window, so after a quiet period it needs a full interval
// of steady traffic before it reflects that traffic. MaxRate instead reports
// the busiest bucket as soon as it fills, and keeps reporting it until that
// bucket is expired, even if traffic has dropped in the meantime.
func (r *RateCounter) MaxRate() int64 {
	var peak int64
	for i := range r.partials {
		if v := r.partials[i].Value(); v > peak {
			peak = v
		}
	}
	return peak
}

// String formats the current running total (see Rate) as a base-10 integer.
func (r *RateCounter) String() string {
	return strconv.FormatInt(r.Rate(), 10)
}

0 comments on commit 41efa6c

Please sign in to comment.