Skip to content

Commit

Permalink
Use errors.Join instead of multierror in sdk/metric
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Oct 21, 2024
1 parent d9e789a commit 1c6798f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 53 deletions.
15 changes: 7 additions & 8 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,21 +442,21 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}

reg := newObserver()
var errs multierror
var err error
for _, inst := range insts {
switch o := inst.(type) {
case int64Observable:
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
reg.registerInt64(o.observableID)
case float64Observable:
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
Expand All @@ -467,7 +467,6 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
}

err := errs.errorOrNil()
if reg.len() == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
Expand Down
77 changes: 32 additions & 45 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -108,11 +107,11 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
p.Lock()
defer p.Unlock()

var errs multierror
var err error
for _, c := range p.callbacks {
// TODO make the callbacks parallel. ( #3034 )
if err := c(ctx); err != nil {
errs.append(err)
if e := c(ctx); e != nil {
err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
rm.Resource = nil
Expand All @@ -123,8 +122,8 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(multiCallback)
if err := f(ctx); err != nil {
errs.append(err)
if e := f(ctx); e != nil {
err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
Expand Down Expand Up @@ -160,7 +159,7 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)

rm.ScopeMetrics = rm.ScopeMetrics[:i]

return errs.errorOrNil()
return err
}

// inserter facilitates inserting of new instruments from a single scope into a
Expand Down Expand Up @@ -222,17 +221,17 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures []aggregate.Measure[N]
)

errs := &multierror{wrapped: errCreatingAggregators}
var err error
seen := make(map[uint64]struct{})
for _, v := range i.pipeline.views {
stream, match := v(inst)
if !match {
continue
}
matched = true
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if e != nil {
err = errors.Join(err, e)
}
if in == nil { // Drop aggregation.
continue
Expand All @@ -245,8 +244,12 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures = append(measures, in)
}

if err != nil {
err = errors.Join(errCreatingAggregators, err)
}

if matched {
return measures, errs.errorOrNil()
return measures, err
}

// Apply implicit default view if no explicit matched.
Expand All @@ -255,15 +258,18 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if e != nil {
if err == nil {
err = errCreatingAggregators
}
err = errors.Join(err, e)
}
if in != nil {
// Ensured to have not seen given matched was false.
measures = append(measures, in)
}
return measures, errs.errorOrNil()
return measures, err
}

// addCallback registers a single instrument callback to be run when
Expand Down Expand Up @@ -608,15 +614,15 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]

errs := &multierror{}
var err error
for _, i := range r.inserters {
in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if err != nil {
errs.append(err)
in, e := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if e != nil {
err = errors.Join(err, e)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
return measures, err
}

// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
Expand All @@ -625,37 +631,18 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]

errs := &multierror{}
var err error
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
histAgg.Boundaries = boundaries
agg = histAgg
}
in, err := i.Instrument(id, agg)
if err != nil {
errs.append(err)
in, e := i.Instrument(id, agg)
if e != nil {
err = errors.Join(err, e)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
}

type multierror struct {
wrapped error
errors []string
}

func (m *multierror) errorOrNil() error {
if len(m.errors) == 0 {
return nil
}
if m.wrapped == nil {
return errors.New(strings.Join(m.errors, "; "))
}
return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
}

func (m *multierror) append(err error) {
m.errors = append(m.errors, err.Error())
return measures, err
}

0 comments on commit 1c6798f

Please sign in to comment.