Skip to content

Commit

Permalink
Correct the translation of protobuf to json at query frontend (grafan…
Browse files Browse the repository at this point in the history
…a#480)

* Correct the translation of protobuf to json at query frontend

Signed-off-by: Annanay <[email protected]>

* Tests improvements, simplified code, added error tags to spans

Signed-off-by: Annanay <[email protected]>

* Remove logging in util function, log at caller

Signed-off-by: Annanay <[email protected]>
  • Loading branch information
annanay25 authored Jan 28, 2021
1 parent d7bc9b1 commit 80a1717
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 44 deletions.
6 changes: 5 additions & 1 deletion modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func (c *Compactor) Owns(hash string) bool {
}

func (c *Compactor) Combine(objA []byte, objB []byte) []byte {
return tempo_util.CombineTraces(objA, objB)
combinedTrace, err := tempo_util.CombineTraces(objA, objB)
if err != nil {
level.Error(util.Logger).Log("msg", "error combining trace protos", "err", err.Error())
}
return combinedTrace
}

func (c *Compactor) waitRingActive(ctx context.Context) error {
Expand Down
8 changes: 7 additions & 1 deletion modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
ot_log "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/pkg/util"
Expand Down Expand Up @@ -54,7 +55,12 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe
r = r.WithContext(ctx)
resp, err := rt.RoundTrip(r)

level.Info(logger).Log("method", r.Method, "url", r.URL.RequestURI(), "duration", time.Since(start).String(), "error", err == nil)
traceID, _ := middleware.ExtractTraceID(ctx)
statusCode := 500
if resp != nil {
statusCode = resp.StatusCode
}
level.Info(logger).Log("method", r.Method, "traceID", traceID, "url", r.URL.RequestURI(), "duration", time.Since(start).String(), "status", statusCode)

return resp, err
})
Expand Down
46 changes: 43 additions & 3 deletions modules/frontend/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package frontend

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"io"
Expand All @@ -10,10 +11,14 @@ import (
"strings"

"github.com/go-kit/kit/log"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/modules/querier"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
)

Expand Down Expand Up @@ -51,6 +56,12 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) {
s.blockBoundaries = createBlockBoundaries(s.queryShards)
}

// check marshalling format
marshallingFormat := util.JSONTypeHeaderValue
if r.Header.Get(util.AcceptHeaderKey) == util.ProtobufTypeHeaderValue {
marshallingFormat = util.ProtobufTypeHeaderValue
}

reqs := make([]*http.Request, s.queryShards)
for i := 0; i < len(s.blockBoundaries)-1; i++ {
reqs[i] = r.Clone(r.Context())
Expand All @@ -66,6 +77,9 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) {

reqs[i].Header.Set(user.OrgIDHeaderName, userID)

// Enforce frontend <> querier communication to be in protobuf bytes
reqs[i].Header.Set(util.AcceptHeaderKey, util.ProtobufTypeHeaderValue)

// adding to RequestURI only because weaveworks/common uses the RequestURI field to
// translate from http.Request to httpgrpc.Request
// https://github.com/weaveworks/common/blob/47e357f4e1badb7da17ad74bae63e228bdd76e8f/httpgrpc/server/server.go#L48
Expand All @@ -77,7 +91,7 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) {
return nil, err
}

return mergeResponses(rrs)
return mergeResponses(r.Context(), marshallingFormat, rrs)
}

// createBlockBoundaries splits the range of blockIDs into queryShards parts
Expand Down Expand Up @@ -139,7 +153,11 @@ func doRequests(reqs []*http.Request, downstream Handler) ([]RequestResponse, er
return resps, firstErr
}

