Skip to content

Commit

Permalink
Trace sempahore acquire (#190)
Browse files Browse the repository at this point in the history
* elasticapm: create span in semAcquire

* otlp: trace semaphore acquire across consumers
  • Loading branch information
endorama authored Dec 6, 2023
1 parent 3fce460 commit 6ed3343
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 3 deletions.
3 changes: 3 additions & 0 deletions input/elasticapm/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ func (p *Processor) getStreamReader(r io.Reader) *streamReader {
}

func (p *Processor) semAcquire(ctx context.Context, async bool) error {
sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter")
defer sp.End()

if async {
if ok := p.sem.TryAcquire(1); !ok {
return ErrQueueFull
Expand Down
2 changes: 1 addition & 1 deletion input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
// ConsumeLogsWithResult consumes OpenTelemetry log data, converting into
// the Elastic APM log model and sending to the reporter.
func (c *Consumer) ConsumeLogsWithResult(ctx context.Context, logs plog.Logs) (ConsumeLogsResult, error) {
if err := c.sem.Acquire(ctx, 1); err != nil {
if err := semAcquire(ctx, c.sem, 1); err != nil {
return ConsumeLogsResult{}, err
}
defer c.sem.Release(1)
Expand Down
2 changes: 1 addition & 1 deletion input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics)
func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric.Metrics) (ConsumeMetricsResult, error) {
totalDataPoints := int64(metrics.DataPointCount())
totalMetrics := int64(metrics.MetricCount())
if err := c.sem.Acquire(ctx, 1); err != nil {
if err := semAcquire(ctx, c.sem, 1); err != nil {
return ConsumeMetricsResult{}, err
}
defer c.sem.Release(1)
Expand Down
33 changes: 33 additions & 0 deletions input/otlp/trace-semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package otlp

import (
"context"

"go.elastic.co/apm/v2"

"github.com/elastic/apm-data/input"
)

func semAcquire(ctx context.Context, sem input.Semaphore, i int64) error {
sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter")
defer sp.End()

return sem.Acquire(ctx, i)
}
2 changes: 1 addition & 1 deletion input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, traces ptrace.Traces) erro
// ConsumeTracesWithResult consumes OpenTelemetry trace data,
// converting into Elastic APM events and reporting to the Elastic APM schema.
func (c *Consumer) ConsumeTracesWithResult(ctx context.Context, traces ptrace.Traces) (ConsumeTracesResult, error) {
if err := c.sem.Acquire(ctx, 1); err != nil {
if err := semAcquire(ctx, c.sem, 1); err != nil {
return ConsumeTracesResult{}, err
}
defer c.sem.Release(1)
Expand Down

0 comments on commit 6ed3343

Please sign in to comment.