From 125c594a7abec112a96ab36c75afe8d28e91ddf1 Mon Sep 17 00:00:00 2001 From: Ryan Fox-Tyler <60440289+ryanfoxtyler@users.noreply.github.com> Date: Tue, 22 Oct 2024 12:26:37 -0400 Subject: [PATCH] Replace outdated OpenCensus with OpenTelemetry --- For more details, open the [Copilot Workspace session](https://copilot-workspace.githubnext.com/dgraph-io/dgraph?shareId=XXXX-XXXX-XXXX-XXXX). --- conn/pool.go | 5 ++-- conn/raft_server.go | 16 +++++++++---- dgraph/cmd/alpha/run.go | 40 ++++++++++++++++++++++++------- dgraph/cmd/increment/increment.go | 24 ++++++++++++------- dgraph/cmd/zero/http.go | 25 +++++++++++++++++-- go.mod | 19 +++++++-------- 6 files changed, 94 insertions(+), 35 deletions(-) diff --git a/conn/pool.go b/conn/pool.go index db866441d5c..6838144e918 100644 --- a/conn/pool.go +++ b/conn/pool.go @@ -25,7 +25,9 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" - "go.opencensus.io/plugin/ocgrpc" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -170,7 +172,6 @@ func (p *Pools) Connect(addr string, tlsClientConf *tls.Config) *Pool { // newPool creates a new "pool" with one gRPC connection, refcount 0. func newPool(addr string, tlsClientConf *tls.Config) (*Pool, error) { conOpts := []grpc.DialOption{ - grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(x.GrpcMaxSize), grpc.MaxCallSendMsgSize(x.GrpcMaxSize), diff --git a/conn/raft_server.go b/conn/raft_server.go index 55faf838afb..a5e7412b363 100644 --- a/conn/raft_server.go +++ b/conn/raft_server.go @@ -28,7 +28,9 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" "go.etcd.io/etcd/raft/v3/raftpb" - otrace "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/dgraph-io/dgo/v240/protos/api" "github.com/dgraph-io/dgraph/v24/protos/pb" @@ -192,13 +194,16 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error { if ctx.Err() != nil { return ctx.Err() } - span := otrace.FromContext(ctx) + span := trace.SpanFromContext(ctx) node := w.GetNode() if node == nil || node.Raft() == nil { return ErrNoNode } - span.Annotatef(nil, "Stream server is node %#x", node.Id) + span.AddEvent("Stream server is node", trace.WithAttributes(attribute.KeyValue{ + Key: "node_id", + Value: attribute.Int64Value(int64(node.Id)), + })) var rc *pb.RaftContext raft := node.Raft() @@ -256,7 +261,10 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error { } if loop == 1 { rc = batch.GetContext() - span.Annotatef(nil, "Stream from %#x", rc.GetId()) + span.AddEvent("Stream from", trace.WithAttributes(attribute.KeyValue{ + Key: "from_id", + Value: attribute.Int64Value(int64(rc.GetId())), + })) if rc != nil { node.Connect(rc.Id, rc.Addr) } diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index ec0d8d672d6..7628c3e4a42 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -38,9 +38,13 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" "github.com/spf13/cobra" - "go.opencensus.io/plugin/ocgrpc" - otrace "go.opencensus.io/trace" - "go.opencensus.io/zpages" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/semconv/v1.4.0" "golang.org/x/net/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -456,7 +460,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) { grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), grpc.MaxConcurrentStreams(1000), - grpc.StatsHandler(&ocgrpc.ServerHandler{}), + grpc.StatsHandler(&otelgrpc.ServerHandler{}), grpc.UnaryInterceptor(audit.AuditRequestGRPC), } if tlsCfg != nil { @@ -773,10 +777,30 @@ func run() { return true, true } } - otrace.ApplyConfig(otrace.Config{ - DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Trace.GetFloat64("ratio")), - MaxAnnotationEventsPerSpan: 256, - }) + + // Initialize OpenTelemetry + exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(Alpha.Conf.GetString("jaeger")))) + if err != nil { + log.Fatalf("Failed to create Jaeger exporter: %v", err) + } + + promExporter, err := prometheus.New() + if err != nil { + log.Fatalf("Failed to create Prometheus exporter: %v", err) + } + + tp := trace.NewTracerProvider( + trace.WithBatcher(exporter), + trace.WithBatcher(promExporter), + trace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("dgraph.alpha"), + )), + ) + otel.SetTracerProvider(tp) + + // Register Prometheus exporter + http.HandleFunc("/metrics", promExporter.ServeHTTP) // Posting will initialize index which requires schema. Hence, initialize // schema before calling posting.Init(). diff --git a/dgraph/cmd/increment/increment.go b/dgraph/cmd/increment/increment.go index b825dbfb74e..0d915fc5dad 100644 --- a/dgraph/cmd/increment/increment.go +++ b/dgraph/cmd/increment/increment.go @@ -29,7 +29,8 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" - "go.opencensus.io/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "github.com/dgraph-io/dgo/v240" "github.com/dgraph-io/dgo/v240/protos/api" @@ -90,7 +91,7 @@ type Counter struct { } func queryCounter(ctx context.Context, txn *dgo.Txn, pred string) (Counter, error) { - span := trace.FromContext(ctx) + span := trace.SpanFromContext(ctx) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -114,7 +115,10 @@ func queryCounter(ctx context.Context, txn *dgo.Txn, pred string) (Counter, erro default: x.Panic(errors.Errorf("Invalid response: %q", resp.Json)) } - span.Annotatef(nil, "Found counter: %+v", counter) + span.AddEvent("Found counter", trace.WithAttributes(otel.KeyValue{ + Key: "counter", + Value: otel.StringValue(fmt.Sprintf("%+v", counter)), + })) counter.startTs = resp.GetTxn().GetStartTs() counter.qLatency = time.Duration(resp.Latency.GetTotalNs()).Round(time.Millisecond) return counter, nil @@ -140,7 +144,7 @@ func process(dg *dgo.Dgraph, conf *viper.Viper) (Counter, error) { } }() - ctx, span := trace.StartSpan(context.Background(), "Counter") + ctx, span := otel.Tracer("Counter").Start(context.Background(), "Counter") defer span.End() counter, err := queryCounter(ctx, txn, pred) @@ -170,11 +174,13 @@ func process(dg *dgo.Dgraph, conf *viper.Viper) (Counter, error) { } func run(conf *viper.Viper) { - trace.ApplyConfig(trace.Config{ - DefaultSampler: trace.AlwaysSample(), - MaxAnnotationEventsPerSpan: 256, - }) - x.RegisterExporters(conf, "dgraph.increment") + otel.SetTracerProvider(trace.NewTracerProvider( + trace.WithBatcher(jaeger.New(jaeger.WithCollectorEndpoint(Alpha.Conf.GetString("jaeger")))), + trace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("dgraph.increment"), + )), + )) startTime := time.Now() defer func() { fmt.Println("Total:", time.Since(startTime).Round(time.Millisecond)) }() diff --git a/dgraph/cmd/zero/http.go b/dgraph/cmd/zero/http.go index c60ad6eeee6..9899a41ee9a 100644 --- a/dgraph/cmd/zero/http.go +++ b/dgraph/cmd/zero/http.go @@ -32,6 +32,9 @@ import ( "github.com/dgraph-io/dgo/v240/protos/api" "github.com/dgraph-io/dgraph/v24/protos/pb" "github.com/dgraph-io/dgraph/v24/x" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // intFromQueryParam checks for name as a query param, converts it to uint64 and returns it. @@ -54,6 +57,9 @@ func intFromQueryParam(w http.ResponseWriter, r *http.Request, name string) (uin } func (st *state) assign(w http.ResponseWriter, r *http.Request) { + ctx, span := otel.Tracer("zero").Start(r.Context(), "assign") + defer span.End() + x.AddCorsHeaders(w) w.Header().Set("Content-Type", "application/json") if r.Method == "OPTIONS" { @@ -70,7 +76,7 @@ func (st *state) assign(w http.ResponseWriter, r *http.Request) { } num := &pb.Num{Val: val} - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() var ids *pb.AssignedIds @@ -109,6 +115,9 @@ func (st *state) assign(w http.ResponseWriter, r *http.Request) { // removeNode can be used to remove a node from the cluster. It takes in the RAFT id of the node // and the group it belongs to. It can be used to remove Dgraph alpha and Zero nodes(group=0). func (st *state) removeNode(w http.ResponseWriter, r *http.Request) { + ctx, span := otel.Tracer("zero").Start(r.Context(), "removeNode") + defer span.End() + x.AddCorsHeaders(w) if r.Method == "OPTIONS" { return @@ -144,6 +153,9 @@ func (st *state) removeNode(w http.ResponseWriter, r *http.Request) { // moveTablet can be used to move a tablet to a specific group. It takes in tablet and group as // argument. func (st *state) moveTablet(w http.ResponseWriter, r *http.Request) { + ctx, span := otel.Tracer("zero").Start(r.Context(), "moveTablet") + defer span.End() + x.AddCorsHeaders(w) if r.Method == "OPTIONS" { return @@ -211,10 +223,13 @@ func (st *state) moveTablet(w http.ResponseWriter, r *http.Request) { } func (st *state) getState(w http.ResponseWriter, r *http.Request) { + ctx, span := otel.Tracer("zero").Start(r.Context(), "getState") + defer span.End() + x.AddCorsHeaders(w) w.Header().Set("Content-Type", "application/json") - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := st.node.WaitLinearizableRead(ctx); err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -235,6 +250,9 @@ func (st *state) getState(w http.ResponseWriter, r *http.Request) { } func (s *Server) zeroHealth(ctx context.Context) (*api.Response, error) { + _, span := otel.Tracer("zero").Start(ctx, "zeroHealth") + defer span.End() + if ctx.Err() != nil { return nil, errors.Wrap(ctx.Err(), "http request context error") } @@ -254,6 +272,9 @@ func (s *Server) zeroHealth(ctx context.Context) (*api.Response, error) { } func (st *state) pingResponse(w http.ResponseWriter, r *http.Request) { + ctx, span := otel.Tracer("zero").Start(r.Context(), "pingResponse") + defer span.End() + x.AddCorsHeaders(w) /* diff --git a/go.mod b/go.mod index a458004f47a..2d2adb0fd1e 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,6 @@ module github.com/dgraph-io/dgraph/v24 go 1.22.6 require ( - contrib.go.opencensus.io/exporter/jaeger v0.2.1 - contrib.go.opencensus.io/exporter/prometheus v0.4.2 github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20220622145613-731d59e8b567 github.com/HdrHistogram/hdrhistogram-go v1.1.2 github.com/IBM/sarama v1.43.3 @@ -54,7 +52,15 @@ require ( github.com/viterin/vek v0.4.2 github.com/xdg/scram v1.0.5 go.etcd.io/etcd/raft/v3 v3.5.16 - go.opencensus.io v0.24.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 + go.opentelemetry.io/otel v1.29.0 + go.opentelemetry.io/otel/exporters/jaeger v1.9.0 + go.opentelemetry.io/otel/exporters/prometheus v0.41.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.29.0 + go.opentelemetry.io/otel/metric v1.29.0 + go.opentelemetry.io/otel/sdk v1.29.0 + go.opentelemetry.io/otel/trace v1.29.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.28.0 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 @@ -148,13 +154,6 @@ require ( github.com/uber/jaeger-client-go v2.28.0+incompatible // indirect github.com/viterin/partial v1.1.0 // indirect github.com/xdg/stringprep v1.0.3 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect - go.opentelemetry.io/otel v1.29.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.29.0 // indirect - go.opentelemetry.io/otel/metric v1.29.0 // indirect - go.opentelemetry.io/otel/sdk v1.29.0 // indirect - go.opentelemetry.io/otel/trace v1.29.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/time v0.6.0 // indirect google.golang.org/api v0.196.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect