This repository has been archived by the owner on Sep 8, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhelper.go
154 lines (140 loc) · 4.01 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package main
import (
"expvar"
"github.com/uber-go/zap"
"sync"
"sync/atomic"
"time"
"unsafe"
)
// logger is the package-level zap logger; stats writes its periodic "Stats"
// summary through it.
// NOTE(review): no value is assigned in this declaration — it has to be set
// elsewhere before the first use; confirm against the program's start-up code.
var logger zap.Logger
// TODO: Fix config variable names
// TODO: Make use of config files
// Config holds the process-wide configuration, initialized here with the
// built-in defaults. It is a plain package-level variable, so any code in the
// package can read or overwrite its fields.
var Config = struct {
// Senders defaults to 6.
// NOTE(review): presumably the number of concurrent sender workers — confirm against the caller.
Senders int
// Endpoint is the URL of the storage backend (default http://localhost:8123).
// NOTE(review): looks like a ClickHouse HTTP endpoint — confirm.
Endpoint string
// GraphiteDB and GraphiteTreeDB default to "graphite" and "graphite_tree".
// NOTE(review): presumably database names on the backend — confirm.
GraphiteDB string
GraphiteTreeDB string
// Interval is the period between two iterations of the stats loop.
Interval time.Duration
// ClickhouseSendInterval defaults to 10000.
// NOTE(review): the unit is not visible in this file — TODO confirm.
ClickhouseSendInterval int
// QueueLimitElements defaults to 3000000.
// NOTE(review): presumably an upper bound on queued elements — confirm where it is enforced.
QueueLimitElements int
// GraphiteHost, when empty, makes stats write the counters to the log.
GraphiteHost string
// ResetMetrics, when true, makes stats call flushMetrics after every interval.
ResetMetrics bool
}{
Senders: 6,
Endpoint: "http://localhost:8123",
GraphiteDB: "graphite",
GraphiteTreeDB: "graphite_tree",
Interval: 60 * time.Second,
ClickhouseSendInterval: 10000,
QueueLimitElements: 3000000,
GraphiteHost: "",
ResetMetrics: false,
}
// Metrics is a structure that stores all internal metrics.
//
// Every field is an *expvar.Int created with expvar.NewInt, so each counter is
// also published through the expvar package under the snake_case name given
// in the initializer below.
var Metrics = struct {
// The counters from MetricsReceived through SendRequests are zeroed by
// flushMetrics.
MetricsReceived *expvar.Int
MetricsSent *expvar.Int
MetricsErrors *expvar.Int
MetricsDropped *expvar.Int
ReceiveErrors *expvar.Int
ParseErrors *expvar.Int
QueueErrors *expvar.Int
SendErrors *expvar.Int
SendTimeNS *expvar.Int
SendRequests *expvar.Int
// QueueSize is overwritten by stats with the summed length of all queues.
QueueSize *expvar.Int
// The tree counters are not touched by flushMetrics.
TreeUpdates *expvar.Int
TreeUpdateErrors *expvar.Int
TreeUpdateRequests *expvar.Int
}{
MetricsReceived: expvar.NewInt("metrics_received"),
MetricsSent: expvar.NewInt("metrics_sent"),
MetricsErrors: expvar.NewInt("metrics_errors"),
MetricsDropped: expvar.NewInt("metrics_dropped"),
ParseErrors: expvar.NewInt("parse_errors"),
ReceiveErrors: expvar.NewInt("receive_errors"),
QueueErrors: expvar.NewInt("queue_errors"),
SendErrors: expvar.NewInt("send_errors"),
SendTimeNS: expvar.NewInt("send_time_ns"),
SendRequests: expvar.NewInt("send_requests"),
QueueSize: expvar.NewInt("queue_size"),
TreeUpdates: expvar.NewInt("tree_updates"),
TreeUpdateErrors: expvar.NewInt("tree_update_errors"),
TreeUpdateRequests: expvar.NewInt("tree_update_requests"),
}
// unsafeString reinterprets b as a string without copying its bytes.
//
// The returned string aliases b's backing array, so the caller must not
// modify b for as long as the string is in use.
func unsafeString(b []byte) string {
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}
// queue is a buffer of raw byte slices bundled with a read/write lock.
type queue struct {
// The lock is embedded, so Lock/Unlock/RLock/RUnlock are callable directly on a queue.
// NOTE(review): presumably it guards data — confirm against the code that fills and drains the queue.
sync.RWMutex
// data holds the queued entries, one byte slice per entry.
data [][]byte
}
// queues is the set of queues whose lengths stats sums into Metrics.QueueSize.
// NOTE(review): it is not populated in this part of the file — confirm where it is sized.
var queues []queue
// writerTime is a coarse clock maintained by timeUpdater: it is bumped by one
// on every one-second tick and periodically overwritten with the current Unix
// time in seconds. It is updated with sync/atomic operations.
var writerTime uint32
// timeUpdater maintains writerTime as a cheap, coarse clock until exit is
// signalled.
//
// Once per second it atomically increments writerTime by one. After an hour's
// worth of ticks it instead stores the real Unix time (in seconds) and starts
// counting again. The periodic resynchronization presumably corrects drift: a
// ticker drops ticks when the receiver is slow, so pure increments can fall
// behind the wall clock.
//
// The function blocks; it returns as soon as exit is closed or yields a value.
func timeUpdater(exit <-chan struct{}) {
	// Number of one-second ticks between two resynchronizations.
	const resyncAfter = 60 * 60

	// time.NewTicker instead of time.Tick: the ticker behind time.Tick can
	// never be stopped, so it would outlive this function after exit fires.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	ticks := 0
	for {
		select {
		case <-exit:
			return
		case <-ticker.C:
			if ticks < resyncAfter {
				atomic.AddUint32(&writerTime, 1)
			} else {
				currentTime := uint32(time.Now().Unix())
				atomic.StoreUint32(&writerTime, currentTime)
				ticks = 0
			}
			ticks++
		}
	}
}
// flushMetrics zeroes the receive/parse/queue/send counters in Metrics.
//
// QueueSize and the tree-update counters are deliberately not in the list and
// keep their values.
func flushMetrics() {
	resettable := []*expvar.Int{
		Metrics.MetricsReceived,
		Metrics.MetricsErrors,
		Metrics.MetricsDropped,
		Metrics.ParseErrors,
		Metrics.QueueErrors,
		Metrics.ReceiveErrors,
		Metrics.SendErrors,
		Metrics.SendRequests,
		Metrics.MetricsSent,
		Metrics.SendTimeNS,
	}
	for _, counter := range resettable {
		counter.Set(0)
	}
}
// stats periodically publishes the internal counters until exit is signalled.
//
// Every Config.Interval it:
//   - sums the length of every queue and stores the total in Metrics.QueueSize;
//   - when Config.GraphiteHost is empty, writes all counters to logger as a
//     single "Stats" entry;
//   - when Config.ResetMetrics is true, zeroes the counters via flushMetrics.
//
// Config.Interval is read once, when the function starts. The function
// blocks; it returns as soon as exit is closed or yields a value.
func stats(exit <-chan struct{}) {
	// A non-positive interval means "never tick": just wait for exit. This
	// mirrors time.Tick, which returns a nil channel in that case, whereas
	// time.NewTicker would panic.
	if Config.Interval <= 0 {
		<-exit
		return
	}

	// time.NewTicker instead of time.Tick so the ticker is released when the
	// loop ends rather than living on after exit fires.
	ticker := time.NewTicker(Config.Interval)
	defer ticker.Stop()

	for {
		select {
		case <-exit:
			return
		case <-ticker.C:
			cnt := int64(0)
			for idx := range queues {
				// Take a pointer: queue embeds a lock and must not be copied.
				q := &queues[idx]
				// Hold the read lock while sampling the length so the read
				// does not race with writers that modify data under the lock.
				q.RLock()
				cnt += int64(len(q.data))
				q.RUnlock()
			}
			Metrics.QueueSize.Set(cnt)

			if Config.GraphiteHost == "" {
				logger.Info("Stats",
					zap.String("metrics_received", Metrics.MetricsReceived.String()),
					zap.String("metrics_errors", Metrics.MetricsErrors.String()),
					zap.String("metrics_dropped", Metrics.MetricsDropped.String()),
					zap.String("metrics_sent", Metrics.MetricsSent.String()),
					zap.String("parse_errors", Metrics.ParseErrors.String()),
					zap.String("queue_errors", Metrics.QueueErrors.String()),
					zap.String("receive_errors", Metrics.ReceiveErrors.String()),
					zap.String("send_errors", Metrics.SendErrors.String()),
					zap.String("send_requests", Metrics.SendRequests.String()),
					zap.String("send_time_ns", Metrics.SendTimeNS.String()),
					zap.Int64("queue_size", cnt),
				)
			}

			// Reset only after the values have been reported above.
			if Config.ResetMetrics {
				flushMetrics()
			}
			// logfile.Sync()
		}
	}
}