Skip to content

Commit

Permalink
Merge pull request grafana#146 from grafana/update-otel
Browse files Browse the repository at this point in the history
Update otel-collector dependency to v0.6.1
  • Loading branch information
joe-elliott authored Aug 31, 2020
2 parents 2dea4b1 + 01da6f3 commit 96d2945
Show file tree
Hide file tree
Showing 2,334 changed files with 277,500 additions and 192,795 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "opentelemetry-proto"]
path = opentelemetry-proto
url = https://github.com/open-telemetry/opentelemetry-proto
4 changes: 3 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,6 @@ linters:

issues:
exclude:
- Error return value of .*log\.Logger\)\.Log\x60 is not checked
- Error return value of .*log\.Logger\)\.Log\x60 is not checked
- package github.com/golang/protobuf/proto is deprecated
- package github.com/golang/protobuf/jsonpb is deprecated
24 changes: 20 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,32 @@ endif

.PHONY: gen-proto
gen-proto:
vend -package
protoc -I vendor/github.com/open-telemetry/opentelemetry-proto -I pkg/tempopb/ pkg/tempopb/tempo.proto --go_out=plugins=grpc:pkg/tempopb
$(MAKE) vendor-dependencies
git submodule init
rm -rf ./vendor/github.com/open-telemetry/opentelemetry-proto
protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/common/v1/common.proto --gogofaster_out=plugins=grpc:./vendor
protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto --gogofaster_out=plugins=grpc:./vendor
# protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto --gogofaster_out=plugins=grpc:./vendor
protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto --gogofaster_out=plugins=grpc:./vendor
protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto --gogofaster_out=plugins=grpc:./vendor
# protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto --gogofaster_out=plugins=grpc:./vendor
protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto --gogofaster_out=plugins=grpc:./vendor
protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto \
--grpc-gateway_out=logtostderr=true,grpc_api_configuration=opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service_http.yaml:./vendor
protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto --gogofaster_out=plugins=grpc:./vendor
protoc -I opentelemetry-proto/ opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto \
--grpc-gateway_out=logtostderr=true,grpc_api_configuration=opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service_http.yaml:./vendor
protoc -I opentelemetry-proto/ -I pkg/tempopb/ pkg/tempopb/tempo.proto --gogofaster_out=plugins=grpc:pkg/tempopb

.PHONY: vendor-dependencies
vendor-dependencies:
go mod tidy
go mod vendor
# ignore log.go b/c the proto version used by v0.6.1 doesn't actually have logs proto.
find | grep 'vendor/go.opentelemetry.io.*go$\' | grep -v -e 'log.go$\' | xargs -L 1 sed -i 's+go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/+github.com/open-telemetry/opentelemetry-proto/gen/go/+g'
$(MAKE) gen-proto


.PHONY: install-tools
install-tools:
go get -u github.com/nomad-software/vend
go get -u github.com/golang/protobuf/protoc-gen-go
go get -u github.com/gogo/protobuf/protoc-gen-gogofaster
2 changes: 1 addition & 1 deletion cmd/tempo-query/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM jaegertracing/jaeger-query:1.17
FROM jaegertracing/jaeger-query:1.19.2

ENV SPAN_STORAGE_TYPE=grpc-plugin \
GRPC_STORAGE_PLUGIN_BINARY=/tmp/tempo-query
Expand Down
175 changes: 29 additions & 146 deletions cmd/tempo-query/tempo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@ package tempo

import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/golang/protobuf/jsonpb"
"github.com/grafana/tempo/pkg/tempopb"

jaeger "github.com/jaegertracing/jaeger/model"
jaeger_spanstore "github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/open-telemetry/opentelemetry-collector/translator/conventions"
ot_common "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
ot_resource "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
ot_trace "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"

ot_pdata "go.opentelemetry.io/collector/consumer/pdata"
ot_jaeger "go.opentelemetry.io/collector/translator/trace/jaeger"
)