func mergeResponses(rrs []RequestResponse) (*http.Response, error) {
func mergeResponses(ctx context.Context, marshallingFormat string, rrs []RequestResponse) (*http.Response, error) {
// tracing instrumentation
span, _ := opentracing.StartSpanFromContext(ctx, "frontend.mergeResponses")
defer span.Finish()

var errCode = http.StatusOK
var errBody io.ReadCloser
var combinedTrace []byte
Expand All @@ -155,7 +173,11 @@ func mergeResponses(rrs []RequestResponse) (*http.Response, error) {
if len(combinedTrace) == 0 {
combinedTrace = body
} else {
combinedTrace = util.CombineTraces(combinedTrace, body)
combinedTrace, err = util.CombineTraces(combinedTrace, body)
if err != nil {
// will result in a 500 internal server error
return nil, errors.Wrap(err, "error combining traces at query frontend")
}
}
} else if rr.Response.StatusCode != http.StatusNotFound {
errCode = rr.Response.StatusCode
Expand All @@ -174,6 +196,24 @@ func mergeResponses(rrs []RequestResponse) (*http.Response, error) {
}

if errCode == http.StatusOK {
if marshallingFormat == util.JSONTypeHeaderValue {
// if request is for application/json, unmarshal into proto object and re-marshal into json bytes
traceObject := &tempopb.Trace{}
err := proto.Unmarshal(combinedTrace, traceObject)
if err != nil {
return nil, err
}

var jsonTrace bytes.Buffer
marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(&jsonTrace, traceObject)
if err != nil {
return nil, err
}
combinedTrace = jsonTrace.Bytes()
}

span.SetTag("response marshalling format", marshallingFormat)
return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(combinedTrace)),
Expand Down
56 changes: 50 additions & 6 deletions modules/frontend/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package frontend

import (
"bytes"
"context"
"io/ioutil"
"net/http"
"testing"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/assert"
)

func TestCreateBlockShards(t *testing.T) {
Expand Down Expand Up @@ -59,10 +63,23 @@ func TestMergeResponses(t *testing.T) {
b2, err := proto.Marshal(t2)
assert.NoError(t, err)

combinedTrace, err := util.CombineTraces(b1, b2)
assert.NoError(t, err)

traceObject := &tempopb.Trace{}
err = proto.Unmarshal(combinedTrace, traceObject)
assert.NoError(t, err)

var combinedTraceJSON bytes.Buffer
marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(&combinedTraceJSON, traceObject)
assert.NoError(t, err)

tests := []struct {
name string
requestResponse []RequestResponse
expected *http.Response
name string
requestResponse []RequestResponse
marshallingFormat string
expected *http.Response
}{
{
name: "combine status ok responses",
Expand All @@ -88,7 +105,7 @@ func TestMergeResponses(t *testing.T) {
},
expected: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(util.CombineTraces(b1, b2))),
Body: ioutil.NopCloser(bytes.NewReader(combinedTrace)),
Header: http.Header{},
},
},
Expand Down Expand Up @@ -158,10 +175,37 @@ func TestMergeResponses(t *testing.T) {
Header: http.Header{},
},
},
{
name: "accept application/json",
requestResponse: []RequestResponse{
{
Response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(b1)),
},
},
{
Response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(b2)),
},
},
},
marshallingFormat: util.JSONTypeHeaderValue,
expected: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(combinedTraceJSON.Bytes())),
Header: http.Header{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
merged, err := mergeResponses(tt.requestResponse)
marshallingFormat := util.ProtobufTypeHeaderValue
if len(tt.marshallingFormat) > 0 {
marshallingFormat = tt.marshallingFormat
}
merged, err := mergeResponses(context.Background(), marshallingFormat, tt.requestResponse)
assert.NoError(t, err)
assert.Equal(t, tt.expected, merged)
})
Expand Down
6 changes: 5 additions & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,11 @@ func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte) error {
}

func (i *instance) Combine(objA []byte, objB []byte) []byte {
return util.CombineTraces(objA, objB)
combinedTrace, err := util.CombineTraces(objA, objB)
if err != nil {
level.Error(cortex_util.Logger).Log("msg", "error combining trace protos", "err", err.Error())
}
return combinedTrace
}

// pushRequestTraceID gets the TraceID of the first span in the batch and assumes its the trace ID throughout
Expand Down
2 changes: 2 additions & 0 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
}

if r.Header.Get(util.AcceptHeaderKey) == util.ProtobufTypeHeaderValue {
span.SetTag("response marshalling format", util.ProtobufTypeHeaderValue)
b, err := proto.Marshal(resp.Trace)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -82,6 +83,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
return
}

span.SetTag("response marshalling format", util.JSONTypeHeaderValue)
marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(w, resp.Trace)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
TraceIDVar = "traceID"
AcceptHeaderKey = "Accept"
ProtobufTypeHeaderValue = "application/protobuf"
JSONTypeHeaderValue = "application/json"
)

func ParseTraceID(r *http.Request) ([]byte, error) {
Expand Down
29 changes: 10 additions & 19 deletions pkg/util/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,46 @@ import (
"hash"
"hash/fnv"

"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
)

func CombineTraces(objA []byte, objB []byte) []byte {
func CombineTraces(objA []byte, objB []byte) ([]byte, error) {
// if the byte arrays are the same, we can return quickly
if bytes.Equal(objA, objB) {
return objA
return objA, nil
}

// hashes differ. unmarshal and combine traces
traceA := &tempopb.Trace{}
traceB := &tempopb.Trace{}

errA := proto.Unmarshal(objA, traceA)
if errA != nil {
level.Error(util.Logger).Log("msg", "error unsmarshaling objA", "err", errA)
}

errB := proto.Unmarshal(objB, traceB)
if errB != nil {
level.Error(util.Logger).Log("msg", "error unsmarshaling objB", "err", errB)
}

// if we had problems unmarshaling one or the other, return the one that marshalled successfully
if errA != nil && errB == nil {
return objB
return objB, errors.Wrap(errA, "error unsmarshaling objA")
} else if errB != nil && errA == nil {
return objA
return objA, errors.Wrap(errB, "error unsmarshaling objB")
} else if errA != nil && errB != nil {
// if both failed let's send back an empty trace
level.Error(util.Logger).Log("msg", "both A and B failed to unmarshal. returning an empty trace")
bytes, err := proto.Marshal(&tempopb.Trace{})
if err != nil {
level.Error(util.Logger).Log("msg", "somehow marshalling an empty trace threw an error.", "err", err)
}
return bytes
bytes, _ := proto.Marshal(&tempopb.Trace{})
return bytes, errors.Wrap(errA, "both A and B failed to unmarshal. returning an empty trace")
}

traceComplete, _, _, _ := CombineTraceProtos(traceA, traceB)

bytes, err := proto.Marshal(traceComplete)
if err != nil {
level.Error(util.Logger).Log("msg", "marshalling the combine trace threw an error.", "err", err)
return objA
return objA, errors.Wrap(err, "marshalling the combine trace threw an error")
}
return bytes
return bytes, nil
}

// CombineTraceProtos combines two trace protos into one. Note that it is destructive.
Expand Down
Loading

0 comments on commit 80a1717

Please sign in to comment.