-
Notifications
You must be signed in to change notification settings - Fork 0
/
prometheus.go
128 lines (106 loc) · 2.86 KB
/
prometheus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"context"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"log"
"time"
)
type Message struct {
content []interface{}
}
type Supplier interface {
Supply(channel chan<- Message)
}
func NewPrometheusSupplier(config PrometheusSupplierConfig) (Supplier, error) {
prometheusClient, err := api.NewClient(api.Config{
Address: config.host,
})
if err != nil {
return nil, err
}
prometheusApiClient := v1.NewAPI(prometheusClient)
return &prometheusSupplier{
api: prometheusApiClient,
pollingInterval: config.pollingInterval,
requestTimeout: config.requestTimeout,
}, nil
}
type prometheusSupplier struct {
api v1.API
requestTimeout time.Duration
pollingInterval time.Duration
}
type PrometheusSupplierConfig struct {
host string
pollingInterval time.Duration
requestTimeout time.Duration
}
func (ps *prometheusSupplier) Supply(mc chan<- Message) {
startTime := time.Now()
for {
time.Sleep(ps.pollingInterval)
// get all labels
timeoutCtx, cancelContext := context.WithTimeout(context.Background(), ps.requestTimeout)
//labelNames, _, err := ps.api.LabelNames(timeoutCtx)
//cancelContext()
//if err != nil {
// log.Println("Could not obtain label names from prometheus:", err)
// continue
//}
//log.Println("Found the following labelNames:", strings.Join(labelNames, "\n"))
// get metric names
timeoutCtx, cancelContext = context.WithTimeout(context.Background(), ps.requestTimeout)
metricNames, _, err := ps.api.LabelValues(timeoutCtx, "__name__")
cancelContext()
if err != nil {
log.Println("Could not obtain metric names from prometheus:", err)
continue
}
//log.Println("Found the following metric names:", metricNames)
// get metric values
endTime := time.Now()
for _, metricName := range metricNames {
//go func() {
// log.Println("Requesting values for ", string(metricName))
timeoutCtx, cancelContext = context.WithTimeout(context.Background(), ps.requestTimeout)
metricValues, _, err := ps.api.QueryRange(timeoutCtx, string(metricName), v1.Range{
Start: startTime,
End: endTime,
Step: 10 * time.Second,
})
cancelContext()
if err != nil {
log.Println("Could not obtain metric values from prometheus:", err)
return
}
metricValues.Type()
mc <- Message{content: []interface{}{metricName, ":", metricValues.Type()},}
//}()
}
startTime = endTime
}
}
type Consumer interface {
Consume(<-chan Message)
}
func NewLoggerConsumer() (Consumer, error) {
return &loggerConsumer{}, nil
}
type loggerConsumer struct {
}
func (lc *loggerConsumer) Consume(mc <-chan Message) {
for {
select {
case m, ok := <-mc:
if !ok {
log.Println("Events channel closed.")
return
}
log.Println(m.content...)
}
}
}
type Mapper interface {
Map()
}