Skip to content

Commit

Permalink
Merge branch 'main' into guard-expo-hist-meas
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Aug 17, 2023
2 parents a53995c + 9b47674 commit db94497
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 70 deletions.
104 changes: 43 additions & 61 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,48 +38,6 @@ const (
minInt64 int64 = math.MinInt64
)

// expoHistogramValues summarizes a set of measurements as expoHistogramDataPoints using
// dynamically scaled buckets.
type expoHistogramValues[N int64 | float64] struct {
noSum bool
noMinMax bool
maxSize int
maxScale int

values map[attribute.Set]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
}

func newExpoHistValues[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramValues[N] {
return &expoHistogramValues[N]{
noSum: noSum,
noMinMax: noMinMax,
maxSize: maxSize,
maxScale: maxScale,

values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (e *expoHistogramValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
// Ignore NaN and infinity.
if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
return
}

e.valuesMu.Lock()
defer e.valuesMu.Unlock()

v, ok := e.values[attr]
if !ok {
v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
e.values[attr] = v
}
v.record(value)
}

// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
count uint64
Expand Down Expand Up @@ -139,7 +97,7 @@ func (p *expoHistogramDataPoint[N]) record(v N) {
return
}

bin := getBin(absV, p.scale)
bin := p.getBin(absV)

bucket := &p.posBuckets
if v < 0 {
Expand All @@ -148,7 +106,7 @@ func (p *expoHistogramDataPoint[N]) record(v N) {

// If the new bin would make the counts larger than maxScale, we need to
// downscale current measurements.
if scaleDelta := scaleChange(bin, bucket.startBin, len(bucket.counts), p.maxSize); scaleDelta > 0 {
if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
if p.scale-scaleDelta < expoMinScale {
// With a scale of -10 there is only two buckets for the whole range of float64 values.
// This can only happen if there is a max size of 1.
Expand All @@ -160,27 +118,26 @@ func (p *expoHistogramDataPoint[N]) record(v N) {
p.posBuckets.downscale(scaleDelta)
p.negBuckets.downscale(scaleDelta)

bin = getBin(absV, p.scale)
bin = p.getBin(absV)
}

bucket.record(bin)
}

// getBin returns the bin of the bucket that the value v should be recorded
// into at the given scale.
func getBin(v float64, scale int) int {
// getBin returns the bin v should be recorded into.
func (p *expoHistogramDataPoint[N]) getBin(v float64) int {
frac, exp := math.Frexp(v)
if scale <= 0 {
if p.scale <= 0 {
// Because of the choice of fraction is always 1 power of two higher than we want.
correction := 1
if frac == .5 {
// If v is an exact power of two the frac will be .5 and the exp
// will be one higher than we want.
correction = 2
}
return (exp - correction) >> (-scale)
return (exp - correction) >> (-p.scale)
}
return exp<<scale + int(math.Log(frac)*scaleFactors[scale]) - 1
return exp<<p.scale + int(math.Log(frac)*scaleFactors[p.scale]) - 1
}

// scaleFactors are constants used in calculating the logarithm index. They are
Expand Down Expand Up @@ -209,8 +166,9 @@ var scaleFactors = [21]float64{
math.Ldexp(math.Log2E, 20),
}

// scaleChange returns the magnitude of the scale change needed to fit bin in the bucket.
func scaleChange(bin, startBin, length, maxSize int) int {
// scaleChange returns the magnitude of the scale change needed to fit bin in
// the bucket. If no scale change is needed 0 is returned.
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int {
if length == 0 {
// No need to rescale if there are no buckets.
return 0
Expand All @@ -224,7 +182,7 @@ func scaleChange(bin, startBin, length, maxSize int) int {
}

count := 0
for high-low >= maxSize {
for high-low >= p.maxSize {
low = low >> 1
high = high >> 1
count++
Expand Down Expand Up @@ -332,24 +290,48 @@ func (b *expoBuckets) downscale(delta int) {
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
return &expoHistogram[N]{
expoHistogramValues: newExpoHistValues[N](
int(maxSize),
int(maxScale),
noMinMax,
noSum,
),
noSum: noSum,
noMinMax: noMinMax,
maxSize: int(maxSize),
maxScale: int(maxScale),

values: make(map[attribute.Set]*expoHistogramDataPoint[N]),

start: now(),
}
}

// expoHistogram summarizes a set of measurements as an histogram with exponentially
// defined buckets.
type expoHistogram[N int64 | float64] struct {
*expoHistogramValues[N]
noSum bool
noMinMax bool
maxSize int
maxScale int

values map[attribute.Set]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

start time.Time
}

func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) {
// Ignore NaN and infinity.
if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
return
}

e.valuesMu.Lock()
defer e.valuesMu.Unlock()

v, ok := e.values[attr]
if !ok {
v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
e.values[attr] = v
}
v.record(value)
}

func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
t := now()

Expand Down
19 changes: 10 additions & 9 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,8 @@ func TestScaleChange(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := scaleChange(tt.args.bin, tt.args.startBin, tt.args.length, tt.args.maxSize)
p := newExpoHistogramDataPoint[float64](tt.args.maxSize, 20, false, false)
got := p.scaleChange(tt.args.bin, tt.args.startBin, tt.args.length)
if got != tt.want {
t.Errorf("scaleChange() = %v, want %v", got, tt.want)
}
Expand Down Expand Up @@ -929,15 +930,15 @@ func FuzzGetBin(f *testing.F) {
t.Skip("skipping test for zero")
}

// GetBin is only used with a range of -10 to 20.
scale = (scale%31+31)%31 - 10

got := getBin(v, scale)
if v <= lowerBound(got, scale) {
t.Errorf("v=%x scale =%d had bin %d, but was below lower bound %x", v, scale, got, lowerBound(got, scale))
p := newExpoHistogramDataPoint[float64](4, 20, false, false)
// scale range is -10 to 20.
p.scale = (scale%31+31)%31 - 10
got := p.getBin(v)
if v <= lowerBound(got, p.scale) {
t.Errorf("v=%x scale =%d had bin %d, but was below lower bound %x", v, p.scale, got, lowerBound(got, p.scale))
}
if v > lowerBound(got+1, scale) {
t.Errorf("v=%x scale =%d had bin %d, but was above upper bound %x", v, scale, got, lowerBound(got+1, scale))
if v > lowerBound(got+1, p.scale) {
t.Errorf("v=%x scale =%d had bin %d, but was above upper bound %x", v, p.scale, got, lowerBound(got+1, p.scale))
}
})
}
Expand Down

0 comments on commit db94497

Please sign in to comment.