Skip to content

Commit

Permalink
Simplify the histogram implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Jul 26, 2023
1 parent 830fae8 commit bae7b5b
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 206 deletions.
13 changes: 3 additions & 10 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,12 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) {
	h := newHistogram[N](cfg, noSum)
	switch b.Temporality {
	case metricdata.DeltaTemporality:
		return b.filter(h.measure), h.delta
	default:
		// Any temporality other than delta (including unset) is treated as
		// cumulative.
		return b.filter(h.measure), h.cumulative
	}
}

Expand Down
141 changes: 62 additions & 79 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"context"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -75,7 +76,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[

// Aggregate records the measurement value, scoped by attr, and aggregates it
// into a histogram.
func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets
Expand Down Expand Up @@ -106,111 +107,89 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
}
}

// newDeltaHistogram returns an Aggregator that summarizes a set of
// measurements as an histogram. Each histogram is scoped by attributes and
// the aggregation cycle the measurements were made in.
//
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregations method is called it will reset all histogram
// counts to zero.
func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
return &deltaHistogram[N]{
// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](cfg.Boundaries, noSum),
noMinMax: cfg.NoMinMax,
start: now(),
}
}

// deltaHistogram summarizes a set of measurements made in a single
// aggregation cycle as an histogram with explicitly defined buckets.
type deltaHistogram[N int64 | float64] struct {
// histogram summarizes a set of measurements as an histogram with explicitly
// defined buckets.
type histogram[N int64 | float64] struct {
*histValues[N]

noMinMax bool
start time.Time
}

func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
t := now()

h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.DeltaTemporality

s.valuesMu.Lock()
defer s.valuesMu.Unlock()

if len(s.values) == 0 {
return nil
}

t := now()
// Do not allow modification of our copy of bounds.
bounds := make([]float64, len(s.bounds))
copy(bounds, s.bounds)
h := metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: make([]metricdata.HistogramDataPoint[N], 0, len(s.values)),
}

n := len(s.values)
hDPts := reset(h.DataPoints, n, n)

var i int
for a, b := range s.values {
hdp := metricdata.HistogramDataPoint[N]{
Attributes: a,
StartTime: s.start,
Time: t,
Count: b.count,
Bounds: bounds,
BucketCounts: b.counts,
}
hDPts[i].Attributes = a
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = b.count
hDPts[i].Bounds = bounds
hDPts[i].BucketCounts = b.counts

if !s.noSum {
hdp.Sum = b.total
hDPts[i].Sum = b.total
}

if !s.noMinMax {
hdp.Min = metricdata.NewExtrema(b.min)
hdp.Max = metricdata.NewExtrema(b.max)
hDPts[i].Min = metricdata.NewExtrema(b.min)
hDPts[i].Max = metricdata.NewExtrema(b.max)
}
h.DataPoints = append(h.DataPoints, hdp)

// Unused attribute sets do not report.
delete(s.values, a)
i++
}
// The delta collection cycle resets.
s.start = t
return h
}

// newCumulativeHistogram returns an Aggregator that summarizes a set of
// measurements as an histogram. Each histogram is scoped by attributes.
//
// Each aggregation cycle builds from the previous, the histogram counts are
// the bucketed counts of all values aggregated since the returned Aggregator
// was created.
func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
return &cumulativeHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries, noSum),
noMinMax: cfg.NoMinMax,
start: now(),
}
h.DataPoints = hDPts
*dest = h

return n
}

// cumulativeHistogram summarizes a set of measurements made over all
// aggregation cycles as an histogram with explicitly defined buckets.
type cumulativeHistogram[N int64 | float64] struct {
*histValues[N]
func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
t := now()

noMinMax bool
start time.Time
}
h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.CumulativeTemporality

func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
s.valuesMu.Lock()
defer s.valuesMu.Unlock()

if len(s.values) == 0 {
return nil
}

t := now()
// Do not allow modification of our copy of bounds.
bounds := make([]float64, len(s.bounds))
copy(bounds, s.bounds)
h := metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: make([]metricdata.HistogramDataPoint[N], 0, len(s.values)),
}

n := len(s.values)
hDPts := reset(h.DataPoints, n, n)

var i int
for a, b := range s.values {
// The HistogramDataPoint field values returned need to be copies of
// the buckets value as we will keep updating them.
Expand All @@ -220,26 +199,30 @@ func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
counts := make([]uint64, len(b.counts))
copy(counts, b.counts)

hdp := metricdata.HistogramDataPoint[N]{
Attributes: a,
StartTime: s.start,
Time: t,
Count: b.count,
Bounds: bounds,
BucketCounts: counts,
}
hDPts[i].Attributes = a
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = b.count
hDPts[i].Bounds = bounds
hDPts[i].BucketCounts = counts

if !s.noSum {
hdp.Sum = b.total
hDPts[i].Sum = b.total
}

if !s.noMinMax {
hdp.Min = metricdata.NewExtrema(b.min)
hdp.Max = metricdata.NewExtrema(b.max)
hDPts[i].Min = metricdata.NewExtrema(b.min)
hDPts[i].Max = metricdata.NewExtrema(b.max)
}
h.DataPoints = append(h.DataPoints, hdp)
i++
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
return h

h.DataPoints = hDPts
*dest = h

return n
}
Loading

0 comments on commit bae7b5b

Please sign in to comment.