Skip to content

Commit

Permalink
feat(contrib/confluentinc/confluent-kafka-go/kafka): evolve Producer …
Browse files Browse the repository at this point in the history
…instrumentation
  • Loading branch information
darccio committed Oct 1, 2024
1 parent b5ed714 commit 79dcb90
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 104 deletions.
54 changes: 54 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/internal/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package internal

import (
"context"
"math"

"github.com/confluentinc/confluent-kafka-go/kafka"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

// defaultServiceName is the fallback service name applied to consumer and
// producer spans when the naming schema does not override it.
const defaultServiceName = "kafka"

// Config holds the shared settings for the traced Kafka consumer and
// producer wrappers. It is populated by NewConfig from environment
// variables and naming-schema defaults, then adjusted by Options.
type Config struct {
	// Ctx is the base context used when starting spans and tracers.
	Ctx context.Context
	// ConsumerServiceName / ProducerServiceName are the service names
	// attached to consumer and producer spans respectively.
	ConsumerServiceName string
	ProducerServiceName string
	// ConsumerSpanName / ProducerSpanName are the operation names for
	// inbound (consume) and outbound (produce) spans.
	ConsumerSpanName string
	ProducerSpanName string
	// AnalyticsRate is the Trace Analytics sampling rate; NaN means disabled.
	AnalyticsRate float64
	// BootstrapServers is the first resolvable broker host extracted from
	// the kafka.ConfigMap (see WithConfig in the public package).
	BootstrapServers string
	// GroupID is the consumer group id extracted from the kafka.ConfigMap.
	GroupID string
	// TagFns maps span tag keys to functions evaluated per message to
	// compute the tag value.
	TagFns map[string]func(msg *kafka.Message) interface{}
	// DataStreamsEnabled toggles Data Streams Monitoring checkpoints.
	DataStreamsEnabled bool
}

// An Option customizes the Config.
type Option func(cfg *Config)

// NewConfig builds a *Config from environment variables and naming-schema
// defaults, then applies the given options in order (later options win).
func NewConfig(opts ...Option) *Config {
	// Analytics is off (NaN) unless explicitly enabled via env.
	// NOTE(review): the global analytics rate is intentionally not read here
	// (globalconfig.AnalyticsRate()) — confirm before re-enabling.
	rate := math.NaN()
	if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) {
		rate = 1.0
	}
	cfg := &Config{
		Ctx:                 context.Background(),
		AnalyticsRate:       rate,
		DataStreamsEnabled:  internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false),
		ConsumerServiceName: namingschema.ServiceName(defaultServiceName),
		ProducerServiceName: namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName),
		ConsumerSpanName:    namingschema.OpName(namingschema.KafkaInbound),
		ProducerSpanName:    namingschema.OpName(namingschema.KafkaOutbound),
	}
	for _, o := range opts {
		o(cfg)
	}
	return cfg
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (pt *ProducerTracer) traceEventsChannel(in chan kafka.Event) {
pt.Events = out
}