type Backend struct {
Expand All @@ -32,44 +30,49 @@ func (b *Backend) GetDependencies(endTs time.Time, lookback time.Duration) ([]ja
return nil, nil
}
func (b *Backend) GetTrace(ctx context.Context, traceID jaeger.TraceID) (*jaeger.Trace, error) {

hexID := fmt.Sprintf("%016x%016x", traceID.High, traceID.Low)

resp, err := http.Get(b.tempoEndpoint + hexID)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed get to tempo %w", err)
}

out := &tempopb.Trace{}
err = json.NewDecoder(resp.Body).Decode(out)
resp.Body.Close()
unmarshaller := &jsonpb.Unmarshaler{}
err = unmarshaller.Unmarshal(resp.Body, out)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to unmarshal trace json %w", err)
}
resp.Body.Close()

if len(out.Batches) == 0 {
return nil, fmt.Errorf("TraceID Not Found: " + hexID)
return nil, fmt.Errorf("traceID not found: %s", hexID)
}

otTrace := ot_pdata.TracesFromOtlp(out.Batches)
jaegerBatches, err := ot_jaeger.InternalTracesToJaegerProto(otTrace)

if err != nil {
return nil, fmt.Errorf("error translating to jaegerBatches %v: %w", hexID, err)
}

jaegerTrace := &jaeger.Trace{
Spans: []*jaeger.Span{},
ProcessMap: []jaeger.Trace_ProcessMapping{},
}

