Skip to content

Commit

Permalink
rough implementation of batching
Browse files Browse the repository at this point in the history
  • Loading branch information
hstan committed Jul 29, 2024
1 parent 67c82ad commit ebdb1bf
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 20 deletions.
21 changes: 12 additions & 9 deletions pkg/backends/otlp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Backend struct {
requestsBufferSem chan struct{}

// metricsPerBatch is the maximum number of metrics to send in a single batch.
metricsPerBatch uint
metricsPerBatch int
}

var _ gostatsd.Backend = (*Backend)(nil)
Expand Down Expand Up @@ -119,7 +119,7 @@ func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error {
}

func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, cb gostatsd.SendCallback) {
group := make(Group)
group := NewGroup(bd.metricsPerBatch)

mm.Counters.Each(func(name, _ string, cm gostatsd.Counter) {
resources, attributes := data.SplitMetricTagsByKeysAndConvert(cm.Tags, bd.resourceKeys)
Expand Down Expand Up @@ -264,16 +264,19 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
data.NewHistogram(data.NewHistogramDataPoint(uint64(t.Timestamp), opts...)),
))
}

})

err := bd.postMetrics(ctx, group.Values())
if err != nil {
bd.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": bd.metricsEndpoint,
}).Error("Issues trying to submit data")
var errs error
for _, b := range group.batches {
err := bd.postMetrics(ctx, b.Values())
if err != nil {
bd.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": bd.metricsEndpoint,
}).Error("Issues trying to submit data")
errs = multierr.Append(errs, err)
}
}
cb(multierr.Errors(err))
cb(multierr.Errors(errs))
}

func (c *Backend) postMetrics(ctx context.Context, resourceMetrics []data.ResourceMetrics) error {
Expand Down
3 changes: 1 addition & 2 deletions pkg/backends/otlp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Config struct {
// MaxRequests (Optional, default: cpu.count * 2) is the upper limit on the number of inflight requests
MaxRequests int `mapstructure:"max_requests"`
// MetricsPerBatch (Optional, default: 1000) is the maximum number of metrics to send in a single batch.
MetricsPerBatch uint `mapstructure:"metrics_per_batch"`
MetricsPerBatch int `mapstructure:"metrics_per_batch"`
// ResourceKeys (Optional) is used to extract values from provided tags
// to apply to all values within a resource instead of within each attribute.
// Strongly encouraged to allow down stream consumers to
Expand Down Expand Up @@ -71,7 +71,6 @@ func NewConfig(v *viper.Viper) (*Config, error) {
util.GetSubViper(v, BackendName).Unmarshal(cfg),
cfg.Validate(),
)

if err != nil {
return nil, err
}
Expand Down
28 changes: 21 additions & 7 deletions pkg/backends/otlp/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,45 @@ import (
"github.com/atlassian/gostatsd/pkg/backends/otlp/internal/data"
)

type group map[uint64]data.ResourceMetrics

// Group is used to ensure metrics that have the same resource attributes
// are grouped together and it uses a fixed values to reduce potential memory
// allocations compared to using a string value
type Group map[uint64]data.ResourceMetrics
type Group struct {
batches []group
metricsInserted int
batchSize int
}

func NewGroup() Group {
return make(Group)
func NewGroup(batchSize int) Group {
return Group{
batches: []group{make(group)},
batchSize: batchSize,
}
}

func (g *Group) Values() []data.ResourceMetrics {
func (g *group) Values() []data.ResourceMetrics {
return maps.Values(*g)
}

func (g *Group) Insert(is data.InstrumentationScope, resources data.Map, m data.Metric) {
key := resources.Hash()

entry, exist := (*g)[key]
currentBatch := g.batches[len(g.batches)-1]
entry, exist := (currentBatch)[key]
if !exist {
entry = data.NewResourceMetrics(
data.NewResource(
data.WithResourceMap(resources),
),
)
(*g)[key] = entry
(currentBatch)[key] = entry
}

entry.AppendMetric(is, m)
currentBatch[key] = entry

if len(currentBatch) == g.batchSize {
g.batches = append(g.batches, make(group))
}
}
52 changes: 50 additions & 2 deletions pkg/backends/otlp/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestGroupInsert(t *testing.T) {
t.Parallel()

g := NewGroup()
g := NewGroup(1000)

is := data.NewInstrumentationScope("gostatsd/aggregation", "v1.0.0")

Expand Down Expand Up @@ -52,5 +52,53 @@ func TestGroupInsert(t *testing.T) {
data.NewMetric("my-metric"),
)

assert.Len(t, g.Values(), 2, "Must have two distinct value")
assert.Len(t, g.batches[0].Values(), 2, "Must have two distinct value")
}

// TestGroupBatch checks that inserting three metrics into a Group created
// with a batch size of 2 results in two batches.
func TestGroupBatch(t *testing.T) {
	t.Parallel()

	g := NewGroup(2)
	scope := data.NewInstrumentationScope("gostatsd/aggregation", "v1.0.0")

	// The first two metrics share the same resource tags; the third one
	// belongs to a different service.
	tagSets := [][]string{
		{"service.name:my-awesome-service", "service.region:local"},
		{"service.name:my-awesome-service", "service.region:local"},
		{"service.name:my-other-service", "service.region:local"},
	}
	for _, tags := range tagSets {
		g.Insert(
			scope,
			data.NewMap(data.WithStatsdDelimitedTags(tags)),
			data.NewMetric("my-metric"),
		)
	}

	assert.Equal(t, 2, len(g.batches))
}
8 changes: 8 additions & 0 deletions pkg/backends/otlp/internal/data/resource_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,11 @@ func (rm ResourceMetrics) AppendMetric(is InstrumentationScope, m Metric) {
}
rm.raw.ScopeMetrics[0].Metrics = append(rm.raw.ScopeMetrics[0].Metrics, m.raw)
}

// CouneMetrics returns the total number of metrics held by the resource,
// summed over all of its scope metrics.
//
// NOTE(review): the name looks like a typo for "CountMetrics"; it is kept
// as-is because existing callers reference this spelling.
func (rm ResourceMetrics) CouneMetrics() int {
	total := 0
	for i := range rm.raw.ScopeMetrics {
		total += len(rm.raw.ScopeMetrics[i].Metrics)
	}
	return total
}
4 changes: 4 additions & 0 deletions pkg/backends/otlp/internal/data/resource_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ func TestNewResourceMetrics(t *testing.T) {
NewResource(),
NewScopeMetrics(
NewInstrumentationScope("gostatsd/aggregation", "test"),
NewMetric("a metric"),
NewMetric("another metric"),
),
NewScopeMetrics(
NewInstrumentationScope("gostatsd/aggregation", "test"),
NewMetric("some metric"),
),
)
assert.Len(t, rm.raw.ScopeMetrics, 2, "Must have two scope metrics defined")
assert.Equal(t, 3, rm.CouneMetrics())
}

0 comments on commit ebdb1bf

Please sign in to comment.