diff --git a/pkg/backends/otlp/group.go b/pkg/backends/otlp/group.go index 77bfd3f1..aefa991d 100644 --- a/pkg/backends/otlp/group.go +++ b/pkg/backends/otlp/group.go @@ -1,13 +1,24 @@ package otlp import ( - "golang.org/x/exp/maps" - "github.com/atlassian/gostatsd/pkg/backends/otlp/internal/data" + "golang.org/x/exp/maps" ) type group map[uint64]data.ResourceMetrics +func (g *group) Values() []data.ResourceMetrics { + return maps.Values(*g) +} + +func (g *group) LenMetrics() int { + count := 0 + for _, rm := range *g { + count += rm.CouneMetrics() + } + return count +} + // Group is used to ensure metrics that have the same resource attributes // are grouped together and it uses a fixed values to reduce potential memory // allocations compared to using a string value @@ -24,10 +35,6 @@ func NewGroup(batchSize int) Group { } } -func (g *group) Values() []data.ResourceMetrics { - return maps.Values(*g) -} - func (g *Group) Insert(is data.InstrumentationScope, resources data.Map, m data.Metric) { key := resources.Hash() @@ -44,7 +51,7 @@ func (g *Group) Insert(is data.InstrumentationScope, resources data.Map, m data. entry.AppendMetric(is, m) currentBatch[key] = entry - if len(currentBatch) == g.batchSize { + if currentBatch.LenMetrics() == g.batchSize { g.batches = append(g.batches, make(group)) } } diff --git a/pkg/backends/otlp/group_test.go b/pkg/backends/otlp/group_test.go index b9fe8ebd..c8494447 100644 --- a/pkg/backends/otlp/group_test.go +++ b/pkg/backends/otlp/group_test.go @@ -58,47 +58,75 @@ func TestGroupInsert(t *testing.T) { func TestGroupBatch(t *testing.T) { t.Parallel() - g := NewGroup(2) + for _, tc := range []struct { + name string + batchSize int + metricsAdded map[string][]string // map[resourceMetrics][]metricNames + wantNumOfBatches int + }{ + { + name: "no metrics", + batchSize: 2, + metricsAdded: map[string][]string{}, + wantNumOfBatches: 1, // Must have at least one batch + }, + { + name: "metrics in one resource metrics exceeds batch limit should be split into two groups", + batchSize: 2, + metricsAdded: map[string][]string{ + "r1": {"m1", "m2", "m3", "m4", "m5"}, + }, + wantNumOfBatches: 3, + }, + { + name: "metrics in multiple resource metrics doesn't exceeds batch limit stay in one group", + batchSize: 10, + metricsAdded: map[string][]string{ + "r1": {"m1", "m2", "m3"}, + "r2": {"m1", "m2", "m3"}, + "r3": {"m1", "m2", "m3"}, + }, + wantNumOfBatches: 1, + }, + { + name: "should properly split metrics into groups according to batch size", + batchSize: 5, + metricsAdded: map[string][]string{ + "r1": {"m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10"}, + "r2": {"m1"}, + "r3": {"m1", "m2", "m3", "m4", "m5"}, + "r4": {"m1", "m2", "m3"}, + }, + wantNumOfBatches: 4, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + g := NewGroup(tc.batchSize) + is := data.NewInstrumentationScope("gostatsd/aggregation", "v1.0.0") + for rm, metricNames := range tc.metricsAdded { + for _, metricName := range metricNames { + insertMetric(&g, is, rm, metricName) + } + } - is := data.NewInstrumentationScope("gostatsd/aggregation", "v1.0.0") + assert.Len(t, g.batches, tc.wantNumOfBatches, "Must have %d groups", tc.wantNumOfBatches) + }) + } +} +func insertMetric(g *Group, is data.InstrumentationScope, serviceName string, metricName string) { g.Insert( is, data.NewMap( data.WithStatsdDelimitedTags( []string{ - "service.name:my-awesome-service", + "service.name:" + serviceName, "service.region:local", }, ), ), - data.NewMetric("my-metric"), + data.NewMetric(metricName), ) - g.Insert( - is, - data.NewMap( - data.WithStatsdDelimitedTags( - []string{ - "service.name:my-awesome-service", - "service.region:local", - }, - ), - ), - data.NewMetric("my-metric"), - ) - g.Insert( - is, - data.NewMap( - data.WithStatsdDelimitedTags( - []string{ - "service.name:my-other-service", - "service.region:local", - }, - ), - ), - data.NewMetric("my-metric"), - ) - - batches := g.batches - assert.Equal(t, 2, len(batches)) }