Skip to content

Commit

Permalink
Merge branch 'main' into add-logs-timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
LikeTheSalad authored Oct 18, 2023
2 parents fee73c8 + 83e96f4 commit 5d37ae7
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 190 deletions.
56 changes: 26 additions & 30 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,58 @@ module github.com/elastic/apm-data
go 1.20

require (
github.com/gofrs/uuid v4.3.1+incompatible
github.com/google/go-cmp v0.5.9
github.com/jaegertracing/jaeger v1.38.1
github.com/google/go-cmp v0.6.0
github.com/jaegertracing/jaeger v1.40.0
github.com/json-iterator/go v1.1.12
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.63.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.66.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.3
github.com/stretchr/testify v1.8.4
github.com/xeipuuv/gojsonschema v1.2.0
go.elastic.co/apm/v2 v2.2.0
go.elastic.co/apm/v2 v2.4.5
go.elastic.co/fastjson v1.3.0
go.opentelemetry.io/collector/consumer v0.76.1
go.opentelemetry.io/collector/pdata v1.0.0-rcv0011
go.opentelemetry.io/collector/semconv v0.76.1
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0
golang.org/x/sync v0.2.0
golang.org/x/tools v0.9.3
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.30.0
go.opentelemetry.io/collector/consumer v0.87.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0016
go.opentelemetry.io/collector/semconv v0.87.0
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/metric v1.19.0
go.opentelemetry.io/otel/sdk v1.19.0
go.opentelemetry.io/otel/sdk/metric v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
go.uber.org/zap v1.26.0
golang.org/x/sync v0.4.0
golang.org/x/tools v0.14.0
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.31.0
)

require (
github.com/apache/thrift v0.17.0 // indirect
github.com/apache/thrift v0.19.0 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/elastic/go-licenser v0.4.0 // indirect
github.com/elastic/go-sysinfo v1.7.1 // indirect
github.com/elastic/go-windows v1.0.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/jcchavezs/porto v0.1.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.63.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.66.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
howett.net/plist v1.0.0 // indirect
)
149 changes: 55 additions & 94 deletions go.sum

Large diffs are not rendered by default.

23 changes: 18 additions & 5 deletions input/otlp/exceptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ package otlp

import (
"bufio"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"regexp"
"strconv"
"strings"

"github.com/gofrs/uuid"

"github.com/elastic/apm-data/model/modelpb"
)

Expand All @@ -65,9 +66,8 @@ func convertOpenTelemetryExceptionSpanEvent(
exceptionError.Exception.Message = exceptionMessage
exceptionError.Exception.Type = exceptionType
exceptionError.Exception.Handled = &exceptionHandled
// TODO(axw) replace github.com/gofrs/uuid, not worth having the dependency just for this.
if id, err := uuid.NewV4(); err == nil {
exceptionError.Id = id.String()
if id, err := newUniqueID(); err == nil {
exceptionError.Id = id
}
if exceptionStacktrace != "" {
if err := setExceptionStacktrace(exceptionStacktrace, language, exceptionError.Exception); err != nil {
Expand Down Expand Up @@ -215,3 +215,16 @@ func parseJavaStacktraceFrame(s string, out *modelpb.Exception) error {
func isNotTab(r rune) bool {
return r != '\t'
}

func newUniqueID() (string, error) {
var u [16]byte
if _, err := io.ReadFull(rand.Reader, u[:]); err != nil {
return "", err
}

// convert to string
buf := make([]byte, 32)
hex.Encode(buf, u[:])

return string(buf), nil
}
16 changes: 13 additions & 3 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,16 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
type ConsumeLogsResult struct {
RejectedLogRecords int64
}

// ConsumeLogs consumes OpenTelemetry log data, converting into
// the Elastic APM log model and sending to the reporter.
// The returned ConsumeLogsResult contains the number of rejected log records.
func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) (ConsumeLogsResult, error) {
if err := c.sem.Acquire(ctx, 1); err != nil {
return err
return ConsumeLogsResult{}, err
}
defer c.sem.Release(1)

Expand All @@ -60,7 +67,10 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
for i := 0; i < resourceLogs.Len(); i++ {
c.convertResourceLogs(resourceLogs.At(i), receiveTimestamp, &batch)
}
return c.config.Processor.ProcessBatch(ctx, &batch)
if err := c.config.Processor.ProcessBatch(ctx, &batch); err != nil {
return ConsumeLogsResult{}, err
}
return ConsumeLogsResult{RejectedLogRecords: 0}, nil
}

func (c *Consumer) convertResourceLogs(resourceLogs plog.ResourceLogs, receiveTimestamp time.Time, out *modelpb.Batch) {
Expand Down
29 changes: 21 additions & 8 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func TestConsumerConsumeLogs(t *testing.T) {
Semaphore: semaphore.NewWeighted(100),
})
logs := plog.NewLogs()
assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))
result, err := consumer.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)
})

commonEvent := modelpb.APMEvent{
Expand Down Expand Up @@ -192,7 +194,9 @@ func TestConsumerConsumeLogs(t *testing.T) {
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))
result, err := consumer.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

now := modelpb.FromTime(time.Now())
for _, e := range processed {
Expand Down Expand Up @@ -231,16 +235,19 @@ func TestConsumeLogsSemaphore(t *testing.T) {
startCh := make(chan struct{})
go func() {
close(startCh)
assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))
_, err := consumer.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
}()

<-startCh
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
assert.Equal(t, consumer.ConsumeLogs(ctx, logs).Error(), "context deadline exceeded")
_, err := consumer.ConsumeLogs(ctx, logs)
assert.Equal(t, err.Error(), "context deadline exceeded")
close(doneCh)

assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))
_, err = consumer.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
}

func TestConsumerConsumeLogsException(t *testing.T) {
Expand Down Expand Up @@ -293,7 +300,9 @@ Caused by: LowLevelException
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))
result, err := consumer.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

now := modelpb.FromTime(time.Now())
for _, e := range processed {
Expand Down Expand Up @@ -443,7 +452,9 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) {
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))
result, err := consumer.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

assert.Len(t, processed, 1)
assert.Equal(t, "event", processed[0].Event.Kind)
Expand Down Expand Up @@ -580,7 +591,9 @@ func TestConsumerConsumeLogsLabels(t *testing.T) {
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))
result, err := consumer.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

assert.Len(t, processed, 3)
assert.Equal(t, modelpb.Labels{"key0": {Global: true, Value: "zero"}, "key1": {Value: "one"}}, modelpb.Labels(processed[0].Labels))
Expand Down
Loading

0 comments on commit 5d37ae7

Please sign in to comment.