diff --git a/collectors/monitoring_metrics.go b/collectors/monitoring_metrics.go
index a1b0804..68ca73e 100644
--- a/collectors/monitoring_metrics.go
+++ b/collectors/monitoring_metrics.go
@@ -90,7 +90,7 @@ type ConstMetric struct {
 type HistogramMetric struct {
 	FqName      string
 	LabelKeys   []string
-	Mean        float64
+	Sum         float64
 	Count       uint64
 	Buckets     map[float64]uint64
 	LabelValues []string
@@ -100,15 +100,26 @@ type HistogramMetric struct {
 	KeysHash uint64
 }
 
+func (h *HistogramMetric) MergeHistogram(other *HistogramMetric) {
+	// Increment totals based on incoming totals
+	h.Sum += other.Sum
+	h.Count += other.Count
+
+	// Merge the buckets from existing into current
+	for key, value := range other.Buckets {
+		h.Buckets[key] += value
+	}
+}
+
 func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, reportTime time.Time, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string, metricKind string) {
 	fqName := buildFQName(timeSeries)
-
+	histogramSum := dist.Mean * float64(dist.Count)
 	var v HistogramMetric
 	if t.fillMissingLabels || (metricKind == "DELTA" && t.aggregateDeltas) {
 		v = HistogramMetric{
 			FqName:      fqName,
 			LabelKeys:   labelKeys,
-			Mean:        dist.Mean,
+			Sum:         histogramSum,
 			Count:       uint64(dist.Count),
 			Buckets:     buckets,
 			LabelValues: labelValues,
@@ -133,16 +144,16 @@ func (t *timeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time
 		return
 	}
 
-	t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, dist.Mean, uint64(dist.Count), buckets, labelValues)
+	t.ch <- t.newConstHistogram(fqName, reportTime, labelKeys, histogramSum, uint64(dist.Count), buckets, labelValues)
 }
 
-func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, mean float64, count uint64, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
+func (t *timeSeriesMetrics) newConstHistogram(fqName string, reportTime time.Time, labelKeys []string, sum float64, count uint64, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
 	return prometheus.NewMetricWithTimestamp(
 		reportTime,
 		prometheus.MustNewConstHistogram(
 			t.newMetricDesc(fqName, labelKeys),
 			count,
-			mean*float64(count), // Stackdriver does not provide the sum, but we can fake it
+			sum,
 			buckets,
 			labelValues...,
 		),
@@ -249,7 +260,7 @@ func (t *timeSeriesMetrics) completeHistogramMetrics(histograms map[string][]*Hi
 			}
 		}
 		for _, v := range vs {
-			t.ch <- t.newConstHistogram(v.FqName, v.ReportTime, v.LabelKeys, v.Mean, v.Count, v.Buckets, v.LabelValues)
+			t.ch <- t.newConstHistogram(v.FqName, v.ReportTime, v.LabelKeys, v.Sum, v.Count, v.Buckets, v.LabelValues)
 		}
 	}
 }
@@ -314,7 +325,7 @@ func (t *timeSeriesMetrics) completeDeltaHistogramMetrics(reportingStartTime tim
 				collected.FqName,
 				collected.ReportTime,
 				collected.LabelKeys,
-				collected.Mean,
+				collected.Sum,
 				collected.Count,
 				collected.Buckets,
 				collected.LabelValues,
diff --git a/delta/histogram.go b/delta/histogram.go
index 9672ebc..2d8d919 100644
--- a/delta/histogram.go
+++ b/delta/histogram.go
@@ -75,7 +75,9 @@ func (s *InMemoryHistogramStore) Increment(metricDescriptor *monitoring.MetricDe
 
 	if existing.ReportTime.Before(currentValue.ReportTime) {
 		level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.FqName, "key", key, "last_reported_time", existing.ReportTime, "incoming_time", currentValue.ReportTime)
-		entry.Collected[key] = mergeHistograms(existing, currentValue)
+		currentValue.MergeHistogram(existing)
+		// Replace the existing histogram with the new one after merging it.
+		entry.Collected[key] = currentValue
 		return
 	}
 
@@ -101,27 +103,6 @@ func toHistogramKey(hist *collectors.HistogramMetric) uint64 {
 	return h
 }
 
-func mergeHistograms(existing *collectors.HistogramMetric, current *collectors.HistogramMetric) *collectors.HistogramMetric {
-	for key, value := range existing.Buckets {
-		current.Buckets[key] += value
-	}
-
-	// Calculate a new mean and overall count
-	mean := existing.Mean
-	mean += current.Mean
-	mean /= 2
-
-	var count uint64
-	for _, v := range current.Buckets {
-		count += v
-	}
-
-	current.Mean = mean
-	current.Count = count
-
-	return current
-}
-
 func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*collectors.HistogramMetric {
 	var output []*collectors.HistogramMetric
 	now := time.Now()
@@ -136,7 +117,7 @@ func (s *InMemoryHistogramStore) ListMetrics(metricDescriptorName string) []*col
 	entry.mutex.Lock()
 	defer entry.mutex.Unlock()
 	for key, collected := range entry.Collected {
-		//Scan and remove metrics which are outside the TTL
+		// Scan and remove metrics which are outside the TTL
 		if ttlWindowStart.After(collected.CollectionTime) {
 			level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.FqName)
 			delete(entry.Collected, key)
diff --git a/delta/histogram_test.go b/delta/histogram_test.go
index ac0e27f..8c5cd00 100644
--- a/delta/histogram_test.go
+++ b/delta/histogram_test.go
@@ -29,15 +29,17 @@ var _ = Describe("HistogramStore", func() {
 	var store *delta.InMemoryHistogramStore
 	var histogram *collectors.HistogramMetric
 	descriptor := &monitoring.MetricDescriptor{Name: "This is a metric"}
+	bucketKey := 1.00000000000000000001
+	bucketValue := uint64(1000)
 
 	BeforeEach(func() {
 		store = delta.NewInMemoryHistogramStore(promlog.New(&promlog.Config{}), time.Minute)
 		histogram = &collectors.HistogramMetric{
 			FqName:         "histogram_name",
 			LabelKeys:      []string{"labelKey"},
-			Mean:           10,
+			Sum:            10,
 			Count:          100,
-			Buckets:        map[float64]uint64{1.00000000000000000001: 1000},
+			Buckets:        map[float64]uint64{bucketKey: bucketValue},
 			LabelValues:    []string{"labelValue"},
 			ReportTime:     time.Now().Truncate(time.Second),
 			CollectionTime: time.Now().Truncate(time.Second),
@@ -53,6 +55,34 @@
 		Expect(metrics[0]).To(Equal(histogram))
 	})
 
+	It("can merge histograms", func() {
+		store.Increment(descriptor, histogram)
+
+		// Shallow copy and change the report time so the two will merge
+		nextValue := &collectors.HistogramMetric{
+			FqName:         "histogram_name",
+			LabelKeys:      []string{"labelKey"},
+			Sum:            10,
+			Count:          100,
+			Buckets:        map[float64]uint64{bucketKey: bucketValue},
+			LabelValues:    []string{"labelValue"},
+			ReportTime:     time.Now().Truncate(time.Second).Add(time.Second),
+			CollectionTime: time.Now().Truncate(time.Second),
+			KeysHash:       8765,
+		}
+
+		store.Increment(descriptor, nextValue)
+
+		metrics := store.ListMetrics(descriptor.Name)
+
+		Expect(len(metrics)).To(Equal(1))
+		histogram := metrics[0]
+		Expect(histogram.Count).To(Equal(uint64(200)))
+		Expect(histogram.Sum).To(Equal(20.0))
+		Expect(len(histogram.Buckets)).To(Equal(1))
+		Expect(histogram.Buckets[bucketKey]).To(Equal(bucketValue * 2))
+	})
+
 	It("will remove histograms outside of TTL", func() {
 		histogram.CollectionTime = histogram.CollectionTime.Add(-time.Hour)