Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2: migrate contribs to new instrumentation api (2) #2816

Merged
merged 12 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/unit-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
ports:
- 9042:9042
mysql:
image: circleci/mysql:5.7
image: cimg/mysql:8.0
env:
MYSQL_ROOT_PASSWORD: admin
MYSQL_PASSWORD: test
Expand All @@ -98,7 +98,7 @@ jobs:
ports:
- 3306:3306
postgres:
image: circleci/postgres:9.5
image: cimg/postgres:16.4
env:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
Expand Down
2 changes: 1 addition & 1 deletion appsec/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (

"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/instrumentation/appsec/emitter/httpsec"
"github.com/DataDog/dd-trace-go/v2/internal/appsec"
"github.com/DataDog/dd-trace-go/v2/internal/appsec/emitter/httpsec"
"github.com/DataDog/dd-trace-go/v2/internal/appsec/emitter/sharedsec"
"github.com/DataDog/dd-trace-go/v2/internal/log"
)
Expand Down
2 changes: 1 addition & 1 deletion contrib/99designs/gqlgen/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (fn OptionFn) apply(cfg *config) {

func defaults(cfg *config) {
cfg.serviceName = instr.ServiceName(instrumentation.ComponentDefault, nil)
cfg.analyticsRate = instr.AnalyticsRate()
cfg.analyticsRate = instr.AnalyticsRate(false)
cfg.tags = make(map[string]interface{})
}

Expand Down
2 changes: 1 addition & 1 deletion contrib/aws/aws-sdk-go-v2/aws/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (fn OptionFn) apply(cfg *config) {
}

func defaults(cfg *config) {
cfg.analyticsRate = instr.AnalyticsRate()
cfg.analyticsRate = instr.AnalyticsRate(false)
}

// WithService sets the given service name for the dialled connection.
Expand Down
2 changes: 1 addition & 1 deletion contrib/aws/aws-sdk-go/aws/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (fn OptionFn) apply(cfg *config) {
}

func defaults(cfg *config) {
cfg.analyticsRate = instr.AnalyticsRate()
cfg.analyticsRate = instr.AnalyticsRate(false)
}

// WithService sets the given service name for the dialled connection.
Expand Down
2 changes: 1 addition & 1 deletion contrib/bradfitz/gomemcache/memcache/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (fn ClientOptionFn) apply(cfg *clientConfig) {
func defaults(cfg *clientConfig) {
cfg.serviceName = instr.ServiceName(instrumentation.ComponentDefault, nil)
cfg.operationName = instr.OperationName(instrumentation.ComponentDefault, nil)
cfg.analyticsRate = instr.AnalyticsRate()
cfg.analyticsRate = instr.AnalyticsRate(false)
}

// WithService sets the given service name for the dialled connection.
Expand Down
22 changes: 10 additions & 12 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,24 @@ import (
"math"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"

"github.com/DataDog/dd-trace-go/v2/datastreams"
"github.com/DataDog/dd-trace-go/v2/datastreams/options"
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/internal/log"
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
)

const (
// make sure these 3 are updated to V2 for the V2 version.
componentName = "confluentinc/confluent-kafka-go/kafka.v2"
packageName = "contrib/confluentinc/confluent-kafka-go/kafka.v2"
integrationName = "github.com/confluentinc/confluent-kafka-go/v2"
componentName = instrumentation.PackageConfluentKafkaGoV2
pkgPath = "contrib/confluentinc/confluent-kafka-go/kafka"
)

var instr *instrumentation.Instrumentation

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported(integrationName)
instr = instrumentation.Load(instrumentation.PackageConfluentKafkaGoV2)
}

// NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
Expand Down Expand Up @@ -67,7 +65,7 @@ func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
Consumer: c,
cfg: newConfig(opts...),
}
log.Debug("%s: Wrapping Consumer: %#v", packageName, wrapped.cfg)
instr.Logger().Debug("%s: Wrapping Consumer: %#v", pkgPath, wrapped.cfg)
wrapped.events = wrapped.traceEventsChannel(c.Events())
return wrapped
}
Expand Down Expand Up @@ -265,7 +263,7 @@ func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
events: p.Events(),
libraryVersion: version,
}
log.Debug("%s: Wrapping Producer: %#v", packageName, wrapped.cfg)
instr.Logger().Debug("%s: Wrapping Producer: %#v", pkgPath, wrapped.cfg)
wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel())
if wrapped.cfg.dataStreamsEnabled {
wrapped.events = wrapped.traceEventsChannel(p.Events())
Expand Down
33 changes: 8 additions & 25 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ import (
"testing"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/dd-trace-go/v2/datastreams"
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/mocktracer"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/internal/contrib/namingschematest"
internaldsm "github.com/DataDog/dd-trace-go/v2/internal/datastreams"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
Expand Down Expand Up @@ -92,7 +90,7 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO

if c.cfg.dataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
toMap := func(b []internaldsm.Backlog) map[string]struct{} {
toMap := func(b []mocktracer.DSMBacklog) map[string]struct{} {
m := make(map[string]struct{})
for _, b := range backlogs {
m[strings.Join(b.Tags, "")] = struct{}{}
Expand Down Expand Up @@ -166,7 +164,7 @@ func TestConsumerChannel(t *testing.T) {
assert.Equal(t, float64(1), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, 0.3, s.Tag(ext.EventSampleRate))
assert.Equal(t, strconv.Itoa(i+1), s.Tag("offset"))
assert.Equal(t, componentName, s.Tag(ext.Component))
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
}
Expand Down Expand Up @@ -237,7 +235,7 @@ func TestConsumerFunctional(t *testing.T) {
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, float64(0), s0.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, componentName, s0.Tag(ext.Component))
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s0.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))
assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers))
Expand All @@ -249,7 +247,7 @@ func TestConsumerFunctional(t *testing.T) {
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, float64(0), s1.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, componentName, s1.Tag(ext.Component))
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s1.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers))
Expand Down Expand Up @@ -312,18 +310,3 @@ func TestCustomTags(t *testing.T) {
assert.Equal(t, "bar", s.Tag("foo"))
assert.Equal(t, "key1", s.Tag("key"))
}

