Skip to content

Commit

Permalink
Merge pull request #715 from atlassian/htan/add-metrics
Browse files Browse the repository at this point in the history
Add metrics for OTLP backend
  • Loading branch information
hstan authored Aug 5, 2024
2 parents eb42b6f + 4da0132 commit 38a3277
Showing 1 changed file with 32 additions and 15 deletions.
47 changes: 32 additions & 15 deletions pkg/backends/otlp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ const (
// to export values as OTLP metrics.
// The zero value is not safe to use.
type Backend struct {
droppedMetrics uint64
droppedEvents uint64
batchesCreated uint64 // Accumulated number of batches created
batchesDropped uint64 // Accumulated number of batches aborted (data loss)
batchesSent uint64 // Accumulated number of batches successfully sent
seriesSent uint64 // Accumulated number of series successfully sent
seriesDropped uint64 // Accumulated number of series aborted (data loss)

eventsSent uint64 // Accumulated number of events successfully sent
eventsDropped uint64 // Accumulated number of events aborted (data loss)

metricsEndpoint string
logsEndpoint string
Expand Down Expand Up @@ -86,7 +92,8 @@ func (*Backend) Name() string {
func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error {
statser := stats.FromContext(ctx).WithTags(gostatsd.Tags{"backend:otlp"})
defer func() {
statser.Gauge("backend.dropped_events", float64(atomic.LoadUint64(&b.droppedEvents)), nil)
statser.Gauge("backend.dropped_events", float64(atomic.LoadUint64(&b.eventsDropped)), nil)
statser.Gauge("backend.sent_events", float64(atomic.LoadUint64(&b.eventsSent)), nil)
}()

se, err := data.NewOtlpEvent(
Expand All @@ -100,25 +107,34 @@ func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error {

req, err := data.NewEventsRequest(ctx, b.logsEndpoint, el, rt)
if err != nil {
atomic.AddUint64(&b.droppedEvents, 1)
atomic.AddUint64(&b.eventsDropped, 1)
return err
}

b.requestsBufferSem <- struct{}{}
resp, err := b.client.Do(req)
<-b.requestsBufferSem
if err != nil {
atomic.AddUint64(&b.droppedEvents, 1)
atomic.AddUint64(&b.eventsDropped, 1)
return err
}

err = data.ProcessEventsResponse(resp)
atomic.AddUint64(&b.droppedEvents, 1)
atomic.AddUint64(&b.eventsDropped, 1)

return err
}

func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, cb gostatsd.SendCallback) {
statser := stats.FromContext(ctx).WithTags(gostatsd.Tags{"backend:otlp"})
defer func() {
statser.Gauge("backend.created", float64(atomic.LoadUint64(&bd.batchesCreated)), nil)
statser.Gauge("backend.dropped", float64(atomic.LoadUint64(&bd.batchesDropped)), nil)
statser.Gauge("backend.sent", float64(atomic.LoadUint64(&bd.batchesSent)), nil)
statser.Gauge("backend.series.sent", float64(atomic.LoadUint64(&bd.seriesSent)), nil)
statser.Gauge("backend.series.dropped", float64(atomic.LoadUint64(&bd.seriesDropped)), nil)
}()

group := newGroups(bd.metricsPerBatch)

mm.Counters.Each(func(name, _ string, cm gostatsd.Counter) {
Expand Down Expand Up @@ -268,38 +284,39 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,

var errs error
for _, b := range group.batches {
err := bd.postMetrics(ctx, b.values())
atomic.AddUint64(&bd.batchesCreated, 1)
err := bd.postMetrics(ctx, b)
if err != nil {
bd.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": bd.metricsEndpoint,
}).Error("Issues trying to submit data")
errs = multierr.Append(errs, err)
} else {
atomic.AddUint64(&bd.batchesSent, 1)
atomic.AddUint64(&bd.seriesSent, uint64(b.lenMetrics()))
}
}
cb(multierr.Errors(errs))
}

func (c *Backend) postMetrics(ctx context.Context, resourceMetrics []data.ResourceMetrics) error {
statser := stats.FromContext(ctx).WithTags(gostatsd.Tags{"backend:otlp"})
defer func() {
statser.Gauge("backend.dropped", float64(atomic.LoadUint64(&c.droppedMetrics)), nil)
}()
func (c *Backend) postMetrics(ctx context.Context, batch group) error {
resourceMetrics := batch.values()

req, err := data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics)
if err != nil {
atomic.AddUint64(&c.droppedMetrics, uint64(len(resourceMetrics)))
atomic.AddUint64(&c.batchesDropped, 1)
return err
}

c.requestsBufferSem <- struct{}{}
resp, err := c.client.Do(req)
<-c.requestsBufferSem
if err != nil {
atomic.AddUint64(&c.droppedMetrics, uint64(len(resourceMetrics)))
atomic.AddUint64(&c.batchesDropped, 1)
return err
}
dropped, err := data.ProcessMetricResponse(resp)
atomic.AddUint64(&c.droppedMetrics, uint64(dropped))
atomic.AddUint64(&c.seriesDropped, uint64(dropped))

return err
}

0 comments on commit 38a3277

Please sign in to comment.