Merge pull request #720 from atlassian/htan/make-async-send-async
Send batches concurrently in each flush and compress the payload
hstan authored Aug 7, 2024
2 parents da2dba9 + fa18514 commit 8c705be
Showing 9 changed files with 69 additions and 26 deletions.
2 changes: 2 additions & 0 deletions go.mod
@@ -32,6 +32,8 @@ require (
stathat.com/c/consistent v1.0.0
)

require golang.org/x/sync v0.6.0

require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -403,6 +403,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
55 changes: 31 additions & 24 deletions pkg/backends/otlp/backend.go
@@ -13,6 +13,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"

"github.com/atlassian/gostatsd/pkg/stats"

@@ -50,6 +51,7 @@ type Backend struct {
client *http.Client
requestsBufferSem chan struct{}
maxRetries int
CompressPayload bool

// metricsPerBatch is the maximum number of metrics to send in a single batch.
metricsPerBatch int
@@ -84,6 +86,7 @@ func NewClientFromViper(v *viper.Viper, logger logrus.FieldLogger, pool *transpo
logger: logger,
requestsBufferSem: make(chan struct{}, cfg.MaxRequests),
maxRetries: cfg.MaxRetries,
CompressPayload: cfg.CompressPayload,
metricsPerBatch: cfg.MetricsPerBatch,
}, nil
}
@@ -139,7 +142,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
bd.batchesRetried.SendIfChanged(statser, "backend.retried", nil)
}()

group := newGroups(bd.metricsPerBatch)
currentGroup := newGroups(bd.metricsPerBatch)

mm.Counters.Each(func(name, _ string, cm gostatsd.Counter) {
resources, attributes := data.SplitMetricTagsByKeysAndConvert(cm.Tags, bd.resourceKeys)
@@ -164,8 +167,8 @@
),
)

group.insert(bd.is, resources, rate)
group.insert(bd.is, resources, m)
currentGroup.insert(bd.is, resources, rate)
currentGroup.insert(bd.is, resources, m)
})

mm.Gauges.Each(func(name, _ string, gm gostatsd.Gauge) {
@@ -181,7 +184,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
),
)

group.insert(bd.is, resources, m)
currentGroup.insert(bd.is, resources, m)
})

mm.Sets.Each(func(name, _ string, sm gostatsd.Set) {
@@ -197,7 +200,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
),
)

group.insert(bd.is, resources, m)
currentGroup.insert(bd.is, resources, m)
})

mm.Timers.Each(func(name, _ string, t gostatsd.Timer) {
@@ -214,7 +217,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
} else {
btags.Insert("le", strconv.FormatFloat(float64(boundry), 'f', -1, 64))
}
group.insert(
currentGroup.insert(
bd.is,
resources,
data.NewMetric(fmt.Sprintf("%s.histogram", name)).SetGauge(
@@ -248,7 +251,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
if calc.discarded {
continue
}
group.insert(
currentGroup.insert(
bd.is,
resources,
data.NewMetric(fmt.Sprintf("%s.%s", name, calc.suffix)).SetGauge(
@@ -262,7 +265,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
}

for _, pct := range t.Percentiles {
group.insert(bd.is, resources, data.NewMetric(fmt.Sprintf("%s.%s", name, pct.Str)).SetGauge(
currentGroup.insert(bd.is, resources, data.NewMetric(fmt.Sprintf("%s.%s", name, pct.Str)).SetGauge(
data.NewGauge(data.NewNumberDataPoint(
uint64(t.Timestamp),
data.WithNumberDataPointMap(attributes),
@@ -280,27 +283,32 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
if len(t.Histogram) != 0 {
opts = append(opts, data.WithHistogramDataPointCumulativeBucketValues(t.Histogram))
}
group.insert(bd.is, resources, data.NewMetric(name).SetHistogram(
currentGroup.insert(bd.is, resources, data.NewMetric(name).SetHistogram(
data.NewHistogram(data.NewHistogramDataPoint(uint64(t.Timestamp), opts...)),
))
}
})

var errs error
for _, b := range group.batches {
eg, ectx := errgroup.WithContext(ctx)
for _, b := range currentGroup.batches {
atomic.AddUint64(&bd.batchesCreated, 1)
err := bd.postMetrics(ctx, b)
if err != nil {
bd.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": bd.metricsEndpoint,
}).Error("Issues trying to submit data")
errs = multierr.Append(errs, err)
} else {
atomic.AddUint64(&bd.batchesSent, 1)
atomic.AddUint64(&bd.seriesSent, uint64(b.lenMetrics()))
}
func(g group) {
eg.Go(func() error {
err := bd.postMetrics(ectx, b)
if err != nil {
bd.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": bd.metricsEndpoint,
}).Error("Issues trying to submit data")
atomic.AddUint64(&bd.batchesDropped, 1)
} else {
atomic.AddUint64(&bd.batchesSent, 1)
atomic.AddUint64(&bd.seriesSent, uint64(b.lenMetrics()))
}
return err
})
}(b)
}
cb(multierr.Errors(errs))
cb(multierr.Errors(eg.Wait()))
}

func (c *Backend) postMetrics(ctx context.Context, batch group) error {
@@ -312,9 +320,8 @@ func (c *Backend) postMetrics(ctx context.Context, batch group) error {
)

resourceMetrics := batch.values()
req, err = data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics)
req, err = data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics, c.CompressPayload)
if err != nil {
atomic.AddUint64(&c.batchesDropped, 1)
return err
}

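Taken together, the backend.go changes swap the sequential send loop for an errgroup fan-out: each batch is posted on its own goroutine, per-batch failures are logged and counted inside that goroutine, and the flush callback receives whatever eg.Wait() reports. A minimal, self-contained sketch of that fan-out pattern follows; postBatch and batch are hypothetical stand-ins for the backend's postMetrics and group types, not the real API.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// batch stands in for the backend's per-request group of metrics.
type batch struct{ id int }

// postBatch stands in for Backend.postMetrics: send one batch, honouring
// context cancellation, and return any failure to the caller.
func postBatch(ctx context.Context, b batch) error {
	select {
	case <-time.After(10 * time.Millisecond): // pretend this is the HTTP call
		fmt.Println("sent batch", b.id)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	batches := []batch{{id: 1}, {id: 2}, {id: 3}}

	// One goroutine per batch; the derived context is cancelled as soon as
	// any send returns a non-nil error.
	eg, ctx := errgroup.WithContext(context.Background())
	for _, b := range batches {
		b := b // capture per iteration (unnecessary on Go 1.22+)
		eg.Go(func() error {
			return postBatch(ctx, b)
		})
	}
	// Wait blocks until every goroutine finishes and returns the first error.
	if err := eg.Wait(); err != nil {
		fmt.Println("at least one batch failed:", err)
	}
}

Note that eg.Wait() surfaces only the first non-nil error, and the derived context is cancelled once any send fails, which is why the real code still logs and counts each failing batch inside its own goroutine.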
1 change: 1 addition & 0 deletions pkg/backends/otlp/backend_test.go
@@ -362,6 +362,7 @@ func TestBackendSendAsyncMetrics(t *testing.T) {
v := viper.New()
v.Set("otlp.metrics_endpoint", fmt.Sprintf("%s/%s", s.URL, "v1/metrics"))
v.Set("otlp.logs_endpoint", fmt.Sprintf("%s/%s", s.URL, "v1/logs"))
v.Set("otlp.compress_payload", false)
if tc.enableHistograms {
v.Set("otlp.conversion", ConversionAsHistogram)
}
3 changes: 3 additions & 0 deletions pkg/backends/otlp/config.go
@@ -29,6 +29,8 @@ type Config struct {
MaxRequests int `mapstructure:"max_requests"`
// MaxRetries (Optional, default: 3) is the maximum number of retries to send a batch
MaxRetries int `mapstructure:"max_retries"`
// CompressPayload (Optional, default: true) is used to enable payload compression
CompressPayload bool `mapstructure:"compress_payload"`
// MetricsPerBatch (Optional, default: 1000) is the maximum number of metrics to send in a single batch.
MetricsPerBatch int `mapstructure:"metrics_per_batch"`
// ResourceKeys (Optional) is used to extract values from provided tags
@@ -62,6 +64,7 @@ func newDefaultConfig() *Config {
Transport: "default",
MaxRequests: runtime.NumCPU() * 2,
MaxRetries: 3,
CompressPayload: true,
MetricsPerBatch: defaultMetricsPerBatch,
Conversion: ConversionAsGauge,
UserAgent: "gostatsd",
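For reference, the new key sits under the otlp section alongside the existing ones. A quick sketch of reading it back through viper, using the same key the tests set; compression stays at its default of true unless an operator explicitly opts out.

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	// Mirrors newDefaultConfig: gzip compression is on unless opted out.
	v.SetDefault("otlp.compress_payload", true)

	// An operator opting out would set the key explicitly:
	v.Set("otlp.compress_payload", false)

	fmt.Println("compress payloads:", v.GetBool("otlp.compress_payload"))
}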
2 changes: 2 additions & 0 deletions pkg/backends/otlp/config_test.go
@@ -30,6 +30,7 @@ func TestNewConfig(t *testing.T) {
v.SetDefault("otlp.logs_endpoint", "http://local/v1/logs")
v.SetDefault("otlp.max_requests", 1)
v.SetDefault("otlp.max_retries", 3)
v.SetDefault("otlp.compress_payload", true)
v.SetDefault("otlp.metrics_per_batch", 999)
return v
}(),
@@ -38,6 +39,7 @@
LogsEndpoint: "http://local/v1/logs",
MaxRequests: 1,
MaxRetries: 3,
CompressPayload: true,
MetricsPerBatch: 999,
Conversion: "AsGauge",
Transport: "default",
18 changes: 17 additions & 1 deletion pkg/backends/otlp/internal/data/metric_request.go
@@ -1,6 +1,8 @@
package data

import (
"bytes"
"compress/gzip"
"context"
"net/http"

@@ -13,7 +15,7 @@ type metricsRequest struct {
raw *v1export.ExportMetricsServiceRequest
}

func NewMetricsRequest(ctx context.Context, endpoint string, metrics []ResourceMetrics) (*http.Request, error) {
func NewMetricsRequest(ctx context.Context, endpoint string, metrics []ResourceMetrics, compressPayload bool) (*http.Request, error) {
mr := metricsRequest{
raw: &v1export.ExportMetricsServiceRequest{
ResourceMetrics: make([]*v1metrics.ResourceMetrics, 0, len(metrics)),
@@ -29,5 +31,19 @@ func NewMetricsRequest(ctx context.Context, endpoint string, metrics []ResourceM
return nil, err
}

if compressPayload {
var b bytes.Buffer
w := gzip.NewWriter(&b)
if _, err = w.Write(buf); err != nil {
return nil, err
}

if err = w.Close(); err != nil {
return nil, err
}

return createProtobufRequest(ctx, endpoint, b.Bytes(), withHeader("Content-Encoding", "gzip"))
}

return createProtobufRequest(ctx, endpoint, buf)
}
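The new branch in NewMetricsRequest is the standard compress/gzip write-then-close sequence; closing the writer (not just flushing it) is what emits the gzip footer. A standalone sketch of the same idea, with gzipBody as a hypothetical helper rather than anything exported by this package:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipBody compresses an already-serialized payload. The caller must also
// set a "Content-Encoding: gzip" header on the request that carries it.
func gzipBody(raw []byte) ([]byte, error) {
	var b bytes.Buffer
	w := gzip.NewWriter(&b)
	if _, err := w.Write(raw); err != nil {
		return nil, err
	}
	// Closing the writer flushes remaining data and writes the gzip footer;
	// skipping it would produce a truncated stream.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

func main() {
	out, err := gzipBody([]byte("serialized protobuf bytes"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed to %d bytes\n", len(out))
}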
1 change: 1 addition & 0 deletions pkg/backends/otlp/internal/data/metric_request_test.go
@@ -15,6 +15,7 @@ func TestMetricsRequest(t *testing.T) {
context.Background(),
"not-a-valid-url",
[]ResourceMetrics{NewResourceMetrics(NewResource())},
false,
)
assert.NoError(t, err, "Must not error creating request")
assert.NotNil(t, req, "Must have a valid request")
11 changes: 10 additions & 1 deletion pkg/backends/otlp/internal/data/request.go
@@ -6,7 +6,13 @@ import (
"net/http"
)

func createProtobufRequest(ctx context.Context, endpoint string, buf []byte) (*http.Request, error) {
func withHeader(key, value string) func(*http.Request) {
return func(req *http.Request) {
req.Header.Set(key, value)
}
}

func createProtobufRequest(ctx context.Context, endpoint string, buf []byte, option ...func(*http.Request)) (*http.Request, error) {
req, err := http.NewRequestWithContext(
ctx,
http.MethodPost,
@@ -18,5 +24,8 @@ func createProtobufRequest(ctx context.Context, endpoint string, buf []byte) (*h
}

req.Header.Set("Content-Type", RequestContentTypeProtobuf)
for _, opt := range option {
opt(req)
}
return req, nil
}
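withHeader turns extra headers into functional options applied after the base request is built, which keeps createProtobufRequest's signature stable as new headers appear. A sketch of that pattern in isolation; newPost, requestOption, and the Content-Type value are illustrative assumptions, only the Content-Encoding header matches the diff.

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// requestOption mutates a request after it has been constructed.
type requestOption func(*http.Request)

func withHeader(key, value string) requestOption {
	return func(req *http.Request) { req.Header.Set(key, value) }
}

// newPost builds a POST request and then applies any optional tweaks,
// mirroring the shape of createProtobufRequest.
func newPost(url string, body []byte, opts ...requestOption) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-protobuf")
	for _, opt := range opts {
		opt(req) // e.g. add Content-Encoding when the body is gzipped
	}
	return req, nil
}

func main() {
	req, err := newPost("http://example.invalid/v1/metrics", []byte("payload"),
		withHeader("Content-Encoding", "gzip"))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("Content-Encoding")) // gzip
}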