func TestNamingSchema(t *testing.T) {
genSpans := func(t *testing.T, serviceOverride string) []*mocktracer.Span {
var opts []Option
if serviceOverride != "" {
opts = append(opts, WithService(serviceOverride))
}
consumerAction := consumerActionFn(func(c *Consumer) (*kafka.Message, error) {
return c.ReadMessage(3000 * time.Millisecond)
})
spans, _ := produceThenConsume(t, consumerAction, opts, opts)
return spans
}
namingschematest.NewKafkaTest(genSpans)(t)
}
22 changes: 8 additions & 14 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ import (
"net"
"strings"

"github.com/DataDog/dd-trace-go/v2/internal"
"github.com/DataDog/dd-trace-go/v2/internal/namingschema"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

const defaultServiceName = "kafka"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
)

type config struct {
ctx context.Context
Expand Down Expand Up @@ -48,17 +45,14 @@ func newConfig(opts ...Option) *config {
cfg := &config{
ctx: context.Background(),
// analyticsRate: globalconfig.AnalyticsRate(),
analyticsRate: math.NaN(),
}
cfg.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false)
if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) {
cfg.analyticsRate = 1.0
analyticsRate: instr.AnalyticsRate(false),
}
cfg.dataStreamsEnabled = instr.DataStreamsEnabled()

cfg.consumerServiceName = namingschema.ServiceName(defaultServiceName)
cfg.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound)
cfg.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound)
cfg.consumerServiceName = instr.ServiceName(instrumentation.ComponentConsumer, nil)
cfg.producerServiceName = instr.ServiceName(instrumentation.ComponentProducer, nil)
cfg.consumerSpanName = instr.OperationName(instrumentation.ComponentConsumer, nil)
cfg.producerSpanName = instr.OperationName(instrumentation.ComponentProducer, nil)

