forked from facebookarchive/flashback
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stats_analyser.go
193 lines (171 loc) · 5.11 KB
/
stats_analyser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package flashback
import (
	"sort"
	"sync"
	"time"
)
var (
	// latencyPercentiles lists, in ascending order, the percentiles that are
	// reported for every set of latency samples.
	latencyPercentiles = []int{50, 60, 70, 80, 90, 95, 99, 100}

	// emptyLatencies is an all-zero slice holding one entry per percentile in
	// latencyPercentiles, i.e. the shape of a latency report with no samples.
	emptyLatencies = make([]int64, len(latencyPercentiles))
)
// Percentile indexes. Each constant is the position of the corresponding
// entry in latencyPercentiles (50, 60, 70, 80, 90, 95, 99, 100) and therefore
// of the matching value in a slice returned by CalculateLatencyStats.
const (
	P50 = iota
	P60
	P70
	P80
	P90
	P95
	P99
	P100
)
// NewStatsAnalyzer builds a StatsAnalyzer over the given stats collectors.
//
// opsExecuted points at the counter of operations executed so far; it is read
// on every GetStatus call. latenciesSize is the initial capacity reserved for
// the latency samples of each OpType in AllOpTypes.
//
// A background goroutine is started that drains latencyChan and records every
// received latency under its OpType. The goroutine terminates when latencyChan
// is closed.
func NewStatsAnalyzer(
	statsCollectors []*StatsCollector,
	opsExecuted *int64,
	latencyChan chan Latency,
	latenciesSize int) *StatsAnalyzer {
	latencies := make(map[OpType][]int64)
	lastEndPos := make(map[OpType]int)
	for _, opType := range AllOpTypes {
		latencies[opType] = make([]int64, 0, latenciesSize)
		lastEndPos[opType] = 0
	}

	now := time.Now()
	analyzer := &StatsAnalyzer{
		statsCollectors: statsCollectors,
		opsExecuted:     opsExecuted,
		opsExecutedLast: 0,
		latencyChan:     latencyChan,
		latencies:       latencies,
		epoch:           now,
		timeLast:        now,
		lastEndPos:      lastEndPos,
		counts:          make(map[OpType]int64),
		countsLast:      make(map[OpType]int64),
	}

	go func() {
		for op := range latencyChan {
			// The latencies map is read by GetStatus from another goroutine,
			// so every update must happen under the lock.
			analyzer.latenciesLock.Lock()
			analyzer.latencies[op.OpType] = append(
				analyzer.latencies[op.OpType], int64(op.Latency),
			)
			analyzer.latenciesLock.Unlock()
		}
	}()

	return analyzer
}

// ExecutionStatus encapsulates the aggregated information for the execution
type ExecutionStatus struct {
	// OpsExecuted is the total number of ops executed when the status was taken
	OpsExecuted int64
	// OpsExecutedLast is the total number of ops executed at the previous
	// call to GetStatus()
	OpsExecutedLast int64
	// OpsPerSec stores ops/sec averaged across the entire workload
	OpsPerSec float64
	// OpsPerSecLast stores the ops/sec since the last call to GetStatus()
	OpsPerSecLast float64
	// Duration is the time elapsed since the analyzer was created
	Duration time.Duration
	// AllTimeLatencies holds, per op type, the percentile latencies computed
	// over every latency recorded so far
	AllTimeLatencies map[OpType][]int64
	// SinceLastLatencies holds, per op type, the percentile latencies computed
	// over the latencies recorded since the last call to GetStatus()
	SinceLastLatencies map[OpType][]int64
	// Counts holds the per op type counts reported by the stats collectors
	Counts map[OpType]int64
	// CountsLast holds the per op type counts as of the previous call to
	// GetStatus()
	CountsLast map[OpType]int64
	// TypeOpsSec stores per op type ops/sec averaged across the entire workload
	TypeOpsSec map[OpType]float64
	// TypeOpsSecLast stores per op type ops/sec since the last call to
	// GetStatus()
	TypeOpsSecLast map[OpType]float64
}

// StatsAnalyzer aggregates counters from a set of stats collectors together
// with the latencies received on a channel, and reports them via GetStatus.
type StatsAnalyzer struct {
	statsCollectors []*StatsCollector
	// store total ops executed during the run
	opsExecuted *int64
	// store ops executed at the time of the last GetStatus() call
	opsExecutedLast int64
	latencyChan     chan Latency

	// latenciesLock guards the latencies map and the slice headers stored in
	// it; both the collecting goroutine and GetStatus access them.
	latenciesLock sync.Mutex
	// latencies holds every recorded latency per op type, in arrival order
	latencies map[OpType][]int64

	// Store the start of the run
	epoch time.Time
	// Store the time of the last GetStatus() call
	timeLast time.Time
	// lastEndPos stores, per op type, how many latencies had been recorded at
	// the last GetStatus() call
	lastEndPos map[OpType]int
	counts     map[OpType]int64
	countsLast map[OpType]int64
}

// GetStatus returns a snapshot of the execution statistics: overall and
// per-op-type throughput, operation counts and latency percentiles, each both
// for the whole run and for the interval since the previous GetStatus call.
//
// The call also advances the "last" markers used to compute that interval.
// Every map in the returned status is owned by the caller and is not modified
// by later calls. GetStatus updates analyzer state without locking, so it must
// not be invoked from several goroutines at once.
func (s *StatsAnalyzer) GetStatus() *ExecutionStatus {
	// Sample the clock and the shared counter exactly once so that all derived
	// values are consistent and nothing falls between two reporting windows.
	// NOTE(review): this is a plain load; if the writers of the counter use
	// sync/atomic, this should become atomic.LoadInt64 - confirm with callers.
	now := time.Now()
	opsExecuted := *s.opsExecuted

	duration := now.Sub(s.epoch)
	lastDuration := now.Sub(s.timeLast)
	s.timeLast = now

	// perSecond converts a count observed over d into a rate, yielding 0 for an
	// empty interval instead of dividing by zero.
	perSecond := func(count int64, d time.Duration) float64 {
		if d == 0 {
			return 0.0
		}
		return float64(count) * float64(time.Second) / float64(d)
	}

	opsPerSec := perSecond(opsExecuted, duration)
	opsPerSecLast := perSecond(opsExecuted-s.opsExecutedLast, lastDuration)

	stats := CombineStats(s.statsCollectors...)
	allTimeLatencies := make(map[OpType][]int64)
	sinceLastLatencies := make(map[OpType][]int64)
	typeOpsSec := make(map[OpType]float64)
	typeOpsSecLast := make(map[OpType]float64)
	// Counts are copied into fresh maps: handing out s.counts / s.countsLast
	// would let the next GetStatus call alter a status already returned.
	counts := make(map[OpType]int64)
	countsLast := make(map[OpType]int64)

	for _, opType := range AllOpTypes {
		// Grab the current slice header under the lock, then work on a private
		// copy: the collecting goroutine keeps appending to the original, and
		// CalculateLatencyStats sorts its argument in place.
		s.latenciesLock.Lock()
		recorded := s.latencies[opType]
		s.latenciesLock.Unlock()

		snapshot := make([]int64, len(recorded))
		copy(snapshot, recorded)

		lastEndPos := s.lastEndPos[opType]
		s.lastEndPos[opType] = len(snapshot)

		// Order matters: the tail must be evaluated before the whole snapshot
		// is sorted, otherwise it no longer holds only the newest samples.
		sinceLastLatencies[opType] = CalculateLatencyStats(snapshot[lastEndPos:])
		allTimeLatencies[opType] = CalculateLatencyStats(snapshot)

		s.counts[opType] = stats.Count(opType)
		current := s.counts[opType]
		previous := s.countsLast[opType]
		counts[opType] = current
		countsLast[opType] = previous

		typeOpsSec[opType] = perSecond(current, duration)
		typeOpsSecLast[opType] = perSecond(current-previous, lastDuration)
	}

	status := ExecutionStatus{
		OpsExecuted:        opsExecuted,
		OpsExecutedLast:    s.opsExecutedLast,
		Duration:           duration,
		OpsPerSec:          opsPerSec,
		OpsPerSecLast:      opsPerSecLast,
		AllTimeLatencies:   allTimeLatencies,
		SinceLastLatencies: sinceLastLatencies,
		Counts:             counts,
		CountsLast:         countsLast,
		TypeOpsSec:         typeOpsSec,
		TypeOpsSecLast:     typeOpsSecLast,
	}

	// store the latest values in the "last" variables
	s.opsExecutedLast = opsExecuted
	for _, opType := range AllOpTypes {
		s.countsLast[opType] = s.counts[opType]
	}

	return &status
}
// Sorting facilities

// int64Slice implements sort.Interface for a []int64, ordering the values in
// ascending order.
type int64Slice []int64

// Len returns the number of elements in the slice.
func (s int64Slice) Len() int {
	return len(s)
}

// Less reports whether the element at index a is smaller than the one at b.
func (s int64Slice) Less(a, b int) bool {
	return s[a] < s[b]
}

// Swap exchanges the elements at indexes a and b.
func (s int64Slice) Swap(a, b int) {
	s[b], s[a] = s[a], s[b]
}
// CalculateLatencyStats returns one latency value per entry of
// latencyPercentiles, in the same order, so the result can be indexed with the
// P50 ... P100 constants.
//
// The latencies slice is sorted in place as a side effect; pass a copy if the
// original order must be preserved. For an empty input the result is a newly
// allocated slice of zeros, which the caller is free to modify.
func CalculateLatencyStats(latencies []int64) []int64 {
	length := len(latencies)
	if length == 0 {
		// Return a fresh slice rather than the shared package-level
		// emptyLatencies: a caller writing into a shared slice would corrupt
		// the result of every later call with empty input.
		return make([]int64, len(latencyPercentiles))
	}

	sort.Sort(int64Slice(latencies))

	result := make([]int64, 0, len(latencyPercentiles))
	for _, perc := range latencyPercentiles {
		// Index scales linearly with the percentile: 0 maps to the smallest
		// sample and 100 to the largest (index length-1).
		result = append(result, latencies[(length-1)*perc/100])
	}
	return result
}