// now convert trace to jaeger
// todo: remove custom code in favor of otelcol once it's complete
for _, batch := range out.Batches {
for _, span := range batch.Spans {
jSpan := protoSpanToJaegerSpan(span)
jProcess, processID := protoResourceToJaegerProcess(batch.Resource)

jSpan.ProcessID = processID
jSpan.Process = &jProcess

jaegerTrace.Spans = append(jaegerTrace.Spans, jSpan)
jaegerTrace.ProcessMap = append(jaegerTrace.ProcessMap, jaeger.Trace_ProcessMapping{
Process: jProcess,
ProcessID: processID,
})
for _, batch := range jaegerBatches {
// otel proto conversion doesn't set jaeger spans for some reason.
for _, s := range batch.Spans {
s.Process = batch.Process
}

jaegerTrace.Spans = append(jaegerTrace.Spans, batch.Spans...)
jaegerTrace.ProcessMap = append(jaegerTrace.ProcessMap, jaeger.Trace_ProcessMapping{
Process: *batch.Process,
ProcessID: batch.Process.ServiceName,
})
}

return jaegerTrace, nil
Expand All @@ -90,123 +93,3 @@ func (b *Backend) FindTraceIDs(ctx context.Context, query *jaeger_spanstore.Trac
func (b *Backend) WriteSpan(span *jaeger.Span) error {
return nil
}

func protoResourceToJaegerProcess(in *ot_resource.Resource) (jaeger.Process, string) {
processName := ""
p := jaeger.Process{
Tags: make([]jaeger.KeyValue, 0, len(in.Attributes)),
}

for _, att := range in.Attributes {
if att == nil {
continue
}

tag := protoAttToJaegerTag(att)
if tag.Key == conventions.AttributeServiceName {
p.ServiceName = tag.VStr
}

if tag.Key == conventions.AttributeHostHostname {
processName = tag.VStr
}

p.Tags = append(p.Tags, tag)
}

return p, processName
}

func protoSpanToJaegerSpan(in *ot_trace.Span) *jaeger.Span {
traceID := jaeger.TraceID{
High: binary.BigEndian.Uint64(in.TraceId[:8]),
Low: binary.BigEndian.Uint64(in.TraceId[8:]),
}

s := &jaeger.Span{
TraceID: traceID,
SpanID: jaeger.SpanID(binary.BigEndian.Uint64(in.SpanId)),
OperationName: in.Name,
StartTime: time.Unix(0, int64(in.StartTimeUnixnano)),
Duration: time.Unix(0, int64(in.EndTimeUnixnano)).Sub(time.Unix(0, int64(in.StartTimeUnixnano))),
Tags: protoAttsToJaegerTags(in.Attributes),
Logs: protoEventsToJaegerLogs(in.Events),
}

for _, link := range in.Links {
s.References = append(s.References, jaeger.SpanRef{
TraceID: traceID,
SpanID: jaeger.SpanID(binary.BigEndian.Uint64(link.SpanId)),
RefType: jaeger.SpanRefType_CHILD_OF,
})
}

return s
}

func protoAttsToJaegerTags(ocAttribs []*ot_common.AttributeKeyValue) []jaeger.KeyValue {
if ocAttribs == nil {
return nil
}

// Pre-allocate assuming that few attributes, if any at all, are nil.
jTags := make([]jaeger.KeyValue, 0, len(ocAttribs))
for _, att := range ocAttribs {
if att == nil {
continue
}

jTags = append(jTags, protoAttToJaegerTag(att))
}

return jTags
}

func protoAttToJaegerTag(attrib *ot_common.AttributeKeyValue) jaeger.KeyValue {
jTag := jaeger.KeyValue{Key: attrib.Key}
switch attrib.Type {
case ot_common.AttributeKeyValue_STRING:
// Jaeger-to-OC maps binary tags to string attributes and encodes them as
// base64 strings. Blindingly attempting to decode base64 seems too much.
str := attrib.StringValue
jTag.VStr = str
jTag.VType = jaeger.ValueType_STRING
case ot_common.AttributeKeyValue_INT:
i := attrib.IntValue
jTag.VInt64 = i
jTag.VType = jaeger.ValueType_INT64
case ot_common.AttributeKeyValue_BOOL:
b := attrib.BoolValue
jTag.VBool = b
jTag.VType = jaeger.ValueType_BOOL
case ot_common.AttributeKeyValue_DOUBLE:
d := attrib.DoubleValue
jTag.VFloat64 = d
jTag.VType = jaeger.ValueType_FLOAT64
default:
str := "<Unknown OpenTelemetry Attribute for key \"" + attrib.Key + "\">"
jTag.VStr = str
jTag.VType = jaeger.ValueType_STRING
}

return jTag
}

func protoEventsToJaegerLogs(ocSpanTimeEvents []*ot_trace.Span_Event) []jaeger.Log {
if ocSpanTimeEvents == nil {
return nil
}

// Assume that in general no time events are going to produce nil Jaeger logs.
jLogs := make([]jaeger.Log, 0, len(ocSpanTimeEvents))
for _, ocTimeEvent := range ocSpanTimeEvents {
jLog := jaeger.Log{
Timestamp: time.Unix(0, int64(ocTimeEvent.TimeUnixnano)),
Fields: protoAttsToJaegerTags(ocTimeEvent.Attributes),
}

jLogs = append(jLogs, jLog)
}

return jLogs
}
18 changes: 11 additions & 7 deletions cmd/tempo-vulture/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ func hasMissingSpans(t *tempopb.Trace) bool {
linkedSpanIDs := make([][]byte, 0)

for _, b := range t.Batches {
for _, s := range b.Spans {
for _, l := range s.Links {
linkedSpanIDs = append(linkedSpanIDs, l.SpanId)
for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {
for _, l := range s.Links {
linkedSpanIDs = append(linkedSpanIDs, l.SpanId)
}
}
}
}
Expand All @@ -159,10 +161,12 @@ func hasMissingSpans(t *tempopb.Trace) bool {

B:
for _, b := range t.Batches {
for _, s := range b.Spans {
if bytes.Equal(s.SpanId, id) {
found = true
break B
for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {
if bytes.Equal(s.SpanId, id) {
found = true
break B
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/tempo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ func main() {
util.InitLogger(&config.Server)

// Setting the environment variable JAEGER_AGENT_HOST enables tracing
trace := tracing.NewFromEnv(fmt.Sprintf("%s-%s", appName, config.Target))
trace, err := tracing.NewFromEnv(fmt.Sprintf("%s-%s", appName, config.Target))
if err != nil {
level.Error(util.Logger).Log("msg", "error initialising tracer", "err", err)
os.Exit(1)
}
defer func() {
if err := trace.Close(); err != nil {
level.Error(util.Logger).Log("msg", "error closing tracing", "err", err)
Expand Down
Loading

0 comments on commit 96d2945

Please sign in to comment.