for _, opt := range opts {
if opt == nil {
Expand Down
12 changes: 4 additions & 8 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"math"
"testing"

"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"

"github.com/stretchr/testify/assert"

"github.com/DataDog/dd-trace-go/v2/instrumentation/testutils"
)

func TestDataStreamsActivation(t *testing.T) {
Expand Down Expand Up @@ -43,9 +43,7 @@ func TestAnalyticsSettings(t *testing.T) {

t.Run("global", func(t *testing.T) {
t.Skip("global flag disabled")
rate := globalconfig.AnalyticsRate()
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)
testutils.SetGlobalAnalyticsRate(t, 0.4)

cfg := newConfig()
assert.Equal(t, 0.4, cfg.analyticsRate)
Expand All @@ -57,9 +55,7 @@ func TestAnalyticsSettings(t *testing.T) {
})

t.Run("override", func(t *testing.T) {
rate := globalconfig.AnalyticsRate()
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)
testutils.SetGlobalAnalyticsRate(t, 0.4)

cfg := newConfig(WithAnalyticsRate(0.2))
assert.Equal(t, 0.2, cfg.analyticsRate)
Expand Down
19 changes: 8 additions & 11 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@ import (
"github.com/DataDog/dd-trace-go/v2/datastreams/options"
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/internal/log"
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"

"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
// make sure these 3 are updated to V2 for the V2 version.
componentName = "confluentinc/confluent-kafka-go/kafka"
packageName = "contrib/confluentinc/confluent-kafka-go/kafka"
integrationName = "github.com/confluentinc/confluent-kafka-go"
componentName = instrumentation.PackageConfluentKafkaGo
pkgPath = "contrib/confluentinc/confluent-kafka-go/kafka"
)

var instr *instrumentation.Instrumentation

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported(integrationName)
instr = instrumentation.Load(instrumentation.PackageConfluentKafkaGo)
}

// NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
Expand Down Expand Up @@ -67,7 +64,7 @@ func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
Consumer: c,
cfg: newConfig(opts...),
}
log.Debug("%s: Wrapping Consumer: %#v", packageName, wrapped.cfg)
instr.Logger().Debug("%s: Wrapping Consumer: %#v", pkgPath, wrapped.cfg)
wrapped.events = wrapped.traceEventsChannel(c.Events())
return wrapped
}
Expand Down Expand Up @@ -265,7 +262,7 @@ func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
events: p.Events(),
libraryVersion: version,
}
log.Debug("%s: Wrapping Producer: %#v", packageName, wrapped.cfg)
instr.Logger().Debug("%s: Wrapping Producer: %#v", pkgPath, wrapped.cfg)
wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel())
if wrapped.cfg.dataStreamsEnabled {
wrapped.events = wrapped.traceEventsChannel(p.Events())
Expand Down
26 changes: 4 additions & 22 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ import (
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/mocktracer"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/internal/contrib/namingschematest"
internaldsm "github.com/DataDog/dd-trace-go/v2/internal/datastreams"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -92,7 +89,7 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO

if c.cfg.dataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
toMap := func(b []internaldsm.Backlog) map[string]struct{} {
toMap := func(b []mocktracer.DSMBacklog) map[string]struct{} {
m := make(map[string]struct{})
for _, b := range backlogs {
m[strings.Join(b.Tags, "")] = struct{}{}
Expand Down Expand Up @@ -166,7 +163,7 @@ func TestConsumerChannel(t *testing.T) {
assert.Equal(t, float64(1), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, 0.3, s.Tag(ext.EventSampleRate))
assert.Equal(t, strconv.Itoa(i+1), s.Tag("offset"))
assert.Equal(t, componentName, s.Tag(ext.Component))
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
}
Expand Down Expand Up @@ -237,7 +234,7 @@ func TestConsumerFunctional(t *testing.T) {
assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, float64(0), s0.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, componentName, s0.Tag(ext.Component))
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s0.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))
assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers))
Expand All @@ -249,7 +246,7 @@ func TestConsumerFunctional(t *testing.T) {
assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, float64(0), s1.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, componentName, s1.Tag(ext.Component))
assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s1.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers))
Expand Down Expand Up @@ -312,18 +309,3 @@ func TestCustomTags(t *testing.T) {
assert.Equal(t, "bar", s.Tag("foo"))
assert.Equal(t, "key1", s.Tag("key"))
}

func TestNamingSchema(t *testing.T) {
genSpans := func(t *testing.T, serviceOverride string) []*mocktracer.Span {
var opts []Option
if serviceOverride != "" {
opts = append(opts, WithService(serviceOverride))
}
consumerAction := consumerActionFn(func(c *Consumer) (*kafka.Message, error) {
return c.ReadMessage(3000 * time.Millisecond)
})
spans, _ := produceThenConsume(t, consumerAction, opts, opts)
return spans
}
namingschematest.NewKafkaTest(genSpans)(t)
}
Loading
Loading