Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats/opentelemetry: Introduce Tracing API #7852

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
// function.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
nameResolutionDelayed: false,
}

cc.retryThrottler.Store((*retryThrottler)(nil))
Expand Down Expand Up @@ -604,6 +605,10 @@ type ClientConn struct {
idlenessMgr *idle.Manager
metricsRecorderList *stats.MetricsRecorderList

// To track if there was a delay in name resolution, helping to track
// latency issues in gRPC connection setup.
nameResolutionDelayed bool

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
csMgr *connectivityStateManager
Expand Down
5 changes: 5 additions & 0 deletions stats/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type RPCTagInfo struct {
// FailFast indicates if this RPC is failfast.
// This field is only valid on client side, it's always false on server side.
FailFast bool
// NameResolutionDelay indicates whether there was a delay in name
// resolution.
//
// This field is only valid on client side, it's always false on server side.
NameResolutionDelay bool
}

// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
Expand Down
96 changes: 77 additions & 19 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

import (
"context"
"strings"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
Expand All @@ -33,6 +38,7 @@
)

type clientStatsHandler struct {
statsHandler
estats.MetricsRecorder
options Options
clientMetrics clientMetrics
Expand Down Expand Up @@ -68,6 +74,15 @@
rm.registerMetrics(metrics, meter)
}

func (h *clientStatsHandler) initializeTracing() {
if isTracingDisabled(h.options.TraceOptions) {
return
}

otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator)
otel.SetTracerProvider(h.options.TraceOptions.TracerProvider)
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ci := &callInfo{
target: cc.CanonicalTarget(),
Expand All @@ -85,8 +100,12 @@
}

startTime := time.Now()
var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
purnesh42H marked this conversation as resolved.
Show resolved Hide resolved
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
return err
}

Expand Down Expand Up @@ -119,22 +138,50 @@
}

startTime := time.Now()

var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
purnesh42H marked this conversation as resolved.
Show resolved Hide resolved
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
// perCallTracesAndMetrics records per call trace spans and metrics.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts *trace.Span) {
if !isTracingDisabled(h.options.TraceOptions) && ts != nil {
s := status.Convert(err)
if s.Code() == grpccodes.OK {
(*ts).SetStatus(otelcodes.Ok, s.Message())
} else {
(*ts).SetStatus(otelcodes.Error, s.Message())
}

Check warning on line 160 in stats/opentelemetry/client_metrics.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/client_metrics.go#L159-L160

Added lines #L159 - L160 were not covered by tests
(*ts).End()
}
if !isMetricsDisabled(h.options.MetricsOptions) {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
}
}

// createCallTraceSpan creates a call span to put in the provided context using
// provided TraceProvider. If TraceProvider is nil, it returns context as is.
func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, *trace.Span) {
if h.options.TraceOptions.TracerProvider == nil {
logger.Error("TraceProvider is not provided in trace options")
return ctx, nil
}

Check warning on line 180 in stats/opentelemetry/client_metrics.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/client_metrics.go#L178-L180

Added lines #L178 - L180 were not covered by tests
mn := strings.Replace(removeLeadingSlash(method), "/", ".", -1)
tracer := otel.Tracer("grpc-open-telemetry")
ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient))
return ctx, &span
}

// TagConn exists to satisfy stats.Handler.
Expand Down Expand Up @@ -163,15 +210,21 @@
}
ctx = istats.SetLabels(ctx, labels)
}
ai := &attemptInfo{ // populates information about RPC start.
startTime: time.Now(),
xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
ai := &attemptInfo{}
startTime := time.Now()
if !isTracingDisabled(h.options.TraceOptions) {
callSpan := trace.SpanFromContext(ctx)
if info.NameResolutionDelay {
callSpan.AddEvent("Delayed name resolution complete")
}

Check warning on line 219 in stats/opentelemetry/client_metrics.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/client_metrics.go#L218-L219

Added lines #L218 - L219 were not covered by tests
ctx, ai = h.traceTagRPC(trace.ContextWithSpan(ctx, callSpan), info)
}
ri := &rpcInfo{
ai.startTime = startTime
ai.xdsLabels = labels.TelemetryLabels
ai.method = info.FullMethodName
return setRPCInfo(ctx, &rpcInfo{
ai: ai,
}
return setRPCInfo(ctx, ri)
})
}

func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
Expand All @@ -180,7 +233,12 @@
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return
}
h.processRPCEvent(ctx, rs, ri.ai)
if !isMetricsDisabled(h.options.MetricsOptions) {
h.processRPCEvent(ctx, rs, ri.ai)
}
if !isTracingDisabled(h.options.TraceOptions) {
h.populateSpan(ctx, rs, ri.ai)
}
}

func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
Expand Down
Loading
Loading