This repository has been archived by the owner on Sep 8, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmetrics_tree.go
122 lines (111 loc) · 3.11 KB
/
metrics_tree.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"bytes"
"github.com/uber-go/zap"
"net/http"
"strconv"
"sync/atomic"
"time"
)
// Package-level state used by metricsTreeUpdater.
var (
	// metricsTreeUpdateQueues holds the queues of metric names that
	// metricsTreeUpdater drains, each under its own lock, on every cycle.
	metricsTreeUpdateQueues []queue

	// GraphiteTreeDBEndpoint is the destination that metricsTreeUpdater hands
	// to sendData when uploading a batch of tree rows.
	GraphiteTreeDBEndpoint string

	// treeNeedsUpdate is a flag accessed atomically: metricsTreeUpdater swaps
	// it from 1 to 0 before draining the queues.
	// NOTE(review): presumably set to 1 by whoever fills the queues — confirm
	// against the writers, which are not in this file.
	treeNeedsUpdate int64
)
// metricsTreeUpdater runs forever, periodically flushing newly queued metric
// names into the Graphite tree table (Config.GraphiteTreeDB).
//
// Each cycle it:
//  1. atomically consumes the treeNeedsUpdate flag (polling once per second
//     while it is not set);
//  2. drains every queue in metricsTreeUpdateQueues under that queue's lock;
//  3. appends one TabSeparated row "date<TAB>level<TAB>name" for every metric
//     and for every dot-terminated prefix of it (trailing dot included) that
//     has not been emitted earlier in the same cycle;
//  4. passes the batch to sendData together with GraphiteTreeDBEndpoint.
//
// Deduplication is per cycle only: the set of seen names is rebuilt on every
// pass. Metrics.TreeUpdates counts the newly emitted prefix rows of a
// successfully sent batch. The function never returns.
func metricsTreeUpdater() {
	var updateList [][]byte

	header := []byte("insert into " + Config.GraphiteTreeDB + " format TabSeparated\n")
	// Copy the header into the buffer rather than using bytes.NewBuffer(header):
	// NewBuffer takes ownership of the slice it is given, and header is written
	// again after every cycle.
	buffer := new(bytes.Buffer)
	buffer.Write(header)

	ts := atomic.LoadUint32(&writerTime)
	prevTs := ts
	date := []byte(time.Unix(int64(ts), 0).Format("2006-01-02"))

	client := http.Client{
		// TODO: Remove hardcoded request timeout
		Timeout: 15 * time.Second,
	}

	// Number of prefix rows added to the current batch.
	sentNames := 0
	for {
		// Consume the "needs update" flag; if it was not set there is nothing to do yet.
		if !atomic.CompareAndSwapInt64(&treeNeedsUpdate, 1, 0) {
			time.Sleep(1 * time.Second)
			continue
		}

		// Move the queued names into updateList, leaving each queue empty but
		// with the same capacity hint.
		for number := range metricsTreeUpdateQueues {
			metricsTreeUpdateQueues[number].Lock()
			updateList = append(updateList, metricsTreeUpdateQueues[number].data...)
			metricsTreeUpdateQueues[number].data = make([][]byte, 0, len(metricsTreeUpdateQueues[number].data))
			metricsTreeUpdateQueues[number].Unlock()
		}

		if len(updateList) == 0 {
			// We want faster reaction on updates, so sleep time here is different
			// TODO: Remove hardcoded sleep time
			time.Sleep(1 * time.Second)
			continue
		}

		logger.Info("metricTreeUpdate: got tree update list", zap.Int("len", len(updateList)))

		// Re-render the date column only when the writer timestamp has changed.
		ts = atomic.LoadUint32(&writerTime)
		if ts != prevTs {
			prevTs = ts
			date = []byte(time.Unix(int64(ts), 0).Format("2006-01-02"))
		}

		// Names (full metrics and prefixes) already emitted in this cycle.
		seen := make(map[string]struct{}, 4*len(updateList))
		for _, metric := range updateList {
			if _, dup := seen[string(metric)]; dup {
				continue
			}
			seen[string(metric)] = struct{}{}

			// level is the 1-based depth of the row being written: it grows by
			// one for every dot encountered in the metric name.
			level := 1
			for idx := range metric {
				if metric[idx] != '.' {
					continue
				}
				// The prefix keeps its trailing dot.
				prefix := metric[:idx+1]
				if _, dup := seen[string(prefix)]; !dup {
					seen[string(prefix)] = struct{}{}
					writeMetricsTreeRow(buffer, date, level, prefix)
					sentNames++
				}
				level++
			}
			writeMetricsTreeRow(buffer, date, level, metric)
		}
		updateList = updateList[:0]

		err := sendData(&client, GraphiteTreeDBEndpoint, buffer)
		if err != nil {
			logger.Error("Can't send data to Clickhouse", zap.Error(err))
			Metrics.TreeUpdateErrors.Add(1)
		} else {
			Metrics.TreeUpdateRequests.Add(1)
			Metrics.TreeUpdates.Add(int64(sentNames))
		}
		// The batch is never carried over to the next cycle (see below), so the
		// counter must not be either; otherwise rows dropped after a failed
		// send would be reported as updates on the next successful one.
		sentNames = 0

		leftover := buffer.Len() > 0
		if leftover {
			// TODO: We should maintain buffer if Clickhouse is not available
			logger.Error("Buffer is not empty. Handling this situation is not implemented yet")
			buffer.Reset()
		}
		// Start the next batch with a fresh insert header.
		buffer.Write(header)
		if !leftover {
			logger.Info("metricTreeUpdate: done")
		}

		// TODO: Remove hardcoded sleep time
		time.Sleep(60 * time.Second)
	}
}

// writeMetricsTreeRow appends one TabSeparated row, "date<TAB>level<TAB>name\n", to buffer.
func writeMetricsTreeRow(buffer *bytes.Buffer, date []byte, level int, name []byte) {
	buffer.Write(date)
	buffer.WriteByte('\t')
	buffer.WriteString(strconv.Itoa(level))
	buffer.WriteByte('\t')
	buffer.Write(name)
	buffer.WriteByte('\n')
}