Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/google.golang.org/grpc: add WithErrorCheck option #2035

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions contrib/google.golang.org/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
if p, ok := peer.FromContext(cs.Context()); ok {
setSpanTargetFromPeer(span, *p)
}
defer func() { finishWithError(span, err, cs.cfg) }()
defer func() { finishWithError(span, err, cs.method, cs.cfg) }()
}
err = cs.ClientStream.RecvMsg(m)
return err
Expand All @@ -64,7 +64,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if p, ok := peer.FromContext(cs.Context()); ok {
setSpanTargetFromPeer(span, *p)
}
defer func() { finishWithError(span, err, cs.cfg) }()
defer func() { finishWithError(span, err, cs.method, cs.cfg) }()
}
err = cs.ClientStream.SendMsg(m)
return err
Expand Down Expand Up @@ -104,7 +104,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
return err
})
if err != nil {
finishWithError(span, err, cfg)
finishWithError(span, err, method, cfg)
return nil, err
}

Expand All @@ -116,7 +116,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {

go func() {
<-stream.Context().Done()
finishWithError(span, stream.Context().Err(), cfg)
finishWithError(span, stream.Context().Err(), method, cfg)
}()
} else {
// if call tracing is disabled, just call streamer, but still return
Expand Down Expand Up @@ -158,7 +158,7 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
func(ctx context.Context, opts []grpc.CallOption) error {
return invoker(ctx, method, req, reply, cc, opts...)
})
finishWithError(span, err, cfg)
finishWithError(span, err, method, cfg)
return err
}
}
Expand Down
7 changes: 5 additions & 2 deletions contrib/google.golang.org/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func init() {
// cache a constant option: saves one allocation per call
var spanTypeRPC = tracer.SpanType(ext.AppTypeRPC)

type fullMethodNameKey struct{}

func (cfg *config) startSpanOptions(opts ...tracer.StartSpanOption) []tracer.StartSpanOption {
if len(cfg.tags) == 0 && len(cfg.spanOpts) == 0 {
return opts
Expand Down Expand Up @@ -69,16 +71,17 @@ func startSpanFromContext(
if sctx, err := tracer.Extract(grpcutil.MDCarrier(md)); err == nil {
opts = append(opts, tracer.ChildOf(sctx))
}
ctx = context.WithValue(ctx, fullMethodNameKey{}, method)
return tracer.StartSpanFromContext(ctx, operation, opts...)
}

// finishWithError applies finish option and a tag with gRPC status code, disregarding OK, EOF and Canceled errors.
func finishWithError(span ddtrace.Span, err error, cfg *config) {
func finishWithError(span ddtrace.Span, err error, method string, cfg *config) {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
err = nil
}
errcode := status.Code(err)
if errcode == codes.OK || cfg.nonErrorCodes[errcode] {
if errcode == codes.OK || cfg.nonErrorCodes[errcode] || (cfg.nonErrorFunc != nil && cfg.nonErrorFunc(method, err)) {
err = nil
}
span.SetTag(tagCode, errcode.String())
Expand Down
10 changes: 10 additions & 0 deletions contrib/google.golang.org/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type config struct {
serviceName func() string
spanName string
nonErrorCodes map[codes.Code]bool
nonErrorFunc func(method string, err error) bool
traceStreamCalls bool
traceStreamMessages bool
noDebugStack bool
Expand Down Expand Up @@ -129,6 +130,15 @@ func NonErrorCodes(cs ...codes.Code) InterceptorOption {
}
}

// NonErrorFunc sets a custom function to determine whether an error should not be considered as an error for tracing purposes.
// This function is evaluated when an error occurs, and if it returns true, the error will not be recorded in the trace.
// f: A function taking the gRPC method and error as arguments, returning a boolean to indicate if the error should be ignored.
func NonErrorFunc(f func(method string, err error) bool) InterceptorOption {
return func(cfg *config) {
cfg.nonErrorFunc = f
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(cfg *config) {
Expand Down
8 changes: 4 additions & 4 deletions contrib/google.golang.org/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
defer func() {
withMetadataTags(ss.ctx, ss.cfg, span)
withRequestTags(ss.cfg, m, span)
finishWithError(span, err, ss.cfg)
finishWithError(span, err, ss.method, ss.cfg)
}()
}
err = ss.ServerStream.RecvMsg(m)
Expand All @@ -72,7 +72,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.cfg.startSpanOptions(tracer.Measured())...,
)
span.SetTag(ext.Component, componentName)
defer func() { finishWithError(span, err, ss.cfg) }()
defer func() { finishWithError(span, err, ss.method, ss.cfg) }()
}
err = ss.ServerStream.SendMsg(m)
return err
Expand Down Expand Up @@ -110,7 +110,7 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
case info.IsClientStream:
span.SetTag(tagMethodKind, methodKindClientStream)
}
defer func() { finishWithError(span, err, cfg) }()
defer func() { finishWithError(span, err, info.FullMethod, cfg) }()
if appsec.Enabled() {
handler = appsecStreamHandlerMiddleware(span, handler)
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
handler = appsecUnaryHandlerMiddleware(span, handler)
}
resp, err := handler(ctx, req)
finishWithError(span, err, cfg)
finishWithError(span, err, info.FullMethod, cfg)
return resp, err
}
}
Expand Down
10 changes: 9 additions & 1 deletion contrib/google.golang.org/grpc/stats_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
span.SetTag(ext.TargetPort, port)
}
case *stats.End:
finishWithError(span, rs.Error, h.cfg)
val := ctx.Value(fullMethodNameKey{})
if val == nil {
return
}
fullMethod, ok := val.(string)
if !ok {
return
}
finishWithError(span, rs.Error, fullMethod, h.cfg)
darccio marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
11 changes: 10 additions & 1 deletion contrib/google.golang.org/grpc/stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (h *serverStatsHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo)
h.cfg.serviceName,
spanOpts...,
)
ctx = context.WithValue(ctx, fullMethodNameKey{}, rti.FullMethodName)
return ctx
}

Expand All @@ -52,8 +53,16 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
if !ok {
return
}
val := ctx.Value(fullMethodNameKey{})
if val == nil {
return
}
fullMethod, ok := val.(string)
if !ok {
return
}
darccio marked this conversation as resolved.
Show resolved Hide resolved
if v, ok := rs.(*stats.End); ok {
finishWithError(span, v.Error, h.cfg)
finishWithError(span, v.Error, fullMethod, h.cfg)
}
}

Expand Down