
Merge pull request #713 from atlassian/hstan/respect-metrics-per-batch-limit

Support batch sending metrics for OTLP backend
hstan authored Jul 30, 2024
2 parents 4e082d4 + 1cb2ff3 commit f714186
Showing 8 changed files with 190 additions and 61 deletions.
39 changes: 23 additions & 16 deletions pkg/backends/otlp/backend.go
@@ -42,6 +42,9 @@ type Backend struct {
logger logrus.FieldLogger
client *http.Client
requestsBufferSem chan struct{}

// metricsPerBatch is the maximum number of metrics to send in a single batch.
metricsPerBatch int
}

var _ gostatsd.Backend = (*Backend)(nil)
@@ -72,6 +75,7 @@ func NewClientFromViper(v *viper.Viper, logger logrus.FieldLogger, pool *transpo
client: tc.Client,
logger: logger,
requestsBufferSem: make(chan struct{}, cfg.MaxRequests),
metricsPerBatch: cfg.MetricsPerBatch,
}, nil
}

@@ -115,7 +119,7 @@ func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error {
}

func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, cb gostatsd.SendCallback) {
group := make(Group)
group := newGroups(bd.metricsPerBatch)

mm.Counters.Each(func(name, _ string, cm gostatsd.Counter) {
resources, attributes := data.SplitMetricTagsByKeysAndConvert(cm.Tags, bd.resourceKeys)
@@ -140,8 +144,8 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
),
)

group.Insert(bd.is, resources, rate)
group.Insert(bd.is, resources, m)
group.insert(bd.is, resources, rate)
group.insert(bd.is, resources, m)
})

mm.Gauges.Each(func(name, _ string, gm gostatsd.Gauge) {
@@ -157,7 +161,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
),
)

group.Insert(bd.is, resources, m)
group.insert(bd.is, resources, m)
})

mm.Sets.Each(func(name, _ string, sm gostatsd.Set) {
@@ -173,7 +177,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
),
)

group.Insert(bd.is, resources, m)
group.insert(bd.is, resources, m)
})

mm.Timers.Each(func(name, _ string, t gostatsd.Timer) {
@@ -190,7 +194,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
} else {
btags.Insert("le", strconv.FormatFloat(float64(boundry), 'f', -1, 64))
}
group.Insert(
group.insert(
bd.is,
resources,
data.NewMetric(fmt.Sprintf("%s.histogram", name)).SetGauge(
@@ -224,7 +228,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
if calc.discarded {
continue
}
group.Insert(
group.insert(
bd.is,
resources,
data.NewMetric(fmt.Sprintf("%s.%s", name, calc.suffix)).SetGauge(
@@ -238,7 +242,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
}

for _, pct := range t.Percentiles {
group.Insert(bd.is, resources, data.NewMetric(fmt.Sprintf("%s.%s", name, pct.Str)).SetGauge(
group.insert(bd.is, resources, data.NewMetric(fmt.Sprintf("%s.%s", name, pct.Str)).SetGauge(
data.NewGauge(data.NewNumberDataPoint(
uint64(t.Timestamp),
data.WithNumberDataPointMap(attributes),
@@ -256,20 +260,23 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
if len(t.Histogram) != 0 {
opts = append(opts, data.WithHistogramDataPointCumulativeBucketValues(t.Histogram))
}
group.Insert(bd.is, resources, data.NewMetric(name).SetHistogram(
group.insert(bd.is, resources, data.NewMetric(name).SetHistogram(
data.NewHistogram(data.NewHistogramDataPoint(uint64(t.Timestamp), opts...)),
))
}

})

err := bd.postMetrics(ctx, group.Values())
if err != nil {
bd.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": bd.metricsEndpoint,
}).Error("Issues trying to submit data")
var errs error
for _, b := range group.batches {
err := bd.postMetrics(ctx, b.values())
if err != nil {
bd.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": bd.metricsEndpoint,
}).Error("Issues trying to submit data")
errs = multierr.Append(errs, err)
}
}
cb(multierr.Errors(err))
cb(multierr.Errors(errs))
}

func (c *Backend) postMetrics(ctx context.Context, resourceMetrics []data.ResourceMetrics) error {
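The send path above now posts each batch independently and aggregates failures instead of stopping at the first one. A minimal standalone sketch of that pattern with go.uber.org/multierr (the simulated batch failures are hypothetical and stand in for the backend's postMetrics calls):

package main

import (
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	var errs error
	// Post three hypothetical batches; failures are accumulated so every
	// batch is still attempted rather than aborting on the first error.
	for i, fail := range []bool{true, false, true} {
		if !fail {
			continue
		}
		errs = multierr.Append(errs, fmt.Errorf("batch %d: post failed", i))
	}
	// multierr.Errors expands the combined error into a slice, which is the
	// shape handed to the SendMetricsAsync callback.
	fmt.Println(len(multierr.Errors(errs))) // 2
}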
18 changes: 13 additions & 5 deletions pkg/backends/otlp/config.go
@@ -14,6 +14,9 @@ import (
const (
ConversionAsGauge = "AsGauge"
ConversionAsHistogram = "AsHistogram"

// defaultMetricsPerBatch is the default number of metrics to send in a single batch.
defaultMetricsPerBatch = 1000
)

type Config struct {
@@ -24,6 +27,8 @@ type Config struct {
LogsEndpoint string `mapstructure:"logs_endpoint"`
// MaxRequests (Optional, default: cpu.count * 2) is the upper limit on the number of inflight requests
MaxRequests int `mapstructure:"max_requests"`
// MetricsPerBatch (Optional, default: 1000) is the maximum number of metrics to send in a single batch.
MetricsPerBatch int `mapstructure:"metrics_per_batch"`
// ResourceKeys (Optional) is used to extract values from provided tags
// to apply to all values within a resource instead within each attribute.
// Strongly encouraged to allow down stream consumers to
@@ -52,10 +57,11 @@ type Config struct {

func newDefaultConfig() *Config {
return &Config{
Transport: "default",
MaxRequests: runtime.NumCPU() * 2,
Conversion: ConversionAsGauge,
UserAgent: "gostatsd",
Transport: "default",
MaxRequests: runtime.NumCPU() * 2,
MetricsPerBatch: defaultMetricsPerBatch,
Conversion: ConversionAsGauge,
UserAgent: "gostatsd",
}
}

@@ -65,7 +71,6 @@ func NewConfig(v *viper.Viper) (*Config, error) {
util.GetSubViper(v, BackendName).Unmarshal(cfg),
cfg.Validate(),
)

if err != nil {
return nil, err
}
@@ -83,6 +88,9 @@ func (c *Config) Validate() (errs error) {
if c.MaxRequests <= 0 {
errs = multierr.Append(errs, errors.New("max request must be a positive value"))
}
if c.MetricsPerBatch <= 0 {
errs = multierr.Append(errs, errors.New("metrics per batch must be a positive value"))
}
if c.Transport == "" {
errs = multierr.Append(errs, errors.New("no transport defined"))
}
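For context, the new setting travels the same viper path as the existing options. A minimal sketch, assuming the backend package is importable as pkg/backends/otlp and that the placeholder endpoints below satisfy validation:

package main

import (
	"fmt"

	"github.com/spf13/viper"

	"github.com/atlassian/gostatsd/pkg/backends/otlp"
)

func main() {
	v := viper.New()
	// Placeholder endpoints; any reachable OTLP HTTP receiver would do.
	v.SetDefault("otlp.metrics_endpoint", "http://localhost:4318/v1/metrics")
	v.SetDefault("otlp.logs_endpoint", "http://localhost:4318/v1/logs")
	// Cap each export request at 500 metrics instead of the default 1000.
	v.SetDefault("otlp.metrics_per_batch", 500)

	cfg, err := otlp.NewConfig(v)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.MetricsPerBatch) // 500
}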
2 changes: 2 additions & 0 deletions pkg/backends/otlp/config_test.go
@@ -29,12 +29,14 @@ func TestNewConfig(t *testing.T) {
v.SetDefault("otlp.metrics_endpoint", "http://local/v1/metrics")
v.SetDefault("otlp.logs_endpoint", "http://local/v1/logs")
v.SetDefault("otlp.max_requests", 1)
v.SetDefault("otlp.metrics_per_batch", 999)
return v
}(),
expect: &Config{
MetricsEndpoint: "http://local/v1/metrics",
LogsEndpoint: "http://local/v1/logs",
MaxRequests: 1,
MetricsPerBatch: 999,
Conversion: "AsGauge",
Transport: "default",
UserAgent: "gostatsd",
45 changes: 34 additions & 11 deletions pkg/backends/otlp/group.go
@@ -6,31 +6,54 @@ import (
"github.com/atlassian/gostatsd/pkg/backends/otlp/internal/data"
)

// Group is used to ensure metrics that have the same resource attributes
type group map[uint64]data.ResourceMetrics

func (g *group) values() []data.ResourceMetrics {
return maps.Values(*g)
}

func (g *group) lenMetrics() int {
count := 0
for _, rm := range *g {
count += rm.CountMetrics()
}
return count
}

// groups is used to ensure metrics that have the same resource attributes
// are grouped together and it uses a fixed values to reduce potential memory
// allocations compared to using a string value
type Group map[uint64]data.ResourceMetrics

func NewGroup() Group {
return make(Group)
type groups struct {
batches []group
metricsInserted int
batchSize int
}

func (g *Group) Values() []data.ResourceMetrics {
return maps.Values(*g)
func newGroups(batchSize int) groups {
return groups{
batches: []group{make(group)},
batchSize: batchSize,
}
}

func (g *Group) Insert(is data.InstrumentationScope, resources data.Map, m data.Metric) {
func (g *groups) insert(is data.InstrumentationScope, resources data.Map, m data.Metric) {
key := resources.Hash()

entry, exist := (*g)[key]
currentBatch := g.batches[len(g.batches)-1]
entry, exist := (currentBatch)[key]
if !exist {
entry = data.NewResourceMetrics(
data.NewResource(
data.WithResourceMap(resources),
),
)
(*g)[key] = entry
(currentBatch)[key] = entry
}

entry.AppendMetric(is, m)
currentBatch[key] = entry

if currentBatch.lenMetrics() >= g.batchSize {

g.batches = append(g.batches, make(group))
}
}
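The insert logic above always writes into the last batch and, once that batch holds batchSize metrics, appends a fresh empty batch so the next insert starts a new one. A simplified standalone sketch of that rollover, using plain string slices rather than the backend's ResourceMetrics types:

package main

import "fmt"

func main() {
	const batchSize = 5
	batches := [][]string{{}}

	insert := func(metric string) {
		last := len(batches) - 1
		batches[last] = append(batches[last], metric)
		// Roll over once the current batch reaches the limit.
		if len(batches[last]) >= batchSize {
			batches = append(batches, []string{})
		}
	}

	for i := 0; i < 12; i++ {
		insert(fmt.Sprintf("metric-%d", i))
	}
	for i, b := range batches {
		fmt.Printf("batch %d: %d metrics\n", i, len(b)) // 5, 5, 2
	}
}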
86 changes: 81 additions & 5 deletions pkg/backends/otlp/group_test.go
@@ -11,11 +11,11 @@ import (
func TestGroupInsert(t *testing.T) {
t.Parallel()

g := NewGroup()
g := newGroups(1000)

is := data.NewInstrumentationScope("gostatsd/aggregation", "v1.0.0")

g.Insert(
g.insert(
is,
data.NewMap(
data.WithStatsdDelimitedTags(
@@ -27,7 +27,7 @@ func TestGroupInsert(t *testing.T) {
),
data.NewMetric("my-metric"),
)
g.Insert(
g.insert(
is,
data.NewMap(
data.WithStatsdDelimitedTags(
@@ -39,7 +39,7 @@ func TestGroupInsert(t *testing.T) {
),
data.NewMetric("my-metric"),
)
g.Insert(
g.insert(
is,
data.NewMap(
data.WithStatsdDelimitedTags(
@@ -52,5 +52,81 @@ func TestGroupInsert(t *testing.T) {
data.NewMetric("my-metric"),
)

assert.Len(t, g.Values(), 2, "Must have two distinct value")
assert.Len(t, g.batches[0].values(), 2, "Must have two distinct value")
}

func TestGroupBatch(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
batchSize int
metricsAdded map[string][]string // map[resourceMetrics][]metricNames
wantNumOfBatches int
}{
{
name: "no metrics",
batchSize: 2,
metricsAdded: map[string][]string{},
wantNumOfBatches: 1, // Must have at least one batch
},
{
name: "metrics in one resource metrics exceeds batch limit should be split into two groups",
batchSize: 2,
metricsAdded: map[string][]string{
"r1": {"m1", "m2", "m3", "m4", "m5"},
},
wantNumOfBatches: 3,
},
{
name: "metrics in multiple resource metrics doesn't exceeds batch limit stay in one group",
batchSize: 10,
metricsAdded: map[string][]string{
"r1": {"m1", "m2", "m3"},
"r2": {"m1", "m2", "m3"},
"r3": {"m1", "m2", "m3"},
},
wantNumOfBatches: 1,
},
{
name: "should properly split metrics into groups according to batch size",
batchSize: 5,
metricsAdded: map[string][]string{
"r1": {"m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10"},
"r2": {"m1"},
"r3": {"m1", "m2", "m3", "m4", "m5"},
"r4": {"m1", "m2", "m3"},
},
wantNumOfBatches: 4,
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
g := newGroups(tc.batchSize)
is := data.NewInstrumentationScope("gostatsd/aggregation", "v1.0.0")
for rm, metricNames := range tc.metricsAdded {
for _, metricName := range metricNames {
insertMetric(&g, is, rm, metricName)
}
}

assert.Len(t, g.batches, tc.wantNumOfBatches, "Must have %d groups", tc.wantNumOfBatches)
})
}
}

func insertMetric(g *groups, is data.InstrumentationScope, serviceName string, metricName string) {
g.insert(
is,
data.NewMap(
data.WithStatsdDelimitedTags(
[]string{
"service.name:" + serviceName,
"service.region:local",
},
),
),
data.NewMetric(metricName),
)
}
8 changes: 8 additions & 0 deletions pkg/backends/otlp/internal/data/resource_metrics.go
@@ -29,3 +29,11 @@ func (rm ResourceMetrics) AppendMetric(is InstrumentationScope, m Metric) {
}
rm.raw.ScopeMetrics[0].Metrics = append(rm.raw.ScopeMetrics[0].Metrics, m.raw)
}

func (rm ResourceMetrics) CountMetrics() int {
c := 0
for _, sm := range rm.raw.ScopeMetrics {
c += len(sm.Metrics)
}
return c
}
4 changes: 4 additions & 0 deletions pkg/backends/otlp/internal/data/resource_metrics_test.go
@@ -22,10 +22,14 @@ func TestNewResourceMetrics(t *testing.T) {
NewResource(),
NewScopeMetrics(
NewInstrumentationScope("gostatsd/aggregation", "test"),
NewMetric("a metric"),
NewMetric("another metric"),
),
NewScopeMetrics(
NewInstrumentationScope("gostatsd/aggregation", "test"),
NewMetric("some metric"),
),
)
assert.Len(t, rm.raw.ScopeMetrics, 2, "Must have two scope metrics defined")
assert.Equal(t, 3, rm.CountMetrics())
}