func (pt *ProducerTracer) WrapProduce(produceFn func(*kafka.Message, chan kafka.Event) error, msg *kafka.Message, deliveryChan chan kafka.Event) error {
func (pt *ProducerTracer) AroundProduce(msg *kafka.Message, deliveryChan chan kafka.Event) func(error) {
span := pt.startSpan(msg)

// if the user has selected a delivery channel, we will wrap it and
Expand All @@ -98,12 +98,12 @@ func (pt *ProducerTracer) WrapProduce(produceFn func(*kafka.Message, chan kafka.

setProduceCheckpoint(pt.DataStreamsEnabled, pt.LibraryVersion, msg)

err := produceFn(msg, deliveryChan)
// with no delivery channel or enqueue error, finish immediately
if err != nil || deliveryChan == nil {
span.Finish(tracer.WithError(err))
return func(err error) {
// with no delivery channel or enqueue error, finish immediately
if err != nil || deliveryChan == nil {
span.Finish(tracer.WithError(err))
}
}
return err
}

func (pt *ProducerTracer) startSpan(msg *kafka.Message) ddtrace.Span {
Expand Down
37 changes: 21 additions & 16 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/co
import (
"time"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka/internal"
"gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka/internal/tracing"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -52,22 +53,22 @@ func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
// A Consumer wraps a kafka.Consumer.
type Consumer struct {
	*kafka.Consumer
	cfg    *internal.Config
	tracer *tracing.ConsumerTracer
}

// WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
cfg := newConfig(opts...)
cfg := internal.NewConfig(opts...)
wrapped := &Consumer{
Consumer: c,
cfg: cfg,
tracer: tracing.NewConsumerTracer(cfg.ctx, c, cfg.dataStreamsEnabled, cfg.groupID, tracing.StartSpanConfig{
Service: cfg.consumerServiceName,
Operation: cfg.consumerSpanName,
BootstrapServers: cfg.bootstrapServers,
AnalyticsRate: cfg.analyticsRate,
TagFns: cfg.tagFns,
tracer: tracing.NewConsumerTracer(cfg.Ctx, c, cfg.DataStreamsEnabled, cfg.GroupID, tracing.StartSpanConfig{
Service: cfg.ConsumerServiceName,
Operation: cfg.ConsumerSpanName,
BootstrapServers: cfg.BootstrapServers,
AnalyticsRate: cfg.AnalyticsRate,
TagFns: cfg.TagFns,
}),
}
log.Debug("%s: Wrapping Consumer: %#v", packageName, wrapped.cfg)
Expand Down Expand Up @@ -126,21 +127,21 @@ func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicP
// A Producer wraps a kafka.Producer.
type Producer struct {
	*kafka.Producer
	cfg    *internal.Config
	tracer *tracing.ProducerTracer
}

// WrapProducer wraps a kafka.Producer so requests are traced.
func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
cfg := newConfig(opts...)
cfg := internal.NewConfig(opts...)
wrapped := &Producer{
Producer: p,
cfg: cfg,
tracer: tracing.NewProducerTracer(cfg.ctx, p, cfg.dataStreamsEnabled, tracing.StartSpanConfig{
Service: cfg.producerServiceName,
Operation: cfg.producerSpanName,
BootstrapServers: cfg.bootstrapServers,
AnalyticsRate: cfg.analyticsRate,
tracer: tracing.NewProducerTracer(cfg.Ctx, p, cfg.DataStreamsEnabled, tracing.StartSpanConfig{
Service: cfg.ProducerServiceName,
Operation: cfg.ProducerSpanName,
BootstrapServers: cfg.BootstrapServers,
AnalyticsRate: cfg.AnalyticsRate,
}),
}
log.Debug("%s: Wrapping Producer: %#v", packageName, wrapped.cfg)
Expand All @@ -162,7 +163,11 @@ func (p *Producer) Close() {

// Produce calls the underlying Producer.Produce and traces the request.
func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
	// AroundProduce starts the span and returns a callback that finishes it.
	stop := p.tracer.AroundProduce(msg, deliveryChan)
	err := p.Producer.Produce(msg, deliveryChan)
	// Call stop with the real produce result. The previous version used
	// `defer stop(err)` before the call, which evaluated err at defer time
	// (always nil) and also lacked a return statement ("missing return" CI
	// failure): deferred call arguments are evaluated when the defer
	// statement executes, not when the function returns.
	stop(err)
	return err
}

Check failure on line 171 in contrib/confluentinc/confluent-kafka-go/kafka/kafka.go

View workflow job for this annotation

GitHub Actions / PR Unit and Integration Tests / test-contrib

missing return

Check failure on line 171 in contrib/confluentinc/confluent-kafka-go/kafka/kafka.go

View workflow job for this annotation

GitHub Actions / PR Unit and Integration Tests / test-contrib

missing return

// ProduceChannel returns a channel which can receive kafka Messages and will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())

if c.cfg.dataStreamsEnabled {
if c.cfg.DataStreamsEnabled {
backlogs := mt.SentDSMBacklogs()
toMap := func(b []internaldsm.Backlog) map[string]struct{} {
m := make(map[string]struct{})
Expand Down
92 changes: 27 additions & 65 deletions contrib/confluentinc/confluent-kafka-go/kafka/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,114 +11,76 @@ import (
"net"
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

const defaultServiceName = "kafka"

type config struct {
ctx context.Context
consumerServiceName string
producerServiceName string
consumerSpanName string
producerSpanName string
analyticsRate float64
bootstrapServers string
groupID string
tagFns map[string]func(msg *kafka.Message) interface{}
dataStreamsEnabled bool
}

// An Option customizes the config.
type Option func(cfg *config)

func newConfig(opts ...Option) *config {
cfg := &config{
ctx: context.Background(),
// analyticsRate: globalconfig.AnalyticsRate(),
analyticsRate: math.NaN(),
}
cfg.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false)
if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) {
cfg.analyticsRate = 1.0
}

cfg.consumerServiceName = namingschema.ServiceName(defaultServiceName)
cfg.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound)
cfg.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound)
"gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka/internal"
)

for _, opt := range opts {
opt(cfg)
}
return cfg
}
// An Option customizes the Config used by the traced consumer and producer.
type Option = internal.Option

// WithContext sets the config context to ctx.
// WithContext sets the internal.Config context to ctx.
// Deprecated: This is deprecated in favor of passing the context
// via the message headers
func WithContext(ctx context.Context) Option {
return func(cfg *config) {
cfg.ctx = ctx
return func(cfg *internal.Config) {
cfg.Ctx = ctx
}
}

// WithServiceName sets the config service name to serviceName.
// WithServiceName sets the internal.Config service name to serviceName.
func WithServiceName(serviceName string) Option {
return func(cfg *config) {
cfg.consumerServiceName = serviceName
cfg.producerServiceName = serviceName
return func(cfg *internal.Config) {
cfg.ConsumerServiceName = serviceName
cfg.ProducerServiceName = serviceName
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(cfg *config) {
return func(cfg *internal.Config) {
if on {
cfg.analyticsRate = 1.0
cfg.AnalyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
cfg.AnalyticsRate = math.NaN()
}
}
}

// WithAnalyticsRate sets the sampling rate for Trace Analytics events
// correlated to started spans.
func WithAnalyticsRate(rate float64) Option {
return func(cfg *config) {
return func(cfg *internal.Config) {
if rate >= 0.0 && rate <= 1.0 {
cfg.analyticsRate = rate
cfg.AnalyticsRate = rate
} else {
cfg.analyticsRate = math.NaN()
cfg.AnalyticsRate = math.NaN()
}
}
}

// WithCustomTag will cause the given tagFn to be evaluated after executing
// a query and attach the result to the span tagged by the key.
func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option {
return func(cfg *config) {
if cfg.tagFns == nil {
cfg.tagFns = make(map[string]func(msg *kafka.Message) interface{})
return func(cfg *internal.Config) {
if cfg.TagFns == nil {
cfg.TagFns = make(map[string]func(msg *kafka.Message) interface{})
}
cfg.tagFns[tag] = tagFn
cfg.TagFns[tag] = tagFn
}
}

// WithConfig extracts the config information for the client to be tagged
// WithConfig extracts the internal.Config information for the client to be tagged
func WithConfig(cg *kafka.ConfigMap) Option {
return func(cfg *config) {
return func(cfg *internal.Config) {
if groupID, err := cg.Get("group.id", ""); err == nil {
cfg.groupID = groupID.(string)
cfg.GroupID = groupID.(string)
}
if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" {
for _, addr := range strings.Split(bs.(string), ",") {
host, _, err := net.SplitHostPort(addr)
if err == nil {
cfg.bootstrapServers = host
cfg.BootstrapServers = host
return
}
}
Expand All @@ -128,7 +90,7 @@ func WithConfig(cg *kafka.ConfigMap) Option {

// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithDataStreams() Option {
return func(cfg *config) {
cfg.dataStreamsEnabled = true
return func(cfg *internal.Config) {
cfg.DataStreamsEnabled = true
}
}
33 changes: 17 additions & 16 deletions contrib/confluentinc/confluent-kafka-go/kafka/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,37 @@ import (
"math"
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"

"github.com/stretchr/testify/assert"
)

func TestDataStreamsActivation(t *testing.T) {
t.Run("default", func(t *testing.T) {
cfg := newConfig()
assert.False(t, cfg.dataStreamsEnabled)
cfg := internal.NewConfig()
assert.False(t, cfg.DataStreamsEnabled)
})
t.Run("withOption", func(t *testing.T) {
cfg := newConfig(WithDataStreams())
assert.True(t, cfg.dataStreamsEnabled)
cfg := internal.NewConfig(WithDataStreams())
assert.True(t, cfg.DataStreamsEnabled)
})
t.Run("withEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "true")
cfg := newConfig()
assert.True(t, cfg.dataStreamsEnabled)
cfg := internal.NewConfig()
assert.True(t, cfg.DataStreamsEnabled)
})
t.Run("optionOverridesEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "false")
cfg := newConfig(WithDataStreams())
assert.True(t, cfg.dataStreamsEnabled)
cfg := internal.NewConfig(WithDataStreams())
assert.True(t, cfg.DataStreamsEnabled)
})
}

func TestAnalyticsSettings(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
cfg := newConfig()
assert.True(t, math.IsNaN(cfg.analyticsRate))
cfg := internal.NewConfig()
assert.True(t, math.IsNaN(cfg.AnalyticsRate))
})

t.Run("global", func(t *testing.T) {
Expand All @@ -47,21 +48,21 @@ func TestAnalyticsSettings(t *testing.T) {
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)

cfg := newConfig()
assert.Equal(t, 0.4, cfg.analyticsRate)
cfg := internal.NewConfig()
assert.Equal(t, 0.4, cfg.AnalyticsRate)
})

t.Run("enabled", func(t *testing.T) {
cfg := newConfig(WithAnalytics(true))
assert.Equal(t, 1.0, cfg.analyticsRate)
cfg := internal.NewConfig(WithAnalytics(true))
assert.Equal(t, 1.0, cfg.AnalyticsRate)
})

t.Run("override", func(t *testing.T) {
rate := globalconfig.AnalyticsRate()
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)

cfg := newConfig(WithAnalyticsRate(0.2))
assert.Equal(t, 0.2, cfg.analyticsRate)
cfg := internal.NewConfig(WithAnalyticsRate(0.2))
assert.Equal(t, 0.2, cfg.AnalyticsRate)
})
}

0 comments on commit 79dcb90

Please sign in to comment.