diff --git a/.github/workflows/govulncheck.yml b/.github/workflows/govulncheck.yml index 9c271a919e..152b5b50b8 100644 --- a/.github/workflows/govulncheck.yml +++ b/.github/workflows/govulncheck.yml @@ -32,7 +32,6 @@ jobs: run: govulncheck ./ddtrace/... ./appsec/... ./profiler/... ./internal/... - name: Run govulncheck-contribs run: | - # Excluding legacy contrib grpc.v12 - go list -f '{{.Dir}}' ./contrib/... | grep -v -e grpc.v12 | while read dir ; do + go list -f '{{.Dir}}' ./contrib/... | while read dir ; do govulncheck -C $dir . done \ No newline at end of file diff --git a/.github/workflows/multios-unit-tests.yml b/.github/workflows/multios-unit-tests.yml index b9f8a7f9ba..3ca8900602 100644 --- a/.github/workflows/multios-unit-tests.yml +++ b/.github/workflows/multios-unit-tests.yml @@ -53,7 +53,7 @@ jobs: - name: "Runner ${{ matrix.runner-index }}: Test Core and Contrib (No Integration Tests)" shell: bash run: | - go list ./... | grep -v -e grpc.v12 -e google.golang.org/api -e sarama -e confluent-kafka-go -e cmemprof | sort >packages.txt + go list ./... | grep -v -e google.golang.org/api -e sarama -e confluent-kafka-go -e cmemprof | sort >packages.txt gotestsum --junitfile ${REPORT} -- $(cat packages.txt) -v -coverprofile=coverage.txt -covermode=atomic -timeout 15m - name: Upload the results to Datadog CI App if: always() diff --git a/.github/workflows/orchestrion.yml b/.github/workflows/orchestrion.yml new file mode 100644 index 0000000000..f7578e4ecd --- /dev/null +++ b/.github/workflows/orchestrion.yml @@ -0,0 +1,23 @@ +name: Orchestrion +on: + workflow_dispatch: # manually + pull_request: + merge_group: + push: + branches: + - release-v* + +permissions: read-all + +concurrency: + # Automatically cancel previous runs if a new one is triggered to conserve resources. + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + name: 'Run Tests' + uses: DataDog/orchestrion/.github/workflows/workflow_call.yml@eliott.bouhana/APPSEC-53773 # we don't want to pin our own action + with: + dd-trace-go-ref: ${{ github.sha }} + runs-on: ubuntu-latest-16-cores diff --git a/.github/workflows/unit-integration-tests.yml b/.github/workflows/unit-integration-tests.yml index 81d027f00d..0eb78e3b7e 100644 --- a/.github/workflows/unit-integration-tests.yml +++ b/.github/workflows/unit-integration-tests.yml @@ -85,6 +85,9 @@ jobs: image: cassandra:3.11 env: JVM_OPTS: "-Xms750m -Xmx750m" + CASSANDRA_CLUSTER_NAME: "dd-trace-go-test-cluster" + CASSANDRA_DC: "dd-trace-go-test-datacenter" + CASSANDRA_ENDPOINT_SNITCH: "GossipingPropertyFileSnitch" ports: - 9042:9042 mysql: @@ -194,7 +197,7 @@ jobs: - name: Test Contrib run: | mkdir -p $TEST_RESULTS - PACKAGE_NAMES=$(go list ./contrib/... | grep -v -e grpc.v12 -e google.golang.org/api) + PACKAGE_NAMES=$(go list ./contrib/... | grep -v -e google.golang.org/api) gotestsum --junitfile ${TEST_RESULTS}/gotestsum-report.xml -- $PACKAGE_NAMES -v -race -coverprofile=coverage.txt -covermode=atomic - name: Upload the results to Datadog CI App @@ -246,32 +249,6 @@ jobs: go mod tidy # Go1.16 doesn't update the sum file correctly after the go get, this tidy fixes it go test -v ./contrib/google.golang.org/api/... - - name: Testing outlier gRPC v1.2 - run: | - # This hacky approach is necessary because running the tests regularly - # do not allow using grpc-go@v1.2.0 alongside sketches-go@v1.1.0. - # sketches-go@v1.0.0 is no longer possible to test because internal/datastreams/propagator.go - # expects sketches-go to have the package `github.com/DataDog/sketches-go/ddsketch/encoding` which - # is only present from v1.1.0 onwards. - go mod vendor - - # Checkout grpc-go@v1.2.0 - cd vendor/google.golang.org && rm -rf grpc - git clone https://github.com/grpc/grpc-go grpc && cd grpc - git fetch origin && git checkout v1.2.0 && cd ../../.. - - # Checkout sketches-go@v1.1.0 - cd vendor/github.com/DataDog && rm -rf sketches-go - git clone https://github.com/DataDog/sketches-go && cd sketches-go - git fetch origin && git checkout v1.1.0 && cd ../../../.. - - # Revert to old metadata functions as FromIncomingContext and NewOutgoingContext are not present in v1.2.0. - # These functions were updated to current versions to avoid compilation errors in the development environments. - sed -i 's/metadata\.FromIncomingContext/metadata.FromContext/g' ./contrib/google.golang.org/grpc.v12/* - sed -i 's/metadata\.NewOutgoingContext/metadata.NewContext/g' ./contrib/google.golang.org/grpc.v12/* - - go test -mod=vendor -v ./contrib/google.golang.org/grpc.v12/... - test-core: runs-on: group: "APM Larger Runners" diff --git a/.gitignore b/.gitignore index 66bca130e5..5528039708 100644 --- a/.gitignore +++ b/.gitignore @@ -15,7 +15,6 @@ go.work* dd-trace-go.iml vendor -/contrib/google.golang.org/grpc.v12/vendor/ /contrib_coverage.txt /core_coverage.txt /gotestsum-report.xml diff --git a/.golangci.yml b/.golangci.yml index 88a9f92c5b..6fcd0cd10e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,8 +1,6 @@ run: deadline: 10m - skip-dirs: - # This package is an exception and has its own test job (Testing outlier gRPC v1.2). - - contrib/google.golang.org/grpc.v12 + linters: disable-all: true enable: diff --git a/README.md b/README.md index 8ea1c5ad10..070fb5720a 100644 --- a/README.md +++ b/README.md @@ -53,9 +53,7 @@ Before considering contributions to the project, please take a moment to read ou ### Testing -Tests can be run locally using the Go toolset. The grpc.v12 integration will fail (and this is normal), because it covers for deprecated methods. In the CI environment -we vendor this version of the library inside the integration. Under normal circumstances this is not something that we want to do, because users using this integration -might be running versions different from the vendored one, creating hard to debug conflicts. +Tests can be run locally using the Go toolset. To run integration tests locally, you should set the `INTEGRATION` environment variable. The dependencies of the integration tests are best run via Docker. To get an idea about the versions and the set-up take a look at our [docker-compose config](./docker-compose.yaml). diff --git a/contrib/gocql/gocql/example_test.go b/contrib/gocql/gocql/example_test.go index 0313fdd32a..34982b5ab3 100644 --- a/contrib/gocql/gocql/example_test.go +++ b/contrib/gocql/gocql/example_test.go @@ -7,15 +7,18 @@ package gocql_test import ( "context" + "log" + + "github.com/gocql/gocql" gocqltrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/gocql/gocql" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) -func Example() { +func ExampleNewCluster() { // Initialise a wrapped Cassandra session and create a query. - cluster := gocqltrace.NewCluster([]string{"127.0.0.1"}, gocqltrace.WithServiceName("ServiceName")) + cluster := gocqltrace.NewCluster([]string{"127.0.0.1:9043"}, gocqltrace.WithServiceName("ServiceName")) session, _ := cluster.CreateSession() query := session.Query("CREATE KEYSPACE if not exists trace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': 1}") @@ -34,3 +37,63 @@ func Example() { // Execute your query as usual query.Exec() } + +func ExampleCreateTracedSession() { + cluster := gocql.NewCluster("127.0.0.1:9042") + cluster.Keyspace = "my-keyspace" + + // Create a new traced session using any number of options + session, err := gocqltrace.CreateTracedSession(cluster, gocqltrace.WithServiceName("ServiceName")) + if err != nil { + log.Fatal(err) + } + query := session.Query("CREATE KEYSPACE if not exists trace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': 1}") + + // Use context to pass information down the call chain + _, ctx := tracer.StartSpanFromContext(context.Background(), "parent.request", + tracer.SpanType(ext.SpanTypeCassandra), + tracer.ServiceName("web"), + tracer.ResourceName("/home"), + ) + query.WithContext(ctx) + + // If you don't want a concrete query to be traced, you can do query.Observer(nil) + + // Finally, execute the query + if err := query.Exec(); err != nil { + log.Fatal(err) + } +} + +func ExampleNewObserver() { + cluster := gocql.NewCluster("127.0.0.1:9042") + cluster.Keyspace = "my-keyspace" + + // Create a new regular gocql session + session, err := cluster.CreateSession() + if err != nil { + log.Fatal(err) + } + // Create a new observer using same set of options as gocqltrace.CreateTracedSession. + obs := gocqltrace.NewObserver(cluster, gocqltrace.WithServiceName("ServiceName")) + + // Attach the observer to queries / batches individually. + tracedQuery := session.Query("SELECT something FROM somewhere").Observer(obs) + untracedQuery := session.Query("SELECT something FROM somewhere") + + // Use context to pass information down the call chain + _, ctx := tracer.StartSpanFromContext(context.Background(), "parent.request", + tracer.SpanType(ext.SpanTypeCassandra), + tracer.ServiceName("web"), + tracer.ResourceName("/home"), + ) + tracedQuery.WithContext(ctx) + + // Finally, execute the query + if err := tracedQuery.Exec(); err != nil { + log.Fatal(err) + } + if err := untracedQuery.Exec(); err != nil { + log.Fatal(err) + } +} diff --git a/contrib/gocql/gocql/gocql.go b/contrib/gocql/gocql/gocql.go index 03a84c6002..058c557109 100644 --- a/contrib/gocql/gocql/gocql.go +++ b/contrib/gocql/gocql/gocql.go @@ -12,6 +12,7 @@ import ( "math" "strconv" "strings" + "time" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" @@ -30,6 +31,9 @@ func init() { } // ClusterConfig embeds gocql.ClusterConfig and keeps information relevant to tracing. +// +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. type ClusterConfig struct { *gocql.ClusterConfig hosts []string @@ -37,6 +41,9 @@ type ClusterConfig struct { } // NewCluster calls gocql.NewCluster and returns a wrapped instrumented version of it. +// +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. func NewCluster(hosts []string, opts ...WrapOption) *ClusterConfig { return &ClusterConfig{ ClusterConfig: gocql.NewCluster(hosts...), @@ -46,6 +53,9 @@ func NewCluster(hosts []string, opts ...WrapOption) *ClusterConfig { } // Session embeds gocql.Session and keeps information relevant to tracing. +// +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. type Session struct { *gocql.Session hosts []string @@ -66,10 +76,13 @@ func (c *ClusterConfig) CreateSession() (*Session, error) { } // Query inherits from gocql.Query, it keeps the tracer and the context. +// +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. type Query struct { *gocql.Query - *params - ctx context.Context + params params + ctx context.Context } // Query calls the underlying gocql.Session's Query method and returns a new Query augmented with tracing. @@ -79,13 +92,19 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query { } // Batch inherits from gocql.Batch, it keeps the tracer and the context. +// +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. type Batch struct { *gocql.Batch - *params - ctx context.Context + params params + ctx context.Context } // NewBatch calls the underlying gocql.Session's NewBatch method and returns a new Batch augmented with tracing. +// +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. func (s *Session) NewBatch(typ gocql.BatchType) *Batch { b := s.Session.NewBatch(typ) return wrapBatch(b, s.hosts, s.opts...) @@ -93,10 +112,15 @@ func (s *Session) NewBatch(typ gocql.BatchType) *Batch { // params contains fields and metadata useful for command tracing type params struct { - config *queryConfig + config *config keyspace string paginated bool + skipPaginated bool clusterContactPoints string + consistency string + hostInfo *gocql.HostInfo + startTime time.Time + finishTime time.Time } // WrapQuery wraps a gocql.Query into a traced Query under the given service name. @@ -109,7 +133,8 @@ type params struct { // of `WithContext` and `PageState` but not that of `Consistency`, `Trace`, // `Observer`, etc. // -// Deprecated: initialize your ClusterConfig with NewCluster instead. +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. func WrapQuery(q *gocql.Query, opts ...WrapOption) *Query { return wrapQuery(q, nil, opts...) } @@ -124,7 +149,7 @@ func wrapQuery(q *gocql.Query, hosts []string, opts ...WrapOption) *Query { cfg.resourceName = parts[1] } } - p := ¶ms{config: cfg} + p := params{config: cfg} if len(hosts) > 0 { p.clusterContactPoints = strings.Join(hosts, ",") } @@ -155,43 +180,6 @@ func (tq *Query) PageState(state []byte) *Query { return tq } -// NewChildSpan creates a new span from the params and the context. -func (tq *Query) newChildSpan(ctx context.Context) ddtrace.Span { - p := tq.params - opts := []ddtrace.StartSpanOption{ - tracer.SpanType(ext.SpanTypeCassandra), - tracer.ServiceName(p.config.serviceName), - tracer.ResourceName(p.config.resourceName), - tracer.Tag(ext.CassandraPaginated, fmt.Sprintf("%t", p.paginated)), - tracer.Tag(ext.CassandraKeyspace, p.keyspace), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindClient), - tracer.Tag(ext.DBSystem, ext.DBSystemCassandra), - } - if !math.IsNaN(p.config.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, p.config.analyticsRate)) - } - if tq.clusterContactPoints != "" { - opts = append(opts, tracer.Tag(ext.CassandraContactPoints, tq.clusterContactPoints)) - } - for k, v := range tq.config.customTags { - opts = append(opts, tracer.Tag(k, v)) - } - span, _ := tracer.StartSpanFromContext(ctx, p.config.querySpanName, opts...) - return span -} - -func (tq *Query) finishSpan(span ddtrace.Span, err error) { - if err != nil && tq.params.config.shouldIgnoreError(err) { - err = nil - } - if tq.params.config.noDebugStack { - span.Finish(tracer.WithError(err), tracer.NoDebugStack()) - } else { - span.Finish(tracer.WithError(err)) - } -} - // Exec is rewritten so that it passes by our custom Iter func (tq *Query) Exec() error { return tq.Iter().Close() @@ -199,37 +187,40 @@ func (tq *Query) Exec() error { // MapScan wraps in a span query.MapScan call. func (tq *Query) MapScan(m map[string]interface{}) error { - span := tq.newChildSpan(tq.ctx) + span := startQuerySpan(tq.ctx, tq.params) err := tq.Query.MapScan(m) - tq.finishSpan(span, err) + finishSpan(span, err, tq.params) return err } // MapScanCAS wraps in a span query.MapScanCAS call. func (tq *Query) MapScanCAS(m map[string]interface{}) (applied bool, err error) { - span := tq.newChildSpan(tq.ctx) + span := startQuerySpan(tq.ctx, tq.params) applied, err = tq.Query.MapScanCAS(m) - tq.finishSpan(span, err) + finishSpan(span, err, tq.params) return applied, err } // Scan wraps in a span query.Scan call. func (tq *Query) Scan(dest ...interface{}) error { - span := tq.newChildSpan(tq.ctx) + span := startQuerySpan(tq.ctx, tq.params) err := tq.Query.Scan(dest...) - tq.finishSpan(span, err) + finishSpan(span, err, tq.params) return err } // ScanCAS wraps in a span query.ScanCAS call. func (tq *Query) ScanCAS(dest ...interface{}) (applied bool, err error) { - span := tq.newChildSpan(tq.ctx) + span := startQuerySpan(tq.ctx, tq.params) applied, err = tq.Query.ScanCAS(dest...) - tq.finishSpan(span, err) + finishSpan(span, err, tq.params) return applied, err } // Iter inherits from gocql.Iter and contains a span. +// +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. type Iter struct { *gocql.Iter span ddtrace.Span @@ -237,7 +228,7 @@ type Iter struct { // Iter starts a new span at query.Iter call. func (tq *Query) Iter() *Iter { - span := tq.newChildSpan(tq.ctx) + span := startQuerySpan(tq.ctx, tq.params) iter := tq.Query.Iter() span.SetTag(ext.CassandraRowCount, strconv.Itoa(iter.NumRows())) span.SetTag(ext.CassandraConsistencyLevel, tq.GetConsistency().String()) @@ -253,7 +244,7 @@ func (tq *Query) Iter() *Iter { cluster := tIter.Iter.Host().ClusterName() dc := tIter.Iter.Host().DataCenter() - if tq.config.clusterTagLegacyMode { + if tq.params.config.clusterTagLegacyMode { tIter.span.SetTag(ext.CassandraCluster, dc) } else { tIter.span.SetTag(ext.CassandraCluster, cluster) @@ -273,7 +264,10 @@ func (tIter *Iter) Close() error { return err } -// Scanner inherits from a gocql.Scanner derived from an Iter +// Scanner inherits from a gocql.Scanner derived from an Iter. +// +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. type Scanner struct { gocql.Scanner span ddtrace.Span @@ -309,7 +303,8 @@ func (s *Scanner) Err() error { // of `WithContext` and `WithTimestamp` but not that of `SerialConsistency`, `Trace`, // `Observer`, etc. // -// Deprecated: initialize your ClusterConfig with NewCluster instead. +// Deprecated: use the Observer based method CreateTracedSession instead, which allows to use +// native gocql types instead of wrapped types. func WrapBatch(b *gocql.Batch, opts ...WrapOption) *Batch { return wrapBatch(b, nil, opts...) } @@ -319,7 +314,7 @@ func wrapBatch(b *gocql.Batch, hosts []string, opts ...WrapOption) *Batch { for _, fn := range opts { fn(cfg) } - p := ¶ms{config: cfg} + p := params{config: cfg} if len(hosts) > 0 { p.clusterContactPoints = strings.Join(hosts, ",") } @@ -354,45 +349,102 @@ func (tb *Batch) WithTimestamp(timestamp int64) *Batch { // ExecuteBatch calls session.ExecuteBatch on the Batch, tracing the execution. func (tb *Batch) ExecuteBatch(session *gocql.Session) error { - span := tb.newChildSpan(tb.ctx) + p := params{ + config: tb.params.config, + keyspace: tb.Batch.Keyspace(), + paginated: tb.params.paginated, + clusterContactPoints: tb.params.clusterContactPoints, + consistency: tb.Batch.GetConsistency().String(), + } + span := startBatchSpan(tb.ctx, p) err := session.ExecuteBatch(tb.Batch) - tb.finishSpan(span, err) + finishSpan(span, err, tb.params) return err } +func startQuerySpan(ctx context.Context, p params) ddtrace.Span { + opts := commonStartSpanOptions(p) + if p.keyspace != "" { + opts = append(opts, tracer.Tag(ext.CassandraKeyspace, p.keyspace)) + } + if !p.skipPaginated { + opts = append(opts, tracer.Tag(ext.CassandraPaginated, fmt.Sprintf("%t", p.paginated))) + } + for k, v := range p.config.customTags { + opts = append(opts, tracer.Tag(k, v)) + } + span, _ := tracer.StartSpanFromContext(ctx, p.config.querySpanName, opts...) + return span +} + // newChildSpan creates a new span from the params and the context. -func (tb *Batch) newChildSpan(ctx context.Context) ddtrace.Span { - p := tb.params +func startBatchSpan(ctx context.Context, p params) ddtrace.Span { + cfg := p.config + opts := commonStartSpanOptions(p) + if p.keyspace != "" { + opts = append(opts, tracer.Tag(ext.CassandraKeyspace, p.keyspace)) + } + if p.consistency != "" { + opts = append(opts, tracer.Tag(ext.CassandraConsistencyLevel, p.consistency)) + } + for k, v := range cfg.customTags { + opts = append(opts, tracer.Tag(k, v)) + } + span, _ := tracer.StartSpanFromContext(ctx, cfg.batchSpanName, opts...) + return span +} + +func commonStartSpanOptions(p params) []tracer.StartSpanOption { + cfg := p.config opts := []ddtrace.StartSpanOption{ tracer.SpanType(ext.SpanTypeCassandra), - tracer.ServiceName(p.config.serviceName), - tracer.ResourceName(p.config.resourceName), - tracer.Tag(ext.CassandraConsistencyLevel, tb.Cons.String()), - tracer.Tag(ext.CassandraKeyspace, tb.Keyspace()), + tracer.ServiceName(cfg.serviceName), tracer.Tag(ext.Component, componentName), tracer.Tag(ext.SpanKind, ext.SpanKindClient), tracer.Tag(ext.DBSystem, ext.DBSystemCassandra), } - if !math.IsNaN(p.config.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, p.config.analyticsRate)) + if p.config.resourceName != "" { + opts = append(opts, tracer.ResourceName(p.config.resourceName)) } - if tb.clusterContactPoints != "" { - opts = append(opts, tracer.Tag(ext.CassandraContactPoints, tb.clusterContactPoints)) + if !p.startTime.IsZero() { + opts = append(opts, tracer.StartTime(p.startTime)) } - for k, v := range tb.config.customTags { - opts = append(opts, tracer.Tag(k, v)) + if !math.IsNaN(cfg.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate)) } - span, _ := tracer.StartSpanFromContext(ctx, p.config.batchSpanName, opts...) - return span + if p.clusterContactPoints != "" { + opts = append(opts, tracer.Tag(ext.CassandraContactPoints, p.clusterContactPoints)) + } + if p.hostInfo != nil { + opts = append(opts, + tracer.Tag(ext.TargetHost, p.hostInfo.ConnectAddress().String()), + tracer.Tag(ext.TargetPort, strconv.Itoa(p.hostInfo.Port())), + ) + if p.hostInfo.HostID() != "" { + opts = append(opts, tracer.Tag(ext.CassandraHostID, p.hostInfo.HostID())) + } + if p.hostInfo.ClusterName() != "" { + opts = append(opts, tracer.Tag(ext.CassandraCluster, p.hostInfo.ClusterName())) + } + if p.hostInfo.DataCenter() != "" { + opts = append(opts, tracer.Tag(ext.CassandraDatacenter, p.hostInfo.DataCenter())) + } + } + return opts } -func (tb *Batch) finishSpan(span ddtrace.Span, err error) { - if err != nil && tb.params.config.shouldIgnoreError(err) { +func finishSpan(span ddtrace.Span, err error, p params) { + if err != nil && p.config.shouldIgnoreError(err) { err = nil } - if tb.params.config.noDebugStack { - span.Finish(tracer.WithError(err), tracer.NoDebugStack()) - } else { - span.Finish(tracer.WithError(err)) + opts := []ddtrace.FinishOption{ + tracer.WithError(err), + } + if !p.finishTime.IsZero() { + opts = append(opts, tracer.FinishTime(p.finishTime)) + } + if p.config.noDebugStack { + opts = append(opts, tracer.NoDebugStack()) } + span.Finish(opts...) } diff --git a/contrib/gocql/gocql/gocql_test.go b/contrib/gocql/gocql/gocql_test.go index 1ff7ea21ee..4b7e44c28b 100644 --- a/contrib/gocql/gocql/gocql_test.go +++ b/contrib/gocql/gocql/gocql_test.go @@ -26,7 +26,6 @@ import ( ) const ( - debug = false cassandraHost = "127.0.0.1:9042" ) @@ -43,12 +42,6 @@ func newTracedCassandraCluster(opts ...WrapOption) *ClusterConfig { } func updateTestClusterConfig(cfg *gocql.ClusterConfig) { - // the InitialHostLookup must be disabled in newer versions of - // gocql otherwise "no connections were made when creating the session" - // error is returned for Cassandra misconfiguration (that we don't need - // since we're testing another behavior and not the client). - // Check: https://github.com/gocql/gocql/issues/946 - cfg.DisableInitialHostLookup = true // the default timeouts (600ms) are sometimes too short in CI and cause // PRs being tested to flake due to this integration. cfg.ConnectTimeout = 2 * time.Second @@ -105,8 +98,8 @@ func TestErrorWrapper(t *testing.T) { if iter.Host() != nil { assert.Equal(span.Tag(ext.TargetPort), "9042") assert.Equal(span.Tag(ext.TargetHost), iter.Host().HostID()) - assert.Equal(span.Tag(ext.CassandraCluster), "Test Cluster") - assert.Equal(span.Tag(ext.CassandraDatacenter), "datacenter1") + assert.Equal(span.Tag(ext.CassandraCluster), "dd-trace-go-test-cluster") + assert.Equal(span.Tag(ext.CassandraDatacenter), "dd-trace-go-test-datacenter") } } @@ -153,8 +146,8 @@ func TestChildWrapperSpan(t *testing.T) { if iter.Host() != nil { assert.Equal(childSpan.Tag(ext.TargetPort), "9042") assert.Equal(childSpan.Tag(ext.TargetHost), iter.Host().HostID()) - assert.Equal(childSpan.Tag(ext.CassandraCluster), "Test Cluster") - assert.Equal(childSpan.Tag(ext.CassandraDatacenter), "datacenter1") + assert.Equal(childSpan.Tag(ext.CassandraCluster), "dd-trace-go-test-cluster") + assert.Equal(childSpan.Tag(ext.CassandraDatacenter), "dd-trace-go-test-datacenter") } } @@ -185,27 +178,27 @@ func TestCompatMode(t *testing.T) { { name: "== v1.65", gocqlCompat: "v1.65", - wantCluster: "datacenter1", + wantCluster: "dd-trace-go-test-datacenter", }, { name: "< v1.65", gocqlCompat: "v1.64", - wantCluster: "datacenter1", + wantCluster: "dd-trace-go-test-datacenter", }, { name: "> v1.65", gocqlCompat: "v1.66", - wantCluster: "Test Cluster", + wantCluster: "dd-trace-go-test-cluster", }, { name: "empty", gocqlCompat: "", - wantCluster: "Test Cluster", + wantCluster: "dd-trace-go-test-cluster", }, { name: "bad version", gocqlCompat: "bad-version", - wantCluster: "Test Cluster", + wantCluster: "dd-trace-go-test-cluster", }, } for _, tc := range testCases { @@ -216,7 +209,7 @@ func TestCompatMode(t *testing.T) { assert.Equal(t, s.Tag(ext.TargetPort), "9042") assert.NotEmpty(t, s.Tag(ext.TargetHost)) assert.Equal(t, tc.wantCluster, s.Tag(ext.CassandraCluster)) - assert.Equal(t, "datacenter1", s.Tag(ext.CassandraDatacenter)) + assert.Equal(t, "dd-trace-go-test-datacenter", s.Tag(ext.CassandraDatacenter)) }) } } diff --git a/contrib/gocql/gocql/observer.go b/contrib/gocql/gocql/observer.go new file mode 100644 index 0000000000..a75d4c8b16 --- /dev/null +++ b/contrib/gocql/gocql/observer.go @@ -0,0 +1,112 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package gocql + +import ( + "context" + "strings" + + "github.com/gocql/gocql" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +// CreateTracedSession returns a new session augmented with tracing. +func CreateTracedSession(cluster *gocql.ClusterConfig, opts ...WrapOption) (*gocql.Session, error) { + obs := NewObserver(cluster, opts...) + cfg := obs.cfg + + if cfg.traceQuery { + cluster.QueryObserver = obs + } + if cfg.traceBatch { + cluster.BatchObserver = obs + } + if cfg.traceConnect { + cluster.ConnectObserver = obs + } + return cluster.CreateSession() +} + +// NewObserver creates a new Observer to trace gocql. +// This method is useful in case you want to attach the observer to individual traces / batches instead of instrumenting +// the whole client. +func NewObserver(cluster *gocql.ClusterConfig, opts ...WrapOption) *Observer { + cfg := defaultConfig() + for _, fn := range opts { + fn(cfg) + } + return &Observer{ + cfg: cfg, + clusterContactPoints: strings.Join(cluster.Hosts, ","), + } +} + +var ( + _ gocql.QueryObserver = (*Observer)(nil) + _ gocql.BatchObserver = (*Observer)(nil) + _ gocql.ConnectObserver = (*Observer)(nil) +) + +// Observer implements gocql observer interfaces to support tracing. +type Observer struct { + cfg *config + clusterContactPoints string +} + +// ObserveQuery implements gocql.QueryObserver. +func (o *Observer) ObserveQuery(ctx context.Context, query gocql.ObservedQuery) { + p := params{ + config: o.cfg, + keyspace: query.Keyspace, + skipPaginated: true, + clusterContactPoints: o.clusterContactPoints, + hostInfo: query.Host, + startTime: query.Start, + finishTime: query.End, + } + span := startQuerySpan(ctx, p) + resource := o.cfg.resourceName + if resource == "" { + resource = query.Statement + } + span.SetTag(ext.ResourceName, resource) + span.SetTag(ext.CassandraRowCount, query.Rows) + finishSpan(span, query.Err, p) +} + +// ObserveBatch implements gocql.BatchObserver. +func (o *Observer) ObserveBatch(ctx context.Context, batch gocql.ObservedBatch) { + p := params{ + config: o.cfg, + keyspace: batch.Keyspace, + skipPaginated: true, + clusterContactPoints: o.clusterContactPoints, + hostInfo: batch.Host, + startTime: batch.Start, + finishTime: batch.End, + } + span := startBatchSpan(ctx, p) + finishSpan(span, batch.Err, p) +} + +// ObserveConnect implements gocql.ConnectObserver. +func (o *Observer) ObserveConnect(connect gocql.ObservedConnect) { + p := params{ + config: o.cfg, + clusterContactPoints: o.clusterContactPoints, + hostInfo: connect.Host, + startTime: connect.Start, + finishTime: connect.End, + } + opts := commonStartSpanOptions(p) + for k, v := range o.cfg.customTags { + opts = append(opts, tracer.Tag(k, v)) + } + span := tracer.StartSpan("cassandra.connect", opts...) + finishSpan(span, connect.Err, p) +} diff --git a/contrib/gocql/gocql/observer_test.go b/contrib/gocql/gocql/observer_test.go new file mode 100644 index 0000000000..148dd3d1bb --- /dev/null +++ b/contrib/gocql/gocql/observer_test.go @@ -0,0 +1,444 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package gocql + +import ( + "context" + "testing" + + "github.com/gocql/gocql" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +func TestObserver_Query(t *testing.T) { + testCases := []struct { + name string + opts []WrapOption + updateQuery func(cluster *gocql.ClusterConfig, sess *gocql.Session, q *gocql.Query) *gocql.Query + wantServiceName string + wantResourceName string + wantRowCount int + wantErr bool + wantErrTag bool + }{ + { + name: "default", + opts: nil, + wantRowCount: 1, + }, + { + name: "service_and_resource_name", + opts: []WrapOption{ + WithServiceName("test-service"), + WithResourceName("test-resource"), + }, + wantRowCount: 1, + wantServiceName: "test-service", + wantResourceName: "test-resource", + }, + { + name: "error", + opts: nil, + updateQuery: func(_ *gocql.ClusterConfig, sess *gocql.Session, _ *gocql.Query) *gocql.Query { + stmt := "SELECT name, age FRM trace.person WHERE name = 'This does not exist'" + return sess.Query(stmt) + }, + wantServiceName: "", + wantResourceName: "SELECT name, age FRM trace.person WHERE name = 'This does not exist'", + wantRowCount: 0, + wantErr: true, + wantErrTag: true, + }, + { + name: "error_ignore", + opts: []WrapOption{ + WithErrorCheck(func(_ error) bool { + return false + }), + }, + updateQuery: func(_ *gocql.ClusterConfig, sess *gocql.Session, _ *gocql.Query) *gocql.Query { + stmt := "SELECT name, age FRM trace.person WHERE name = 'This does not exist'" + return sess.Query(stmt) + }, + wantServiceName: "", + wantResourceName: "SELECT name, age FRM trace.person WHERE name = 'This does not exist'", + wantRowCount: 0, + wantErr: true, + wantErrTag: false, + }, + { + name: "individual_query_trace", + opts: []WrapOption{ + WithTraceQuery(false), + }, + updateQuery: func(cluster *gocql.ClusterConfig, _ *gocql.Session, q *gocql.Query) *gocql.Query { + obs := NewObserver(cluster, WithResourceName("test resource"), WithServiceName("test service")) + return q.Observer(obs) + }, + wantServiceName: "test service", + wantResourceName: "test resource", + wantRowCount: 1, + wantErr: false, + wantErrTag: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + cluster := newCassandraCluster() + cluster.Hosts = []string{cassandraHost, "127.0.0.1:9043"} + cluster.Keyspace = "trace" + + opts := []WrapOption{ + WithTraceQuery(true), + WithTraceBatch(false), + WithTraceConnect(false), + } + opts = append(opts, tc.opts...) + sess, err := CreateTracedSession(cluster, opts...) + require.NoError(t, err) + + p, ctx := tracer.StartSpanFromContext(context.Background(), "parentSpan") + + stmt := "SELECT * FROM trace.person WHERE name = 'Cassandra'" + q := sess.Query(stmt) + if tc.updateQuery != nil { + q = tc.updateQuery(cluster, sess, q) + } + q = q.WithContext(ctx) + + err = q.Exec() + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + p.Finish() + + spans := mt.FinishedSpans() + require.Len(t, spans, 2) + + wantService := tc.wantServiceName + if wantService == "" { + wantService = "gocql.query" + } + wantResource := tc.wantResourceName + if wantResource == "" { + wantResource = stmt + } + wantRowCount := tc.wantRowCount + + parentSpan := spans[1] + querySpan := spans[0] + + assert.Equal(t, "parentSpan", parentSpan.OperationName()) + assert.Equal(t, querySpan.ParentID(), parentSpan.SpanID()) + + assertCommonTags(t, querySpan) + + assert.Equal(t, "cassandra.query", querySpan.OperationName()) + assert.Equal(t, wantResource, querySpan.Tag(ext.ResourceName)) + assert.Equal(t, wantService, querySpan.Tag(ext.ServiceName)) + assert.Equal(t, wantRowCount, querySpan.Tag(ext.CassandraRowCount)) + + if tc.wantErrTag { + assert.NotNil(t, querySpan.Tag(ext.Error)) + } else { + assert.Nil(t, querySpan.Tag(ext.Error)) + } + }) + } +} + +func TestObserver_Batch(t *testing.T) { + testCases := []struct { + name string + opts []WrapOption + updateBatch func(cluster *gocql.ClusterConfig, sess *gocql.Session, b *gocql.Batch) *gocql.Batch + wantServiceName string + wantResourceName string + wantErr bool + wantErrTag bool + }{ + { + name: "default", + opts: nil, + }, + { + name: "service_and_resource_name", + opts: []WrapOption{ + WithServiceName("test-service"), + WithResourceName("test-resource"), + }, + wantServiceName: "test-service", + wantResourceName: "test-resource", + }, + { + name: "error", + opts: nil, + updateBatch: func(_ *gocql.ClusterConfig, sess *gocql.Session, _ *gocql.Batch) *gocql.Batch { + stmt := "SELECT name, age FRM trace.person WHERE name = 'This does not exist'" + b := sess.NewBatch(gocql.UnloggedBatch) + b.Query(stmt) + return b + }, + wantServiceName: "", + wantResourceName: "", + wantErr: true, + wantErrTag: true, + }, + { + name: "error_ignore", + opts: []WrapOption{ + WithErrorCheck(func(_ error) bool { + return false + }), + }, + updateBatch: func(_ *gocql.ClusterConfig, sess *gocql.Session, _ *gocql.Batch) *gocql.Batch { + stmt := "SELECT name, age FRM trace.person WHERE name = 'This does not exist'" + b := sess.NewBatch(gocql.UnloggedBatch) + b.Query(stmt) + return b + }, + wantServiceName: "", + wantResourceName: "", + wantErr: true, + wantErrTag: false, + }, + { + name: "individual_batch_trace", + opts: []WrapOption{ + WithTraceBatch(false), + }, + updateBatch: func(cluster *gocql.ClusterConfig, _ *gocql.Session, b *gocql.Batch) *gocql.Batch { + obs := NewObserver(cluster, WithResourceName("test resource"), WithServiceName("test service")) + return b.Observer(obs) + }, + wantServiceName: "test service", + wantResourceName: "test resource", + wantErr: false, + wantErrTag: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + cluster := newCassandraCluster() + cluster.Hosts = []string{cassandraHost, "127.0.0.1:9043"} + cluster.Keyspace = "trace" + + opts := []WrapOption{ + WithTraceQuery(true), + WithTraceBatch(true), + WithTraceConnect(false), + } + opts = append(opts, tc.opts...) + sess, err := CreateTracedSession(cluster, opts...) + require.NoError(t, err) + + p, ctx := tracer.StartSpanFromContext(context.Background(), "parentSpan") + + stmt := "INSERT INTO trace.person (name, age, description) VALUES (?, ?, ?)" + b := sess.NewBatch(gocql.UnloggedBatch) + b.Query(stmt, "Kate", 80, "Cassandra's sister running in kubernetes") + b.Query(stmt, "Lucas", 60, "Another person") + + if tc.updateBatch != nil { + b = tc.updateBatch(cluster, sess, b) + } + b = b.WithContext(ctx) + + err = sess.ExecuteBatch(b) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + p.Finish() + + spans := mt.FinishedSpans() + require.Len(t, spans, 2) + + wantService := tc.wantServiceName + if wantService == "" { + wantService = "gocql.query" + } + wantResource := tc.wantResourceName + if wantResource == "" { + wantResource = "cassandra.batch" + } + + parentSpan := spans[1] + batchSpan := spans[0] + + assert.Equal(t, "parentSpan", parentSpan.OperationName()) + assert.Equal(t, batchSpan.ParentID(), parentSpan.SpanID()) + + assertCommonTags(t, batchSpan) + + assert.Equal(t, "cassandra.batch", batchSpan.OperationName()) + assert.Equal(t, wantResource, batchSpan.Tag(ext.ResourceName)) + assert.Equal(t, wantService, batchSpan.Tag(ext.ServiceName)) + assert.Nil(t, batchSpan.Tag(ext.CassandraRowCount)) + + if tc.wantErrTag { + assert.NotNil(t, batchSpan.Tag(ext.Error)) + } else { + assert.Nil(t, batchSpan.Tag(ext.Error)) + } + }) + } +} + +func TestObserver_Connect(t *testing.T) { + testCases := []struct { + name string + opts []WrapOption + wantServiceName string + wantResourceName string + wantErr bool + wantErrTag bool + }{ + { + name: "default", + opts: nil, + }, + { + name: "service_and_resource_name", + opts: []WrapOption{ + WithServiceName("test-service"), + WithResourceName("test-resource"), + }, + wantServiceName: "test-service", + wantResourceName: "test-resource", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + cluster := newCassandraCluster() + cluster.Hosts = []string{cassandraHost, "127.0.0.1:9043"} + cluster.Keyspace = "trace" + + opts := []WrapOption{ + WithTraceQuery(false), + WithTraceBatch(false), + WithTraceConnect(true), + } + opts = append(opts, tc.opts...) + sess, err := CreateTracedSession(cluster, opts...) + require.NoError(t, err) + + err = sess.Query("SELECT * FROM trace.person WHERE name = 'Cassandra'").Exec() + require.NoError(t, err) + + wantService := tc.wantServiceName + if wantService == "" { + wantService = "gocql.query" + } + wantResource := tc.wantResourceName + if wantResource == "" { + wantResource = "cassandra.connect" + } + + spans := mt.FinishedSpans() + + var okSpans []mocktracer.Span + var okSpansHostInfo []mocktracer.Span + var errSpans []mocktracer.Span + + for _, span := range spans { + port := span.Tag(ext.TargetPort) + require.NotEmpty(t, port) + switch port { + case "9042": + okSpans = append(okSpans, span) + + case "9043": + errSpans = append(errSpans, span) + + default: + assert.FailNow(t, "unexpected port: "+port.(string)) + } + } + assert.NotEmpty(t, okSpans) + // the errSpans slice might be empty or not, so we don't assert any length to avoid flakiness. + + for _, span := range spans { + // this information should be present in all spans. + assert.Equal(t, "cassandra.connect", span.OperationName()) + assert.Equal(t, wantResource, span.Tag(ext.ResourceName)) + assert.Equal(t, wantService, span.Tag(ext.ServiceName)) + + assert.Equal(t, "gocql/gocql", span.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindClient, span.Tag(ext.SpanKind)) + assert.Equal(t, "cassandra", span.Tag(ext.DBSystem)) + assert.Equal(t, "127.0.0.1:9042,127.0.0.1:9043", span.Tag(ext.CassandraContactPoints)) + assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) + } + for _, span := range okSpans { + assert.Equal(t, "9042", span.Tag(ext.TargetPort)) + assert.Nil(t, span.Tag(ext.Error)) + + if span.Tag(ext.CassandraHostID) != nil { + okSpansHostInfo = append(okSpansHostInfo, span) + } + } + assert.NotEmpty(t, okSpansHostInfo, "should have found at least one non-error connect span with additional host info") + + for _, span := range okSpansHostInfo { + // this information is not present in all the spans for some reason. + assert.Equal(t, "dd-trace-go-test-cluster", span.Tag(ext.CassandraCluster)) + assert.Equal(t, "dd-trace-go-test-datacenter", span.Tag(ext.CassandraDatacenter)) + assert.NotEmpty(t, span.Tag(ext.CassandraHostID)) + } + for _, span := range errSpans { + assert.Equal(t, "9043", span.Tag(ext.TargetPort)) + assert.NotNil(t, span.Tag(ext.Error)) + + // since this node does not exist, this information should not be present. + assert.Nil(t, span.Tag(ext.CassandraCluster)) + assert.Nil(t, span.Tag(ext.CassandraDatacenter)) + assert.Nil(t, span.Tag(ext.CassandraHostID)) + } + }) + } +} + +func assertCommonTags(t *testing.T, span mocktracer.Span) { + t.Helper() + + assert.Equal(t, "trace", span.Tag(ext.CassandraKeyspace)) + assert.Equal(t, "gocql/gocql", span.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindClient, span.Tag(ext.SpanKind)) + assert.Equal(t, "cassandra", span.Tag(ext.DBSystem)) + assert.Equal(t, "127.0.0.1:9042,127.0.0.1:9043", span.Tag(ext.CassandraContactPoints)) + assert.Equal(t, "9042", span.Tag(ext.TargetPort)) + assert.Equal(t, "127.0.0.1", span.Tag(ext.TargetHost)) + assert.Equal(t, "dd-trace-go-test-cluster", span.Tag(ext.CassandraCluster)) + assert.Equal(t, "dd-trace-go-test-datacenter", span.Tag(ext.CassandraDatacenter)) + assert.NotEmpty(t, span.Tag(ext.CassandraHostID)) + + // These tags can't be obtained with the Observer API. + assert.Nil(t, span.Tag(ext.CassandraPaginated)) + assert.Nil(t, span.Tag(ext.CassandraConsistencyLevel)) +} diff --git a/contrib/gocql/gocql/option.go b/contrib/gocql/gocql/option.go index c0df25d2b2..e6e70a75e4 100644 --- a/contrib/gocql/gocql/option.go +++ b/contrib/gocql/gocql/option.go @@ -18,21 +18,26 @@ import ( const defaultServiceName = "gocql.query" -type queryConfig struct { - serviceName, resourceName string - querySpanName, batchSpanName string - noDebugStack bool - analyticsRate float64 - errCheck func(err error) bool - customTags map[string]interface{} - clusterTagLegacyMode bool +type config struct { + serviceName, resourceName string + querySpanName, batchSpanName string + noDebugStack bool + analyticsRate float64 + errCheck func(err error) bool + customTags map[string]interface{} + clusterTagLegacyMode bool + traceQuery, traceBatch, traceConnect bool } // WrapOption represents an option that can be passed to WrapQuery. -type WrapOption func(*queryConfig) +type WrapOption func(*config) -func defaultConfig() *queryConfig { - cfg := &queryConfig{} +func defaultConfig() *config { + cfg := &config{ + traceQuery: true, + traceBatch: true, + traceConnect: true, + } cfg.serviceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName) cfg.querySpanName = namingschema.OpName(namingschema.CassandraOutbound) cfg.batchSpanName = namingschema.OpNameOverrideV0(namingschema.CassandraOutbound, "cassandra.batch") @@ -55,7 +60,7 @@ func defaultConfig() *queryConfig { // WithServiceName sets the given service name for the returned query. func WithServiceName(name string) WrapOption { - return func(cfg *queryConfig) { + return func(cfg *config) { cfg.serviceName = name } } @@ -67,14 +72,14 @@ func WithServiceName(name string) WrapOption { // call, which can be costly when called repeatedly. Using WithResourceName will // avoid that call. Under normal circumstances, it is safe to rely on the default. func WithResourceName(name string) WrapOption { - return func(cfg *queryConfig) { + return func(cfg *config) { cfg.resourceName = name } } // WithAnalytics enables Trace Analytics for all started spans. func WithAnalytics(on bool) WrapOption { - return func(cfg *queryConfig) { + return func(cfg *config) { if on { cfg.analyticsRate = 1.0 } else { @@ -86,7 +91,7 @@ func WithAnalytics(on bool) WrapOption { // WithAnalyticsRate sets the sampling rate for Trace Analytics events // correlated to started spans. func WithAnalyticsRate(rate float64) WrapOption { - return func(cfg *queryConfig) { + return func(cfg *config) { if rate >= 0.0 && rate <= 1.0 { cfg.analyticsRate = rate } else { @@ -99,12 +104,12 @@ func WithAnalyticsRate(rate float64) WrapOption { // with an error. This is useful in situations where errors are frequent and // performance is critical. func NoDebugStack() WrapOption { - return func(cfg *queryConfig) { + return func(cfg *config) { cfg.noDebugStack = true } } -func (c *queryConfig) shouldIgnoreError(err error) bool { +func (c *config) shouldIgnoreError(err error) bool { return c != nil && c.errCheck != nil && !c.errCheck(err) } @@ -112,7 +117,7 @@ func (c *queryConfig) shouldIgnoreError(err error) bool { // error should be marked as an error. The fn is called whenever a CQL request // finishes with an error. func WithErrorCheck(fn func(err error) bool) WrapOption { - return func(cfg *queryConfig) { + return func(cfg *config) { // When the error is explicitly marked as not-an-error, that is // when this errCheck function returns false, the APM code will // just skip the error and pretend the span was successful. @@ -128,10 +133,34 @@ func WithErrorCheck(fn func(err error) bool) WrapOption { // WithCustomTag will attach the value to the span tagged by the key. func WithCustomTag(key string, value interface{}) WrapOption { - return func(cfg *queryConfig) { + return func(cfg *config) { if cfg.customTags == nil { cfg.customTags = make(map[string]interface{}) } cfg.customTags[key] = value } } + +// WithTraceQuery will enable tracing for queries (default is true). +// This option only takes effect in CreateTracedSession and NewObserver. +func WithTraceQuery(enabled bool) WrapOption { + return func(cfg *config) { + cfg.traceQuery = enabled + } +} + +// WithTraceBatch will enable tracing for batches (default is true). +// This option only takes effect in CreateTracedSession and NewObserver. +func WithTraceBatch(enabled bool) WrapOption { + return func(cfg *config) { + cfg.traceBatch = enabled + } +} + +// WithTraceConnect will enable tracing for connections (default is true). +// This option only takes effect in CreateTracedSession and NewObserver. +func WithTraceConnect(enabled bool) WrapOption { + return func(cfg *config) { + cfg.traceConnect = enabled + } +} diff --git a/contrib/google.golang.org/grpc.v12/example_test.go b/contrib/google.golang.org/grpc.v12/example_test.go deleted file mode 100644 index 677e897519..0000000000 --- a/contrib/google.golang.org/grpc.v12/example_test.go +++ /dev/null @@ -1,50 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016 Datadog, Inc. - -package grpc_test - -import ( - "log" - "net" - - grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc.v12" - - "google.golang.org/grpc" -) - -func Example_client() { - // Create the client interceptor using the grpc trace package. - i := grpctrace.UnaryClientInterceptor(grpctrace.WithServiceName("my-grpc-client")) - - // Dial in using the created interceptor... - conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure(), grpc.WithUnaryInterceptor(i)) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - - // And continue using the connection as normal. -} - -func Example_server() { - // Create a listener for the server. - ln, err := net.Listen("tcp", ":50051") - if err != nil { - log.Fatal(err) - } - - // Create the unary server interceptor using the grpc trace package. - i := grpctrace.UnaryServerInterceptor(grpctrace.WithServiceName("my-grpc-client")) - - // Initialize the grpc server as normal, using the tracing interceptor. - s := grpc.NewServer(grpc.UnaryInterceptor(i)) - - // ... register your services - - // Start serving incoming connections. - if err := s.Serve(ln); err != nil { - log.Fatalf("failed to serve: %v", err) - } -} diff --git a/contrib/google.golang.org/grpc.v12/fixtures_test.pb.go b/contrib/google.golang.org/grpc.v12/fixtures_test.pb.go deleted file mode 100644 index 5567277baf..0000000000 --- a/contrib/google.golang.org/grpc.v12/fixtures_test.pb.go +++ /dev/null @@ -1,173 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016 Datadog, Inc. - -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: fixtures_test.proto - -/* -Package grpc is a generated protocol buffer package. - -It is generated from these files: - - fixtures_test.proto - -It has these top-level messages: - - FixtureRequest - FixtureReply -*/ -package grpc - -import ( - fmt "fmt" - - proto "github.com/golang/protobuf/proto" - - math "math" - - context "golang.org/x/net/context" - - grpc1 "google.golang.org/grpc" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -// The request message containing the user's name. -type FixtureRequest struct { - Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` -} - -func (m *FixtureRequest) Reset() { *m = FixtureRequest{} } -func (m *FixtureRequest) String() string { return proto.CompactTextString(m) } -func (*FixtureRequest) ProtoMessage() {} -func (*FixtureRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -func (m *FixtureRequest) GetName() string { - if m != nil { - return m.Name - } - return "" -} - -// The response message containing the greetings -type FixtureReply struct { - Message string `protobuf:"bytes,1,opt,name=message" json:"message,omitempty"` -} - -func (m *FixtureReply) Reset() { *m = FixtureReply{} } -func (m *FixtureReply) String() string { return proto.CompactTextString(m) } -func (*FixtureReply) ProtoMessage() {} -func (*FixtureReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -func (m *FixtureReply) GetMessage() string { - if m != nil { - return m.Message - } - return "" -} - -func init() { - proto.RegisterType((*FixtureRequest)(nil), "grpc.FixtureRequest") - proto.RegisterType((*FixtureReply)(nil), "grpc.FixtureReply") -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc1.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc1.SupportPackageIsVersion4 - -// Client API for Fixture service - -type FixtureClient interface { - Ping(ctx context.Context, in *FixtureRequest, opts ...grpc1.CallOption) (*FixtureReply, error) -} - -type fixtureClient struct { - cc *grpc1.ClientConn -} - -func NewFixtureClient(cc *grpc1.ClientConn) FixtureClient { - return &fixtureClient{cc} -} - -func (c *fixtureClient) Ping(ctx context.Context, in *FixtureRequest, opts ...grpc1.CallOption) (*FixtureReply, error) { - out := new(FixtureReply) - err := grpc1.Invoke(ctx, "/grpc.Fixture/Ping", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// Server API for Fixture service - -type FixtureServer interface { - Ping(context.Context, *FixtureRequest) (*FixtureReply, error) -} - -func RegisterFixtureServer(s *grpc1.Server, srv FixtureServer) { - s.RegisterService(&_Fixture_serviceDesc, srv) -} - -func _Fixture_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc1.UnaryServerInterceptor) (interface{}, error) { - in := new(FixtureRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(FixtureServer).Ping(ctx, in) - } - info := &grpc1.UnaryServerInfo{ - Server: srv, - FullMethod: "/grpc.Fixture/Ping", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(FixtureServer).Ping(ctx, req.(*FixtureRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Fixture_serviceDesc = grpc1.ServiceDesc{ - ServiceName: "grpc.Fixture", - HandlerType: (*FixtureServer)(nil), - Methods: []grpc1.MethodDesc{ - { - MethodName: "Ping", - Handler: _Fixture_Ping_Handler, - }, - }, - Streams: []grpc1.StreamDesc{}, - Metadata: "fixtures_test.proto", -} - -func init() { proto.RegisterFile("fixtures_test.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 177 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4e, 0xcb, 0xac, 0x28, - 0x29, 0x2d, 0x4a, 0x2d, 0x8e, 0x2f, 0x49, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, - 0x62, 0x49, 0x2f, 0x2a, 0x48, 0x56, 0x52, 0xe1, 0xe2, 0x73, 0x83, 0x48, 0x06, 0xa5, 0x16, 0x96, - 0xa6, 0x16, 0x97, 0x08, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, - 0x70, 0x06, 0x81, 0xd9, 0x4a, 0x1a, 0x5c, 0x3c, 0x70, 0x55, 0x05, 0x39, 0x95, 0x42, 0x12, 0x5c, - 0xec, 0xb9, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0x30, 0x65, 0x30, 0xae, 0x91, 0x2d, 0x17, 0x3b, 0x54, - 0xa5, 0x90, 0x11, 0x17, 0x4b, 0x40, 0x66, 0x5e, 0xba, 0x90, 0x88, 0x1e, 0xc8, 0x26, 0x3d, 0x54, - 0x6b, 0xa4, 0x84, 0xd0, 0x44, 0x0b, 0x72, 0x2a, 0x95, 0x18, 0x9c, 0x74, 0xb8, 0x24, 0x33, 0xf3, - 0x21, 0x32, 0xa9, 0x15, 0x89, 0xb9, 0x05, 0x39, 0xa9, 0xc5, 0x7a, 0x20, 0x37, 0x83, 0x44, 0x9c, - 0x78, 0x43, 0x52, 0x8b, 0x4b, 0xdc, 0x83, 0x02, 0x9c, 0x03, 0x40, 0x1e, 0x08, 0x60, 0x4c, 0x62, - 0x03, 0xfb, 0xc4, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x18, 0x42, 0x90, 0x4d, 0xe0, 0x00, 0x00, - 0x00, -} diff --git a/contrib/google.golang.org/grpc.v12/fixtures_test.proto b/contrib/google.golang.org/grpc.v12/fixtures_test.proto deleted file mode 100644 index 15a8aa5cb3..0000000000 --- a/contrib/google.golang.org/grpc.v12/fixtures_test.proto +++ /dev/null @@ -1,21 +0,0 @@ -syntax = "proto3"; - -option java_multiple_files = true; -option java_package = "io.grpc.examples.testgrpc"; -option java_outer_classname = "TestGRPCProto"; - -package grpc; - -service Fixture { - rpc Ping (FixtureRequest) returns (FixtureReply) {} -} - -// The request message containing the user's name. -message FixtureRequest { - string name = 1; -} - -// The response message containing the greetings -message FixtureReply { - string message = 1; -} diff --git a/contrib/google.golang.org/grpc.v12/grpc.go b/contrib/google.golang.org/grpc.v12/grpc.go deleted file mode 100644 index ede5748332..0000000000 --- a/contrib/google.golang.org/grpc.v12/grpc.go +++ /dev/null @@ -1,133 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016 Datadog, Inc. - -//go:generate protoc -I . fixtures_test.proto --go_out=plugins=grpc:. - -// Package grpc provides functions to trace the google.golang.org/grpc package v1.2. -package grpc // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc.v12" - -import ( - "net" - "strings" - - "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/internal/grpcutil" - "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/options" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" - "gopkg.in/DataDog/dd-trace-go.v1/internal/log" - "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" - - context "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/peer" -) - -const componentName = "google.golang.org/grpc.v12" - -func init() { - telemetry.LoadIntegration(componentName) - tracer.MarkIntegrationImported("google.golang.org/grpc/v12") -} - -// UnaryServerInterceptor will trace requests to the given grpc server. -func UnaryServerInterceptor(opts ...InterceptorOption) grpc.UnaryServerInterceptor { - cfg := new(interceptorConfig) - serverDefaults(cfg) - for _, fn := range opts { - fn(cfg) - } - if cfg.serviceName == "" { - cfg.serviceName = "grpc.server" - if svc := globalconfig.ServiceName(); svc != "" { - cfg.serviceName = svc - } - } - - log.Debug("contrib/google.golang.org/grpc.v12: Configuring UnaryServerInterceptor: %#v", cfg) - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - span, ctx := startServerSpanFromContext(ctx, info.FullMethod, cfg) - resp, err := handler(ctx, req) - span.Finish(tracer.WithError(err)) - return resp, err - } -} - -func startServerSpanFromContext(ctx context.Context, method string, cfg *interceptorConfig) (ddtrace.Span, context.Context) { - methodElements := strings.SplitN(strings.TrimPrefix(method, "/"), "/", 2) - extraOpts := []tracer.StartSpanOption{ - tracer.ServiceName(cfg.serviceName), - tracer.ResourceName(method), - tracer.Tag(tagMethod, method), - tracer.SpanType(ext.AppTypeRPC), - tracer.Measured(), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindServer), - tracer.Tag(ext.RPCSystem, ext.RPCSystemGRPC), - tracer.Tag(ext.RPCService, methodElements[0]), - tracer.Tag(ext.GRPCFullMethod, method), - } - // copy opts in case the caller reuses the slice in parallel - // we will add the items in extraOpts - optsLocal := options.Copy(cfg.spanOpts...) - optsLocal = append(optsLocal, extraOpts...) - md, _ := metadata.FromIncomingContext(ctx) // nil is ok - if sctx, err := tracer.Extract(grpcutil.MDCarrier(md)); err == nil { - optsLocal = append(optsLocal, tracer.ChildOf(sctx)) - } - return tracer.StartSpanFromContext(ctx, cfg.spanName, optsLocal...) -} - -// UnaryClientInterceptor will add tracing to a grpc client. -func UnaryClientInterceptor(opts ...InterceptorOption) grpc.UnaryClientInterceptor { - cfg := new(interceptorConfig) - clientDefaults(cfg) - for _, fn := range opts { - fn(cfg) - } - log.Debug("contrib/google.golang.org/grpc.v12: Configuring UnaryClientInterceptor: %#v", cfg) - return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - var ( - span ddtrace.Span - p peer.Peer - ) - methodElements := strings.Split(strings.TrimPrefix(method, "/"), "/") - spanopts := cfg.spanOpts - spanopts = append(spanopts, - tracer.ServiceName(cfg.serviceName), - tracer.Tag(tagMethod, method), - tracer.SpanType(ext.AppTypeRPC), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindClient), - tracer.Tag(ext.RPCSystem, ext.RPCSystemGRPC), - tracer.Tag(ext.RPCService, methodElements[0]), - tracer.Tag(ext.GRPCFullMethod, method), - ) - span, ctx = tracer.StartSpanFromContext(ctx, cfg.spanName, spanopts...) - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - md = metadata.MD{} - } - _ = tracer.Inject(span.Context(), grpcutil.MDCarrier(md)) - ctx = metadata.NewOutgoingContext(ctx, md) - opts = append(opts, grpc.Peer(&p)) - err := invoker(ctx, method, req, reply, cc, opts...) - if p.Addr != nil { - addr := p.Addr.String() - host, port, err := net.SplitHostPort(addr) - if err == nil { - if host != "" { - span.SetTag(ext.TargetHost, host) - } - span.SetTag(ext.TargetPort, port) - } - } - span.SetTag(tagCode, grpc.Code(err).String()) - span.Finish(tracer.WithError(err)) - return err - } -} diff --git a/contrib/google.golang.org/grpc.v12/grpc_test.go b/contrib/google.golang.org/grpc.v12/grpc_test.go deleted file mode 100644 index d67c917a3d..0000000000 --- a/contrib/google.golang.org/grpc.v12/grpc_test.go +++ /dev/null @@ -1,417 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016 Datadog, Inc. - -package grpc - -import ( - "fmt" - "net" - "testing" - - "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - context "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" -) - -func TestClient(t *testing.T) { - assert := assert.New(t) - mt := mocktracer.Start() - defer mt.Stop() - - rig, err := newRig(true, true) - if err != nil { - t.Fatalf("error setting up rig: %s", err) - } - defer rig.Close() - client := rig.client - - span, ctx := tracer.StartSpanFromContext(context.Background(), "a", tracer.ServiceName("b"), tracer.ResourceName("c")) - resp, err := client.Ping(ctx, &FixtureRequest{Name: "pass"}) - assert.Nil(err) - span.Finish() - assert.Equal(resp.Message, "passed") - - spans := mt.FinishedSpans() - assert.Len(spans, 3) - - var serverSpan, clientSpan, rootSpan mocktracer.Span - - for _, s := range spans { - // order of traces in buffer is not garanteed - switch s.OperationName() { - case "grpc.server": - serverSpan = s - case "grpc.client": - clientSpan = s - case "a": - rootSpan = s - } - } - - assert.NotNil(serverSpan) - assert.NotNil(clientSpan) - assert.NotNil(rootSpan) - - assert.Equal(clientSpan.Tag(ext.TargetHost), "127.0.0.1") - assert.Equal(clientSpan.Tag(ext.TargetPort), rig.port) - assert.Equal(clientSpan.Tag(tagCode), codes.OK.String()) - assert.Equal(clientSpan.TraceID(), rootSpan.TraceID()) - assert.Equal(clientSpan.Tag(ext.Component), "google.golang.org/grpc.v12") - assert.Equal(clientSpan.Tag(ext.SpanKind), ext.SpanKindClient) - assert.Equal("grpc", clientSpan.Tag(ext.RPCSystem)) - assert.Equal("grpc.Fixture", clientSpan.Tag(ext.RPCService)) - assert.Equal("/grpc.Fixture/Ping", clientSpan.Tag(ext.GRPCFullMethod)) - - assert.Equal(serverSpan.Tag(ext.ServiceName), "grpc") - assert.Equal(serverSpan.Tag(ext.ResourceName), "/grpc.Fixture/Ping") - assert.Equal(serverSpan.TraceID(), rootSpan.TraceID()) - assert.Equal(serverSpan.Tag(ext.Component), "google.golang.org/grpc.v12") - assert.Equal(serverSpan.Tag(ext.SpanKind), ext.SpanKindServer) - assert.Equal("grpc", serverSpan.Tag(ext.RPCSystem)) - assert.Equal("grpc.Fixture", serverSpan.Tag(ext.RPCService)) - assert.Equal("/grpc.Fixture/Ping", serverSpan.Tag(ext.GRPCFullMethod)) -} - -func TestChild(t *testing.T) { - assert := assert.New(t) - mt := mocktracer.Start() - defer mt.Stop() - - rig, err := newRig(true, false) - if err != nil { - t.Fatalf("error setting up rig: %s", err) - } - defer rig.Close() - - client := rig.client - resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "child"}) - assert.Nil(err) - assert.Equal(resp.Message, "child") - - spans := mt.FinishedSpans() - assert.Len(spans, 2) - - var serverSpan, clientSpan mocktracer.Span - - for _, s := range spans { - // order of traces in buffer is not garanteed - switch s.OperationName() { - case "grpc.server": - serverSpan = s - case "child": - clientSpan = s - } - } - - assert.NotNil(clientSpan) - assert.Nil(clientSpan.Tag(ext.Error)) - assert.Equal(clientSpan.Tag(ext.ServiceName), "grpc") - assert.Equal(clientSpan.Tag(ext.ResourceName), "child") - assert.True(clientSpan.FinishTime().Sub(clientSpan.StartTime()) > 0) - - assert.NotNil(serverSpan) - assert.Nil(serverSpan.Tag(ext.Error)) - assert.Equal(serverSpan.Tag(ext.ServiceName), "grpc") - assert.Equal(serverSpan.Tag(ext.ResourceName), "/grpc.Fixture/Ping") - assert.True(serverSpan.FinishTime().Sub(serverSpan.StartTime()) > 0) - assert.Equal(serverSpan.Tag(ext.Component), "google.golang.org/grpc.v12") - assert.Equal(serverSpan.Tag(ext.SpanKind), ext.SpanKindServer) - assert.Equal("grpc", serverSpan.Tag(ext.RPCSystem)) - assert.Equal("grpc.Fixture", serverSpan.Tag(ext.RPCService)) - assert.Equal("/grpc.Fixture/Ping", serverSpan.Tag(ext.GRPCFullMethod)) -} - -func TestPass(t *testing.T) { - assert := assert.New(t) - mt := mocktracer.Start() - defer mt.Stop() - - rig, err := newRig(true, false) - if err != nil { - t.Fatalf("error setting up rig: %s", err) - } - defer rig.Close() - - client := rig.client - resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "pass"}) - assert.Nil(err) - assert.Equal(resp.Message, "passed") - - spans := mt.FinishedSpans() - assert.Len(spans, 1) - - s := spans[0] - assert.Nil(s.Tag(ext.Error)) - assert.Equal(s.OperationName(), "grpc.server") - assert.Equal(s.Tag(ext.ServiceName), "grpc") - assert.Equal(s.Tag(ext.ResourceName), "/grpc.Fixture/Ping") - assert.Equal(s.Tag(ext.SpanType), ext.AppTypeRPC) - assert.True(s.FinishTime().Sub(s.StartTime()) > 0) - assert.Equal(s.Tag(ext.Component), "google.golang.org/grpc.v12") - assert.Equal(s.Tag(ext.SpanKind), ext.SpanKindServer) - assert.Equal(s.Tag(ext.RPCService), "grpc.Fixture") -} - -// fixtureServer a dummy implemenation of our grpc fixtureServer. -type fixtureServer struct{} - -func (s *fixtureServer) Ping(ctx context.Context, in *FixtureRequest) (*FixtureReply, error) { - switch { - case in.Name == "child": - span, _ := tracer.StartSpanFromContext(ctx, "child") - span.Finish() - return &FixtureReply{Message: "child"}, nil - case in.Name == "disabled": - if _, ok := tracer.SpanFromContext(ctx); ok { - panic("should be disabled") - } - return &FixtureReply{Message: "disabled"}, nil - } - return &FixtureReply{Message: "passed"}, nil -} - -// ensure it's a fixtureServer -var _ FixtureServer = &fixtureServer{} - -// rig contains all of the servers and connections we'd need for a -// grpc integration test -type rig struct { - server *grpc.Server - port string - listener net.Listener - conn *grpc.ClientConn - client FixtureClient -} - -func (r *rig) Close() { - r.server.Stop() - r.conn.Close() - r.listener.Close() -} - -func newRig(traceServer, traceClient bool) (*rig, error) { - return newRigWithOpts(traceServer, traceClient, WithServiceName("grpc")) -} - -func newRigWithOpts(traceServer, traceClient bool, iopts ...InterceptorOption) (*rig, error) { - var serverOpts []grpc.ServerOption - if traceServer { - serverOpts = append(serverOpts, grpc.UnaryInterceptor(UnaryServerInterceptor(iopts...))) - } - server := grpc.NewServer(serverOpts...) - - RegisterFixtureServer(server, new(fixtureServer)) - - li, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return nil, err - } - _, port, _ := net.SplitHostPort(li.Addr().String()) - // start our test fixtureServer. - go server.Serve(li) - - opts := []grpc.DialOption{grpc.WithInsecure()} - if traceClient { - opts = append(opts, grpc.WithUnaryInterceptor(UnaryClientInterceptor(iopts...))) - } - conn, err := grpc.Dial(li.Addr().String(), opts...) - if err != nil { - return nil, fmt.Errorf("error dialing: %s", err) - } - return &rig{ - listener: li, - port: port, - server: server, - conn: conn, - client: NewFixtureClient(conn), - }, err -} - -func TestAnalyticsSettings(t *testing.T) { - assertRate := func(t *testing.T, mt mocktracer.Tracer, rate interface{}, opts ...InterceptorOption) { - rig, err := newRigWithOpts(true, true, opts...) - if err != nil { - t.Fatalf("error setting up rig: %s", err) - } - defer rig.Close() - - client := rig.client - resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "pass"}) - assert.Nil(t, err) - assert.Equal(t, resp.Message, "passed") - - spans := mt.FinishedSpans() - assert.Len(t, spans, 2) - - var serverSpan, clientSpan mocktracer.Span - - for _, s := range spans { - // order of traces in buffer is not garanteed - switch s.OperationName() { - case "grpc.server": - serverSpan = s - case "grpc.client": - clientSpan = s - } - } - - assert.Equal(t, rate, clientSpan.Tag(ext.EventSampleRate)) - assert.Equal(t, rate, serverSpan.Tag(ext.EventSampleRate)) - } - - t.Run("defaults", func(t *testing.T) { - mt := mocktracer.Start() - defer mt.Stop() - - assertRate(t, mt, nil) - }) - - t.Run("global", func(t *testing.T) { - t.Skip("global flag disabled") - mt := mocktracer.Start() - defer mt.Stop() - - rate := globalconfig.AnalyticsRate() - defer globalconfig.SetAnalyticsRate(rate) - globalconfig.SetAnalyticsRate(0.4) - - assertRate(t, mt, 0.4) - }) - - t.Run("enabled", func(t *testing.T) { - mt := mocktracer.Start() - defer mt.Stop() - - assertRate(t, mt, 1.0, WithAnalytics(true)) - }) - - t.Run("disabled", func(t *testing.T) { - mt := mocktracer.Start() - defer mt.Stop() - - assertRate(t, mt, nil, WithAnalytics(false)) - }) - - t.Run("override", func(t *testing.T) { - mt := mocktracer.Start() - defer mt.Stop() - - rate := globalconfig.AnalyticsRate() - defer globalconfig.SetAnalyticsRate(rate) - globalconfig.SetAnalyticsRate(0.4) - - assertRate(t, mt, 0.23, WithAnalyticsRate(0.23)) - }) - - t.Run("spanOpts", func(t *testing.T) { - mt := mocktracer.Start() - defer mt.Stop() - - assertRate(t, mt, 0.23, WithAnalyticsRate(0.33), WithSpanOptions(tracer.AnalyticsRate(0.23))) - }) -} - -func TestSpanOpts(t *testing.T) { - assert := assert.New(t) - mt := mocktracer.Start() - defer mt.Stop() - - rig, err := newRigWithOpts(true, true, WithSpanOptions(tracer.Tag("foo", "bar"))) - if err != nil { - t.Fatalf("error setting up rig: %s", err) - } - defer rig.Close() - client := rig.client - - resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "pass"}) - assert.Nil(err) - assert.Equal(resp.Message, "passed") - - spans := mt.FinishedSpans() - assert.Len(spans, 2) - - for _, s := range spans { - assert.Equal(s.Tags()["foo"], "bar") - } -} - -func TestServerNamingSchema(t *testing.T) { - genSpans := namingschematest.GenSpansFn(func(t *testing.T, serviceOverride string) []mocktracer.Span { - var opts []InterceptorOption - if serviceOverride != "" { - opts = append(opts, WithServiceName(serviceOverride)) - } - mt := mocktracer.Start() - defer mt.Stop() - - rig, err := newRigWithOpts(true, false, opts...) - require.NoError(t, err) - defer rig.Close() - _, err = rig.client.Ping(context.Background(), &FixtureRequest{Name: "pass"}) - require.NoError(t, err) - - return mt.FinishedSpans() - }) - assertOpV0 := func(t *testing.T, spans []mocktracer.Span) { - require.Len(t, spans, 1) - assert.Equal(t, "grpc.server", spans[0].OperationName()) - } - assertOpV1 := func(t *testing.T, spans []mocktracer.Span) { - require.Len(t, spans, 1) - assert.Equal(t, "grpc.server.request", spans[0].OperationName()) - } - ddService := namingschematest.TestDDService - serviceOverride := namingschematest.TestServiceOverride - wantServiceNameV0 := namingschematest.ServiceNameAssertions{ - WithDefaults: []string{"grpc.server"}, - WithDDService: []string{ddService}, - WithDDServiceAndOverride: []string{serviceOverride}, - } - t.Run("ServiceName", namingschematest.NewServiceNameTest(genSpans, wantServiceNameV0)) - t.Run("SpanName", namingschematest.NewSpanNameTest(genSpans, assertOpV0, assertOpV1)) -} - -func TestClientNamingSchema(t *testing.T) { - genSpans := namingschematest.GenSpansFn(func(t *testing.T, serviceOverride string) []mocktracer.Span { - var opts []InterceptorOption - if serviceOverride != "" { - opts = append(opts, WithServiceName(serviceOverride)) - } - mt := mocktracer.Start() - defer mt.Stop() - - rig, err := newRigWithOpts(false, true, opts...) - require.NoError(t, err) - defer rig.Close() - _, err = rig.client.Ping(context.Background(), &FixtureRequest{Name: "pass"}) - require.NoError(t, err) - - return mt.FinishedSpans() - }) - assertOpV0 := func(t *testing.T, spans []mocktracer.Span) { - require.Len(t, spans, 1) - assert.Equal(t, "grpc.client", spans[0].OperationName()) - } - assertOpV1 := func(t *testing.T, spans []mocktracer.Span) { - require.Len(t, spans, 1) - assert.Equal(t, "grpc.client.request", spans[0].OperationName()) - } - serviceOverride := namingschematest.TestServiceOverride - wantServiceNameV0 := namingschematest.ServiceNameAssertions{ - WithDefaults: []string{"grpc.client"}, - WithDDService: []string{"grpc.client"}, - WithDDServiceAndOverride: []string{serviceOverride}, - } - t.Run("ServiceName", namingschematest.NewServiceNameTest(genSpans, wantServiceNameV0)) - t.Run("SpanName", namingschematest.NewSpanNameTest(genSpans, assertOpV0, assertOpV1)) -} diff --git a/contrib/google.golang.org/grpc.v12/option.go b/contrib/google.golang.org/grpc.v12/option.go deleted file mode 100644 index f367f38d21..0000000000 --- a/contrib/google.golang.org/grpc.v12/option.go +++ /dev/null @@ -1,82 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016 Datadog, Inc. - -package grpc - -import ( - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - "gopkg.in/DataDog/dd-trace-go.v1/internal" - "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" -) - -const ( - defaultClientServiceName = "grpc.client" - defaultServerServiceName = "grpc.server" -) - -type interceptorConfig struct { - serviceName string - spanName string - spanOpts []ddtrace.StartSpanOption -} - -// InterceptorOption represents an option that can be passed to the grpc unary -// client and server interceptors. -type InterceptorOption func(*interceptorConfig) - -func defaults(cfg *interceptorConfig) { - // cfg.serviceName default set in interceptor - // cfg.spanOpts = append(cfg.spanOpts, tracer.AnalyticsRate(globalconfig.AnalyticsRate())) - if internal.BoolEnv("DD_TRACE_GRPC_ANALYTICS_ENABLED", false) { - cfg.spanOpts = append(cfg.spanOpts, tracer.AnalyticsRate(1.0)) - } -} - -func clientDefaults(cfg *interceptorConfig) { - cfg.serviceName = namingschema.ServiceNameOverrideV0(defaultClientServiceName, defaultClientServiceName) - cfg.spanName = namingschema.OpName(namingschema.GRPCClient) - defaults(cfg) -} - -func serverDefaults(cfg *interceptorConfig) { - cfg.serviceName = namingschema.ServiceName(defaultServerServiceName) - cfg.spanName = namingschema.OpName(namingschema.GRPCServer) - defaults(cfg) -} - -// WithServiceName sets the given service name for the intercepted client. -func WithServiceName(name string) InterceptorOption { - return func(cfg *interceptorConfig) { - cfg.serviceName = name - } -} - -// WithAnalytics enables Trace Analytics for all started spans. -func WithAnalytics(on bool) InterceptorOption { - return func(cfg *interceptorConfig) { - if on { - WithSpanOptions(tracer.AnalyticsRate(1.0))(cfg) - } - } -} - -// WithAnalyticsRate sets the sampling rate for Trace Analytics events -// correlated to started spans. -func WithAnalyticsRate(rate float64) InterceptorOption { - return func(cfg *interceptorConfig) { - if rate >= 0.0 && rate <= 1.0 { - WithSpanOptions(tracer.AnalyticsRate(rate))(cfg) - } - } -} - -// WithSpanOptions defines a set of additional ddtrace.StartSpanOption to be added -// to spans started by the integration. -func WithSpanOptions(opts ...ddtrace.StartSpanOption) InterceptorOption { - return func(cfg *interceptorConfig) { - cfg.spanOpts = append(cfg.spanOpts, opts...) - } -} diff --git a/contrib/google.golang.org/grpc.v12/tags.go b/contrib/google.golang.org/grpc.v12/tags.go deleted file mode 100644 index 6952e05ade..0000000000 --- a/contrib/google.golang.org/grpc.v12/tags.go +++ /dev/null @@ -1,12 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016 Datadog, Inc. - -package grpc - -// Tags used for gRPC -const ( - tagMethod = "grpc.method" - tagCode = "grpc.code" -) diff --git a/contrib/internal/httptrace/config.go b/contrib/internal/httptrace/config.go index b545a447ce..691a529400 100644 --- a/contrib/internal/httptrace/config.go +++ b/contrib/internal/httptrace/config.go @@ -47,7 +47,7 @@ func newConfig() config { } else if r, err := regexp.Compile(s); err == nil { c.queryStringRegexp = r } else { - log.Debug("Could not compile regexp from %s. Using default regexp instead.", envQueryStringRegexp) + log.Error("Could not compile regexp from %s. Using default regexp instead.", envQueryStringRegexp) } return c } diff --git a/ddtrace/ext/db.go b/ddtrace/ext/db.go index f729f237b0..c9a046f86d 100644 --- a/ddtrace/ext/db.go +++ b/ddtrace/ext/db.go @@ -87,4 +87,7 @@ const ( // CassandraContactPoints holds the list of cassandra initial seed nodes used to discover the cluster. CassandraContactPoints = "db.cassandra.contact.points" + + // CassandraHostID represents the host ID for this operation. + CassandraHostID = "db.cassandra.host.id" ) diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index df84a0f8a9..9175c20efe 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -15,7 +15,6 @@ import ( "net/url" "os" "path/filepath" - "regexp" "runtime" "runtime/debug" "strconv" @@ -23,6 +22,7 @@ import ( "time" "golang.org/x/mod/semver" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal" @@ -69,7 +69,6 @@ var contribIntegrations = map[string]struct { "github.com/gomodule/redigo": {"Redigo", false}, "google.golang.org/api": {"Google API", false}, "google.golang.org/grpc": {"gRPC", false}, - "google.golang.org/grpc/v12": {"gRPC v12", false}, "gopkg.in/jinzhu/gorm.v1": {"Gorm (gopkg)", false}, "github.com/gorilla/mux": {"Gorilla Mux", false}, "gorm.io/gorm.v1": {"Gorm v1", false}, @@ -98,6 +97,7 @@ var contribIntegrations = map[string]struct { "github.com/valyala/fasthttp": {"FastHTTP", false}, "github.com/zenazn/goji": {"Goji", false}, "log/slog": {"log/slog", false}, + "github.com/uptrace/bun": {"Bun", false}, } var ( @@ -696,23 +696,6 @@ func (c *config) loadContribIntegrations(deps []*debug.Module) { } for _, d := range deps { p := d.Path - // special use case, since gRPC does not update version number - if p == "google.golang.org/grpc" { - re := regexp.MustCompile(`v(\d.\d)\d*`) - match := re.FindStringSubmatch(d.Version) - if match == nil { - log.Warn("Unable to parse version of GRPC %v", d.Version) - continue - } - ver, err := strconv.ParseFloat(match[1], 32) - if err != nil { - log.Warn("Unable to parse version of GRPC %v as a float", d.Version) - continue - } - if ver <= 1.2 { - p = p + "/v12" - } - } s, ok := contribIntegrations[p] if !ok { continue diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go index 26c5f80343..d4259e9f5c 100644 --- a/ddtrace/tracer/option_test.go +++ b/ddtrace/tracer/option_test.go @@ -303,39 +303,6 @@ func TestAgentIntegration(t *testing.T) { cfg.loadContribIntegrations(deps) assert.True(t, cfg.integrations["gRPC"].Available) assert.Equal(t, cfg.integrations["gRPC"].Version, "v1.520") - assert.False(t, cfg.integrations["gRPC v12"].Available) - }) - - t.Run("grpc v12", func(t *testing.T) { - cfg := newConfig() - defer clearIntegrationsForTests() - - d := debug.Module{ - Path: "google.golang.org/grpc", - Version: "v1.10", - } - - deps := []*debug.Module{&d} - cfg.loadContribIntegrations(deps) - assert.True(t, cfg.integrations["gRPC v12"].Available) - assert.Equal(t, cfg.integrations["gRPC v12"].Version, "v1.10") - assert.False(t, cfg.integrations["gRPC"].Available) - }) - - t.Run("grpc bad", func(t *testing.T) { - cfg := newConfig() - defer clearIntegrationsForTests() - - d := debug.Module{ - Path: "google.golang.org/grpc", - Version: "v10.10", - } - - deps := []*debug.Module{&d} - cfg.loadContribIntegrations(deps) - assert.False(t, cfg.integrations["gRPC v12"].Available) - assert.Equal(t, cfg.integrations["gRPC v12"].Version, "") - assert.False(t, cfg.integrations["gRPC"].Available) }) // ensure we clean up global state diff --git a/ddtrace/tracer/sampler_test.go b/ddtrace/tracer/sampler_test.go index b28a040037..117518b533 100644 --- a/ddtrace/tracer/sampler_test.go +++ b/ddtrace/tracer/sampler_test.go @@ -1637,3 +1637,87 @@ func TestSetGlobalSampleRate(t *testing.T) { assert.Equal(t, 0.0, rs.globalRate) assert.False(t, b) } + +func TestSampleTagsRootOnly(t *testing.T) { + assert := assert.New(t) + + t.Run("no-ctx-propagation", func(t *testing.T) { + Start(WithSamplingRules([]SamplingRule{ + TagsResourceRule(map[string]string{"tag": "20"}, "", "", "", 1), + TagsResourceRule(nil, "root", "", "", 0), + })) + tr := internal.GetGlobalTracer() + defer tr.Stop() + + root := tr.StartSpan("mysql.root", ResourceName("root")) + child := tr.StartSpan("mysql.child", ChildOf(root.Context())) + child.SetTag("tag", 20) + + // root span should be sampled with the second rule + // sampling decision is 0, thus "_dd.limit_psr" is not present + assert.Contains(root.(*span).Metrics, keyRulesSamplerAppliedRate) + assert.Equal(0., root.(*span).Metrics[keyRulesSamplerAppliedRate]) + assert.NotContains(root.(*span).Metrics, keyRulesSamplerLimiterRate) + + // neither"_dd.limit_psr", nor "_dd.rule_psr" should be present + // on the child span + assert.NotContains(child.(*span).Metrics, keyRulesSamplerAppliedRate) + assert.NotContains(child.(*span).Metrics, keyRulesSamplerLimiterRate) + + // setting this tag would change the result of sampling, + // which will occur after the span is finished + root.SetTag("tag", 20) + child.Finish() + + // first sampling rule is applied, the sampling decision is 1 + // and the "_dd.limit_psr" is present + root.Finish() + assert.Equal(1., root.(*span).Metrics[keyRulesSamplerAppliedRate]) + assert.Contains(root.(*span).Metrics, keyRulesSamplerLimiterRate) + + // neither"_dd.limit_psr", nor "_dd.rule_psr" should be present + // on the child span + assert.NotContains(child.(*span).Metrics, keyRulesSamplerAppliedRate) + assert.NotContains(child.(*span).Metrics, keyRulesSamplerLimiterRate) + }) + + t.Run("with-ctx-propagation", func(t *testing.T) { + Start(WithSamplingRules([]SamplingRule{ + TagsResourceRule(map[string]string{"tag": "20"}, "", "", "", 1), + TagsResourceRule(nil, "root", "", "", 0), + })) + tr := internal.GetGlobalTracer() + defer tr.Stop() + + root := tr.StartSpan("mysql.root", ResourceName("root")) + child := tr.StartSpan("mysql.child", ChildOf(root.Context())) + child.SetTag("tag", 20) + + // root span should be sampled with the second rule + // sampling decision is 0, thus "_dd.limit_psr" is not present + assert.Equal(0., root.(*span).Metrics[keyRulesSamplerAppliedRate]) + assert.Contains(root.(*span).Metrics, keyRulesSamplerAppliedRate) + assert.NotContains(root.(*span).Metrics, keyRulesSamplerLimiterRate) + + // neither"_dd.limit_psr", nor "_dd.rule_psr" should be present + // on the child span + assert.NotContains(child.(*span).Metrics, keyRulesSamplerAppliedRate) + assert.NotContains(child.(*span).Metrics, keyRulesSamplerLimiterRate) + + // context propagation locks the span, so no re-sampling should occur + tr.Inject(root.Context(), TextMapCarrier(map[string]string{})) + root.SetTag("tag", 20) + + child.Finish() + + // re-sampling should not occur + root.Finish() + assert.NotContains(child.(*span).Metrics, keyRulesSamplerAppliedRate) + assert.NotContains(root.(*span).Metrics, keyRulesSamplerLimiterRate) + + // neither"_dd.limit_psr", nor "_dd.rule_psr" should be present + // on the child span + assert.NotContains(child.(*span).Metrics, keyRulesSamplerAppliedRate) + assert.NotContains(child.(*span).Metrics, keyRulesSamplerLimiterRate) + }) +} diff --git a/ddtrace/tracer/span.go b/ddtrace/tracer/span.go index 323003e23a..e793c6caf7 100644 --- a/ddtrace/tracer/span.go +++ b/ddtrace/tracer/span.go @@ -500,9 +500,11 @@ func (s *span) Finish(opts ...ddtrace.FinishOption) { s.SetTag("go_execution_traced", "partial") } - if tr, ok := internal.GetGlobalTracer().(*tracer); ok && tr.rulesSampling.traces.enabled() { - if !s.context.trace.isLocked() && s.context.trace.propagatingTag(keyDecisionMaker) != "-4" { - tr.rulesSampling.SampleTrace(s) + if s.root() == s { + if tr, ok := internal.GetGlobalTracer().(*tracer); ok && tr.rulesSampling.traces.enabled() { + if !s.context.trace.isLocked() && s.context.trace.propagatingTag(keyDecisionMaker) != "-4" { + tr.rulesSampling.SampleTrace(s) + } } } @@ -600,13 +602,21 @@ func newAggregableSpan(s *span, obfuscator *obfuscate.Obfuscator) *aggregableSpa statusCode = uint32(c) } } + var isTraceRoot trilean + if s.ParentID == 0 { + isTraceRoot = trilean_true + } else { + isTraceRoot = trilean_false + } + key := aggregation{ - Name: s.Name, - Resource: obfuscatedResource(obfuscator, s.Type, s.Resource), - Service: s.Service, - Type: s.Type, - Synthetics: strings.HasPrefix(s.Meta[keyOrigin], "synthetics"), - StatusCode: statusCode, + Name: s.Name, + Resource: obfuscatedResource(obfuscator, s.Type, s.Resource), + Service: s.Service, + Type: s.Type, + Synthetics: strings.HasPrefix(s.Meta[keyOrigin], "synthetics"), + StatusCode: statusCode, + IsTraceRoot: isTraceRoot, } return &aggregableSpan{ key: key, diff --git a/ddtrace/tracer/span_test.go b/ddtrace/tracer/span_test.go index 95b1460ac0..a294b09af0 100644 --- a/ddtrace/tracer/span_test.go +++ b/ddtrace/tracer/span_test.go @@ -175,10 +175,11 @@ func TestNewAggregableSpan(t *testing.T) { Type: "sql", }, o) assert.Equal(t, aggregation{ - Name: "name", - Type: "sql", - Resource: "SELECT * FROM table WHERE password = ?", - Service: "service", + Name: "name", + Type: "sql", + Resource: "SELECT * FROM table WHERE password = ?", + Service: "service", + IsTraceRoot: 1, }, aggspan.key) }) @@ -190,10 +191,11 @@ func TestNewAggregableSpan(t *testing.T) { Type: "sql", }, nil) assert.Equal(t, aggregation{ - Name: "name", - Type: "sql", - Resource: "SELECT * FROM table WHERE password='secret'", - Service: "service", + Name: "name", + Type: "sql", + Resource: "SELECT * FROM table WHERE password='secret'", + Service: "service", + IsTraceRoot: 1, }, aggspan.key) }) } diff --git a/ddtrace/tracer/stats.go b/ddtrace/tracer/stats.go index 720a2a0230..5a3c1c2269 100644 --- a/ddtrace/tracer/stats.go +++ b/ddtrace/tracer/stats.go @@ -214,12 +214,13 @@ func (c *concentrator) flushAndSend(timenow time.Time, includeCurrent bool) { // aggregation specifies a uniquely identifiable key under which a certain set // of stats are grouped inside a bucket. type aggregation struct { - Name string - Type string - Resource string - Service string - StatusCode uint32 - Synthetics bool + Name string + Type string + Resource string + Service string + StatusCode uint32 + Synthetics bool + IsTraceRoot trilean } type rawBucket struct { @@ -278,6 +279,14 @@ func (sb *rawBucket) Export() statsBucket { return csb } +type trilean int32 + +const ( + trilean_not_set trilean = iota + trilean_true + trilean_false +) + type rawGroupedStats struct { hits uint64 topLevelHits uint64 @@ -285,6 +294,7 @@ type rawGroupedStats struct { duration uint64 okDistribution *ddsketch.DDSketch errDistribution *ddsketch.DDSketch + IsTraceRoot trilean } func newRawGroupedStats() *rawGroupedStats { @@ -335,6 +345,7 @@ func (s *rawGroupedStats) export(k aggregation) (groupedStats, error) { OkSummary: okSummary, ErrorSummary: errSummary, Synthetics: k.Synthetics, + IsTraceRoot: int32(k.IsTraceRoot), }, nil } diff --git a/ddtrace/tracer/stats_payload.go b/ddtrace/tracer/stats_payload.go index 35a68b46b9..3b77128b7a 100644 --- a/ddtrace/tracer/stats_payload.go +++ b/ddtrace/tracer/stats_payload.go @@ -53,4 +53,5 @@ type groupedStats struct { ErrorSummary []byte `json:"errorSummary,omitempty"` Synthetics bool `json:"synthetics,omitempty"` TopLevelHits uint64 `json:"topLevelHits,omitempty"` + IsTraceRoot int32 `json:"isTraceRoot,omitempty"` } diff --git a/ddtrace/tracer/textmap.go b/ddtrace/tracer/textmap.go index cd9b7860f8..1fee0ba689 100644 --- a/ddtrace/tracer/textmap.go +++ b/ddtrace/tracer/textmap.go @@ -507,7 +507,7 @@ func getDatadogPropagator(cp *chainedPropagator) *propagator { // if the reparenting ID is not set on the context, the span ID from datadog headers is used. func overrideDatadogParentID(ctx, w3cCtx, ddCtx *spanContext) { ctx.spanID = w3cCtx.spanID - if w3cCtx.reparentID != "" && w3cCtx.reparentID != "0000000000000000" { + if w3cCtx.reparentID != "" { ctx.reparentID = w3cCtx.reparentID } else if ddCtx != nil { // NIT: could be done without using fmt.Sprintf? Is it worth it? @@ -967,7 +967,7 @@ func composeTracestate(ctx *spanContext, priority int, oldState string) string { if !ctx.isRemote { b.WriteString(";p:") b.WriteString(spanIDHexEncoded(ctx.SpanID(), 16)) - } else if ctx.reparentID != "" && ctx.reparentID != "0000000000000000" { + } else if ctx.reparentID != "" { b.WriteString(";p:") b.WriteString(ctx.reparentID) } @@ -1202,10 +1202,6 @@ func parseTracestate(ctx *spanContext, header string) { setPropagatingTag(ctx, "_dd.p."+keySuffix, val) } } - // if dd list-member is present and last parent is not set, set it to zeros - if ctx.reparentID == "" { - ctx.reparentID = "0000000000000000" - } } } diff --git a/ddtrace/tracer/textmap_test.go b/ddtrace/tracer/textmap_test.go index 6147821fcf..9575ac226d 100644 --- a/ddtrace/tracer/textmap_test.go +++ b/ddtrace/tracer/textmap_test.go @@ -1756,7 +1756,7 @@ func TestEnvVars(t *testing.T) { }, out: []uint64{8687463697196027922, 1311768467284833366}, priority: 1, - lastParent: "0000000000000000", + lastParent: "", }, } for i, tc := range tests { @@ -1839,7 +1839,13 @@ func TestEnvVars(t *testing.T) { if tc.priority != 0 { sctx.setSamplingPriority(int(tc.priority), samplernames.Unknown) } - assert.Equal(s.(*span).Meta["_dd.parent_id"], tc.lastParent) + + if tc.lastParent == "" { + assert.Empty(s.(*span).Meta["_dd.parent_id"]) + } else { + assert.Equal(s.(*span).Meta["_dd.parent_id"], tc.lastParent) + } + assert.Equal(true, sctx.updated) headers := TextMapCarrier(map[string]string{}) diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index af794c7857..f0c74eecef 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -735,6 +735,9 @@ func (t *tracer) sample(span *span) { if t.rulesSampling.SampleTraceGlobalRate(span) { return } + if t.rulesSampling.SampleTrace(span) { + return + } t.prioritySampling.apply(span) } diff --git a/docker-compose.yaml b/docker-compose.yaml index 4548bea1c5..17788504ee 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -4,6 +4,9 @@ services: image: cassandra:3.11 environment: JVM_OPTS: "-Xms750m -Xmx750m" + CASSANDRA_CLUSTER_NAME: "dd-trace-go-test-cluster" + CASSANDRA_DC: "dd-trace-go-test-datacenter" + CASSANDRA_ENDPOINT_SNITCH: "GossipingPropertyFileSnitch" ports: - "9042:9042" mysql: @@ -97,9 +100,9 @@ services: TRACE_LANGUAGE: golang ENABLED_CHECKS: trace_stall,trace_count_header,trace_peer_service,trace_dd_service PORT: 9126 - DD_SUPPRESS_TRACE_PARSE_ERRORS: true - DD_POOL_TRACE_CHECK_FAILURES: true - DD_DISABLE_ERROR_RESPONSES: true + DD_SUPPRESS_TRACE_PARSE_ERRORS: "true" + DD_POOL_TRACE_CHECK_FAILURES: "true" + DD_DISABLE_ERROR_RESPONSES: "true" ports: - "127.0.0.1:9126:9126" mongodb: diff --git a/go.mod b/go.mod index 33809dd6cb..9aef54a508 100644 --- a/go.mod +++ b/go.mod @@ -48,9 +48,8 @@ require ( github.com/go-redis/redis/v7 v7.4.1 github.com/go-redis/redis/v8 v8.11.5 github.com/go-sql-driver/mysql v1.6.0 - github.com/gocql/gocql v0.0.0-20220224095938-0eacd3183625 + github.com/gocql/gocql v1.6.0 github.com/gofiber/fiber/v2 v2.52.5 - github.com/golang/protobuf v1.5.3 github.com/gomodule/redigo v1.8.9 github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b github.com/google/uuid v1.5.0 @@ -97,7 +96,6 @@ require ( go.opentelemetry.io/otel/trace v1.20.0 go.uber.org/atomic v1.11.0 golang.org/x/mod v0.14.0 - golang.org/x/net v0.23.0 golang.org/x/oauth2 v0.9.0 golang.org/x/sys v0.20.0 golang.org/x/time v0.3.0 @@ -172,6 +170,7 @@ require ( github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -260,6 +259,7 @@ require ( golang.org/x/arch v0.4.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.5.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect @@ -289,3 +289,15 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.2.0 // indirect ) + +// Retract experimental versions +retract ( + v1.999.0-rc.8 + v1.999.0-rc.7 + v1.999.0-rc.6 + v1.999.0-rc.5 + v1.999.0-rc.4 + v1.999.0-rc.3 + v1.999.0-rc.2 + v1.999.0-rc.1 +) diff --git a/go.sum b/go.sum index 32f99bb03f..9c10be0aa2 100644 --- a/go.sum +++ b/go.sum @@ -1224,8 +1224,8 @@ github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncV github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/gocql/gocql v0.0.0-20220224095938-0eacd3183625 h1:6ImvI6U901e1ezn/8u2z3bh1DZIvMOia0yTSBxhy4Ao= -github.com/gocql/gocql v0.0.0-20220224095938-0eacd3183625/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= +github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU= +github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= github.com/godbus/dbus v0.0.0-20151105175453-c7fdd8b5cd55/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/godbus/dbus v0.0.0-20180201030542-885f9cc04c9c/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= diff --git a/internal/appsec/dyngo/operation.go b/internal/appsec/dyngo/operation.go index 884a36ccaf..b5c0a16831 100644 --- a/internal/appsec/dyngo/operation.go +++ b/internal/appsec/dyngo/operation.go @@ -22,13 +22,18 @@ package dyngo import ( "context" - "gopkg.in/DataDog/dd-trace-go.v1/internal/orchestrion" "sync" + "sync/atomic" - "go.uber.org/atomic" - "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/orchestrion" ) +// LogError is the function used to log errors in the dyngo package. +// This is required because we really want to be able to log errors from dyngo +// but the log package depend on too much packages that we want to instrument. +// So we need to do this to avoid dependency cycles. +var LogError = func(string, ...any) {} + // Operation interface type allowing to register event listeners to the // operation. The event listeners will be automatically removed from the // operation once it finishes so that it no longer can be called on finished @@ -179,7 +184,7 @@ func StartAndRegisterOperation[O Operation, E ArgOf[O]](ctx context.Context, op // should call this function to ensure the operation is properly linked in the context tree. func RegisterOperation(ctx context.Context, op Operation) context.Context { op.unwrap().inContext = true - return context.WithValue(ctx, contextKey{}, op) + return orchestrion.CtxWithValue(ctx, contextKey{}, op) } // FinishOperation finishes the operation along with its results and emits a @@ -317,7 +322,7 @@ func (b *dataBroadcaster) clear() { func emitData[T any](b *dataBroadcaster, v T) { defer func() { if r := recover(); r != nil { - log.Error("appsec: recovered from an unexpected panic from an event listener: %+v", r) + LogError("appsec: recovered from an unexpected panic from an event listener: %+v", r) } }() b.mu.RLock() @@ -348,7 +353,7 @@ func (r *eventRegister) clear() { func emitEvent[O Operation, T any](r *eventRegister, op O, v T) { defer func() { if r := recover(); r != nil { - log.Error("appsec: recovered from an unexpected panic from an event listener: %+v", r) + LogError("appsec: recovered from an unexpected panic from an event listener: %+v", r) } }() r.mu.RLock() diff --git a/internal/appsec/emitter/ossec/lfi.go b/internal/appsec/emitter/ossec/lfi.go new file mode 100644 index 0000000000..b769d4c40b --- /dev/null +++ b/internal/appsec/emitter/ossec/lfi.go @@ -0,0 +1,41 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package ossec + +import ( + "io/fs" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo" +) + +type ( + // OpenOperation type embodies any kind of function calls that will result in a call to an open(2) syscall + OpenOperation struct { + dyngo.Operation + blockErr error + } + + // OpenOperationArgs is the arguments for an open operation + OpenOperationArgs struct { + // Path is the path to the file to be opened + Path string + // Flags are the flags passed to the open(2) syscall + Flags int + // Perms are the permissions passed to the open(2) syscall if the creation of a file is required + Perms fs.FileMode + } + + // OpenOperationRes is the result of an open operation + OpenOperationRes[File any] struct { + // File is the file descriptor returned by the open(2) syscall + File *File + // Err is the error returned by the function + Err *error + } +) + +func (OpenOperationArgs) IsArgOf(*OpenOperation) {} +func (OpenOperationRes[File]) IsResultOf(*OpenOperation) {} diff --git a/internal/appsec/listener/grpcsec/grpc.go b/internal/appsec/listener/grpcsec/grpc.go index 279bdb870b..861021e86f 100644 --- a/internal/appsec/listener/grpcsec/grpc.go +++ b/internal/appsec/listener/grpcsec/grpc.go @@ -17,6 +17,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/sharedsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/httpsec" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/ossec" shared "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sharedsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sqlsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" @@ -41,6 +42,9 @@ var supportedAddresses = listener.AddressSet{ httpsec.HTTPClientIPAddr: {}, httpsec.UserIDAddr: {}, httpsec.ServerIoNetURLAddr: {}, + ossec.ServerIOFSFileAddr: {}, + sqlsec.ServerDBStatementAddr: {}, + sqlsec.ServerDBTypeAddr: {}, } // Install registers the gRPC WAF Event Listener on the given root operation. @@ -113,10 +117,14 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types return } - if l.isSecAddressListened(httpsec.ServerIoNetURLAddr) { + if httpsec.SSRFAddressesPresent(l.addresses) { httpsec.RegisterRoundTripperListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter) } + if ossec.OSAddressesPresent(l.addresses) { + ossec.RegisterOpenListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter) + } + if sqlsec.SQLAddressesPresent(l.addresses) { sqlsec.RegisterSQLListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter) } diff --git a/internal/appsec/listener/httpsec/http.go b/internal/appsec/listener/httpsec/http.go index ec63e90e93..c458931336 100644 --- a/internal/appsec/listener/httpsec/http.go +++ b/internal/appsec/listener/httpsec/http.go @@ -16,6 +16,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/httpsec/types" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/sharedsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/ossec" shared "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sharedsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sqlsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" @@ -55,6 +56,7 @@ var supportedAddresses = listener.AddressSet{ HTTPClientIPAddr: {}, UserIDAddr: {}, ServerIoNetURLAddr: {}, + ossec.ServerIOFSFileAddr: {}, sqlsec.ServerDBStatementAddr: {}, sqlsec.ServerDBTypeAddr: {}, } @@ -111,12 +113,16 @@ func (l *wafEventListener) onEvent(op *types.Operation, args types.HandlerOperat return } - if _, ok := l.addresses[ServerIoNetURLAddr]; ok { + if SSRFAddressesPresent(l.addresses) { dyngo.On(op, shared.MakeWAFRunListener(&op.SecurityEventsHolder, wafCtx, l.limiter, func(args types.RoundTripOperationArgs) waf.RunAddressData { return waf.RunAddressData{Ephemeral: map[string]any{ServerIoNetURLAddr: args.URL}} })) } + if ossec.OSAddressesPresent(l.addresses) { + ossec.RegisterOpenListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter) + } + if sqlsec.SQLAddressesPresent(l.addresses) { sqlsec.RegisterSQLListener(op, &op.SecurityEventsHolder, wafCtx, l.limiter) } diff --git a/internal/appsec/listener/httpsec/roundtripper.go b/internal/appsec/listener/httpsec/roundtripper.go index 0d30e952f5..0d5102466a 100644 --- a/internal/appsec/listener/httpsec/roundtripper.go +++ b/internal/appsec/listener/httpsec/roundtripper.go @@ -8,6 +8,7 @@ package httpsec import ( "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/httpsec/types" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sharedsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/trace" @@ -21,3 +22,8 @@ func RegisterRoundTripperListener(op dyngo.Operation, events *trace.SecurityEven return waf.RunAddressData{Ephemeral: map[string]any{ServerIoNetURLAddr: args.URL}} })) } + +func SSRFAddressesPresent(addresses listener.AddressSet) bool { + _, urlAddr := addresses[ServerIoNetURLAddr] + return urlAddr +} diff --git a/internal/appsec/listener/ossec/lfi.go b/internal/appsec/listener/ossec/lfi.go new file mode 100644 index 0000000000..12b32e1bf8 --- /dev/null +++ b/internal/appsec/listener/ossec/lfi.go @@ -0,0 +1,46 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package ossec + +import ( + "os" + + "github.com/DataDog/appsec-internal-go/limiter" + waf "github.com/DataDog/go-libddwaf/v3" + + "gopkg.in/DataDog/dd-trace-go.v1/appsec/events" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/ossec" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/sharedsec" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/trace" +) + +const ( + ServerIOFSFileAddr = "server.io.fs.file" +) + +func RegisterOpenListener(op dyngo.Operation, eventsHolder *trace.SecurityEventsHolder, wafCtx *waf.Context, limiter limiter.Limiter) { + runWAF := sharedsec.MakeWAFRunListener(eventsHolder, wafCtx, limiter, func(args ossec.OpenOperationArgs) waf.RunAddressData { + return waf.RunAddressData{Ephemeral: map[string]any{ServerIOFSFileAddr: args.Path}} + }) + + dyngo.On(op, func(op *ossec.OpenOperation, args ossec.OpenOperationArgs) { + dyngo.OnData(op, func(e *events.BlockingSecurityEvent) { + dyngo.OnFinish(op, func(_ *ossec.OpenOperation, res ossec.OpenOperationRes[*os.File]) { + if res.Err != nil { + *res.Err = e + } + }) + }) + runWAF(op, args) + }) +} + +func OSAddressesPresent(addresses listener.AddressSet) bool { + _, fileAddr := addresses[ServerIOFSFileAddr] + return fileAddr +} diff --git a/internal/appsec/waf_test.go b/internal/appsec/waf_test.go index 972ec592e0..bc760ee20c 100644 --- a/internal/appsec/waf_test.go +++ b/internal/appsec/waf_test.go @@ -6,21 +6,25 @@ package appsec_test import ( + "context" "database/sql" "encoding/json" "fmt" "io" + "io/fs" "log" "math/rand" "net/http" "net/http/httptest" "net/url" "os" + "strconv" "strings" "testing" internal "github.com/DataDog/appsec-internal-go/appsec" waf "github.com/DataDog/go-libddwaf/v3" + pAppsec "gopkg.in/DataDog/dd-trace-go.v1/appsec" "gopkg.in/DataDog/dd-trace-go.v1/appsec/events" sqltrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql" @@ -28,6 +32,8 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/config" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/ossec" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/listener/httpsec" _ "github.com/glebarez/go-sqlite" @@ -642,7 +648,99 @@ func TestRASPSQLi(t *testing.T) { }) } } +} + +func TestRASPLFI(t *testing.T) { + t.Setenv("DD_APPSEC_RULES", "testdata/rasp.json") + appsec.Start() + defer appsec.Stop() + + if !appsec.RASPEnabled() { + t.Skip("RASP needs to be enabled for this test") + } + + // Simulate what orchestrion does + WrappedOpen := func(ctx context.Context, path string, flags int) (file *os.File, err error) { + parent, _ := dyngo.FromContext(ctx) + op := &ossec.OpenOperation{ + Operation: dyngo.NewOperation(parent), + } + + dyngo.StartOperation(op, ossec.OpenOperationArgs{ + Path: path, + Flags: flags, + Perms: fs.FileMode(0), + }) + + defer dyngo.FinishOperation(op, ossec.OpenOperationRes[*os.File]{ + File: &file, + Err: &err, + }) + + return + } + + mux := httptrace.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + // Subsequent spans inherit their parent from context. + path := r.URL.Query().Get("path") + block := r.URL.Query().Get("block") + if block == "true" { + _, err := WrappedOpen(r.Context(), path, os.O_RDONLY) + require.ErrorIs(t, err, &events.BlockingSecurityEvent{}) + return + } + _, err := WrappedOpen(r.Context(), "/tmp/test", os.O_RDWR) + require.NoError(t, err) + w.WriteHeader(204) + }) + srv := httptest.NewServer(mux) + defer srv.Close() + + for _, tc := range []struct { + name string + path string + block bool + }{ + { + name: "no-error", + path: "", + block: false, + }, + { + name: "passwd", + path: "/etc/passwd", + block: true, + }, + { + name: "shadow", + path: "/etc/shadow", + block: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + req, err := http.NewRequest("GET", srv.URL+"?path="+tc.path+"&block="+strconv.FormatBool(tc.block), nil) + require.NoError(t, err) + res, err := srv.Client().Do(req) + require.NoError(t, err) + defer res.Body.Close() + + spans := mt.FinishedSpans() + require.Len(t, spans, 1) + + if tc.block { + require.Equal(t, 403, res.StatusCode) + require.Contains(t, spans[0].Tag("_dd.appsec.json"), "rasp-930-100") + require.Contains(t, spans[0].Tags(), "_dd.stack") + } else { + require.Equal(t, 204, res.StatusCode) + } + }) + } } // BenchmarkSampleWAFContext benchmarks the creation of a WAF context and running the WAF on a request/response pair diff --git a/internal/log/log.go b/internal/log/log.go index c32b1ed9ba..16be1ac7d0 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo" "gopkg.in/DataDog/dd-trace-go.v1/internal/version" ) @@ -102,6 +103,11 @@ func init() { if v := os.Getenv("DD_LOGGING_RATE"); v != "" { setLoggingRate(v) } + + // This is required because we really want to be able to log errors from dyngo + // but the log package depend on too much packages that we want to instrument. + // So we need to do this to avoid dependency cycles. + dyngo.LogError = Error } func setLoggingRate(v string) { diff --git a/internal/orchestrion/context_stack.go b/internal/orchestrion/context_stack.go index 6b4e408acc..60dd1edcac 100644 --- a/internal/orchestrion/context_stack.go +++ b/internal/orchestrion/context_stack.go @@ -17,14 +17,14 @@ func getDDContextStack() *contextStack { return gls.(*contextStack) } - newStack := new(contextStack) + newStack := &contextStack{} setDDGLS(newStack) return newStack } // Peek returns the top context from the stack without removing it. func (s *contextStack) Peek(key any) any { - if s == nil { + if s == nil || *s == nil { return nil } @@ -38,7 +38,7 @@ func (s *contextStack) Peek(key any) any { // Push adds a context to the stack. func (s *contextStack) Push(key, val any) { - if s == nil { + if s == nil || *s == nil { return } @@ -47,7 +47,7 @@ func (s *contextStack) Push(key, val any) { // Pop removes the top context from the stack and returns it. func (s *contextStack) Pop(key any) any { - if s == nil { + if s == nil || *s == nil { return nil } diff --git a/internal/stacktrace/stacktrace.go b/internal/stacktrace/stacktrace.go index 368e41356b..9b9ec8c525 100644 --- a/internal/stacktrace/stacktrace.go +++ b/internal/stacktrace/stacktrace.go @@ -32,7 +32,7 @@ var ( "github.com/DataDog/go-libddwaf", "github.com/DataDog/datadog-agent", "github.com/DataDog/appsec-internal-go", - "github.com/DataDog/orchestrion", + "github.com/datadog/orchestrion", } ) @@ -219,7 +219,7 @@ func (it *framesIterator) Next() (StackFrame, bool) { return StackFrame{}, false } - if it.skipSymbol(frame.Function) { + if it.skipFrame(frame) { continue } @@ -237,9 +237,13 @@ func (it *framesIterator) Next() (StackFrame, bool) { } } -func (it *framesIterator) skipSymbol(symbol string) bool { +func (it *framesIterator) skipFrame(frame runtime.Frame) bool { + if frame.File == "" { // skip orchestrion generated code + return true + } + for _, prefix := range it.skipPrefixes { - if strings.HasPrefix(symbol, prefix) { + if strings.HasPrefix(frame.Function, prefix) { return true } } diff --git a/test.sh b/test.sh index 2ecd68209b..5b63e08b55 100755 --- a/test.sh +++ b/test.sh @@ -111,6 +111,6 @@ if [[ "$contrib" != "" ]]; then sleep $sleeptime fi - PACKAGE_NAMES=$(go list ./contrib/... | grep -v -e grpc.v12 -e google.golang.org/api) + PACKAGE_NAMES=$(go list ./contrib/... | grep -v -e google.golang.org/api) nice -n20 gotestsum --junitfile ./gotestsum-report.xml -- -race -v -coverprofile=contrib_coverage.txt -covermode=atomic $PACKAGE_NAMES fi