Skip to content

Commit

Permalink
fix timer count sampling, new flag -persist-timer-counts
Browse files Browse the repository at this point in the history
by putting the sampled count first in the time values array
and with new inactivTimers map

also, BenchmarkManyDifferentSensors should clear the stats maps ...
  • Loading branch information
ploxiln committed Jul 31, 2018
1 parent 9fdc27b commit c572d91
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 11 deletions.
36 changes: 33 additions & 3 deletions statsdaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var (
showVersion = flag.Bool("version", false, "print version string")
deleteGauges = flag.Bool("delete-gauges", true, "don't send values to graphite for inactive gauges, as opposed to sending the previous value")
persistCountKeys = flag.Uint("persist-count-keys", 60, "number of flush-intervals to persist count keys (at zero)")
persistTimerKeys = flag.Uint("persist-timer-counts", 0, "number of flush-intervals to persist timer count keys (at zero)")
receiveCounter = flag.String("receive-counter", "", "Metric name for total metrics received per interval")
percentThreshold = Percentiles{}
prefix = flag.String("prefix", "", "Prefix for all stats")
Expand All @@ -116,6 +117,7 @@ var (
timers = make(map[string]Float64Slice)
sets = make(map[string][]string)
inactivCounters = make(map[string]uint)
inactivTimers = make(map[string]uint)
)

func monitor() {
Expand Down Expand Up @@ -144,8 +146,14 @@ func packetHandler(s *Packet) {

switch s.Modifier {
case "ms":
// if missing gets nil []float64, works with append()
timers[s.Bucket] = append(timers[s.Bucket], s.ValFlt)
vals := timers[s.Bucket]
if vals == nil {
vals = make([]float64, 1, 5)
vals[0] = 0.0
}
// first slot is sampled count, following are times
vals[0] += float64(1 / s.Sampling)
timers[s.Bucket] = append(vals, s.ValFlt)
case "g":
var gaugeValue float64
if s.ValStr == "" {
Expand Down Expand Up @@ -290,8 +298,12 @@ func processSets(buffer *bytes.Buffer, now int64) int64 {

func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 {
var num int64
persist := *persistTimerKeys

for bucket, timer := range timers {
num++
sampled := timer[0]
timer = timer[1:]

sort.Sort(timer)
min := timer[0]
Expand Down Expand Up @@ -336,13 +348,31 @@ func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 {
mean_s := strconv.FormatFloat(mean, 'f', -1, 64)
max_s := strconv.FormatFloat(max, 'f', -1, 64)
min_s := strconv.FormatFloat(min, 'f', -1, 64)
count_s := strconv.FormatFloat(sampled, 'f', -1, 64)

fmt.Fprintf(buffer, "%s%s.mean%s %s %d\n", *prefix, bucket, *postfix, mean_s, now)
fmt.Fprintf(buffer, "%s%s.upper%s %s %d\n", *prefix, bucket, *postfix, max_s, now)
fmt.Fprintf(buffer, "%s%s.lower%s %s %d\n", *prefix, bucket, *postfix, min_s, now)
fmt.Fprintf(buffer, "%s%s.count%s %d %d\n", *prefix, bucket, *postfix, count, now)
fmt.Fprintf(buffer, "%s%s.count%s %s %d\n", *prefix, bucket, *postfix, count_s, now)

delete(timers, bucket)
if persist > 0 {
countKey := fmt.Sprintf("%s%s.count%s", *prefix, bucket, *postfix)
inactivTimers[countKey] = 0
}
}

// continue sending zeros for no-longer-active timer counts for configured flush-intervals
for bucket, purgeCount := range inactivTimers {
if purgeCount > 0 {
fmt.Fprintf(buffer, "%s 0 %d\n", bucket, now)
num++
}
if purgeCount >= persist {
delete(inactivTimers, bucket)
} else {
inactivTimers[bucket] = purgeCount + 1
}
}
return num
}
Expand Down
22 changes: 14 additions & 8 deletions statsdaemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,15 @@ func TestPacketHandlerTimer(t *testing.T) {
Sampling: float32(1),
}
packetHandler(p)
assert.Equal(t, len(timers["glork"]), 1)
assert.Equal(t, timers["glork"][0], float64(320))
assert.Equal(t, len(timers["glork"]), 2)
assert.Equal(t, timers["glork"][0], float64(1))
assert.Equal(t, timers["glork"][1], float64(320))

p.ValFlt = float64(100)
packetHandler(p)
assert.Equal(t, len(timers["glork"]), 2)
assert.Equal(t, timers["glork"][1], float64(100))
assert.Equal(t, len(timers["glork"]), 3)
assert.Equal(t, timers["glork"][0], float64(2))
assert.Equal(t, timers["glork"][2], float64(100))
}

func TestPacketHandlerSet(t *testing.T) {
Expand Down Expand Up @@ -545,7 +547,7 @@ func TestProcessCountersPrefix(t *testing.T) {
func TestProcessTimers(t *testing.T) {
// Some data with expected mean of 20
timers = make(map[string]Float64Slice)
timers["response_time"] = []float64{0, 30, 30}
timers["response_time"] = []float64{3, 0, 30, 30}

now := int64(1418052649)

Expand Down Expand Up @@ -652,7 +654,7 @@ func TestProcessSets(t *testing.T) {
func TestProcessTimersUpperPercentile(t *testing.T) {
// Some data with expected 75% of 2
timers = make(map[string]Float64Slice)
timers["response_time"] = []float64{0, 1, 2, 3}
timers["response_time"] = []float64{4, 0, 1, 2, 3}

now := int64(1418052649)

Expand All @@ -675,7 +677,7 @@ func TestProcessTimersUpperPercentilePostfix(t *testing.T) {
flag.Set("postfix", ".test")
// Some data with expected 75% of 2
timers = make(map[string]Float64Slice)
timers["postfix_response_time"] = []float64{0, 1, 2, 3}
timers["postfix_response_time"] = []float64{4, 0, 1, 2, 3}
now := int64(1418052649)

var buffer bytes.Buffer
Expand All @@ -697,7 +699,7 @@ func TestProcessTimersUpperPercentilePostfix(t *testing.T) {

func TestProcessTimesLowerPercentile(t *testing.T) {
timers = make(map[string]Float64Slice)
timers["time"] = []float64{0, 1, 2, 3}
timers["time"] = []float64{4, 0, 1, 2, 3}

now := int64(1418052649)

Expand Down Expand Up @@ -769,7 +771,11 @@ func TestMultipleUDPSends(t *testing.T) {
}

func BenchmarkManyDifferentSensors(t *testing.B) {
counters = make(map[string]float64)
gauges = make(map[string]float64)
timers = make(map[string]Float64Slice)
r := rand.New(rand.NewSource(438))

for i := 0; i < 1000; i++ {
bucket := "response_time" + strconv.Itoa(i)
for i := 0; i < 10000; i++ {
Expand Down

0 comments on commit c572d91

Please sign in to comment.