diff --git a/contrib/IBM/sarama.v1/example_test.go b/contrib/IBM/sarama.v1/example_test.go
new file mode 100644
index 0000000000..3990625b15
--- /dev/null
+++ b/contrib/IBM/sarama.v1/example_test.go
@@ -0,0 +1,84 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package sarama_test
+
+import (
+	"log"
+
+	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
+
+	"github.com/IBM/sarama"
+)
+
+func Example_asyncProducer() {
+	cfg := sarama.NewConfig()
+	cfg.Version = sarama.V0_11_0_0 // minimum version that supports headers, which are required for tracing
+
+	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
+	if err != nil {
+		panic(err)
+	}
+	defer producer.Close()
+
+	producer = saramatrace.WrapAsyncProducer(cfg, producer)
+
+	msg := &sarama.ProducerMessage{
+		Topic: "some-topic",
+		Value: sarama.StringEncoder("Hello World"),
+	}
+	producer.Input() <- msg
+}
+
+func Example_syncProducer() {
+	cfg := sarama.NewConfig()
+	cfg.Producer.Return.Successes = true
+
+	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
+	if err != nil {
+		panic(err)
+	}
+	defer producer.Close()
+
+	producer = saramatrace.WrapSyncProducer(cfg, producer)
+
+	msg := &sarama.ProducerMessage{
+		Topic: "some-topic",
+		Value: sarama.StringEncoder("Hello World"),
+	}
+	_, _, err = producer.SendMessage(msg)
+	if err != nil {
+		panic(err)
+	}
+}
+
+func Example_consumer() {
+	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
+	if err != nil {
+		panic(err)
+	}
+	defer consumer.Close()
+
+	consumer = saramatrace.WrapConsumer(consumer)
+
+	partitionConsumer, err := consumer.ConsumePartition("some-topic", 0, sarama.OffsetNewest)
+	if err != nil {
+		panic(err)
+	}
+	defer partitionConsumer.Close()
+
+	consumed := 0
+	for msg := range partitionConsumer.Messages() {
+		// if you want to use the kafka message as the parent of a new span:
+		if spanctx, err := tracer.Extract(saramatrace.NewConsumerMessageCarrier(msg)); err == nil {
+			// a child span can then be started with tracer.ChildOf(spanctx)
+			_ = spanctx
+		}
+
+		log.Printf("Consumed message offset %d\n", msg.Offset)
+		consumed++
+	}
+}
diff --git a/contrib/IBM/sarama.v1/headers.go b/contrib/IBM/sarama.v1/headers.go
new file mode 100644
index 0000000000..288a0bbb19
--- /dev/null
+++ b/contrib/IBM/sarama.v1/headers.go
@@ -0,0 +1,96 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package sarama
+
+import (
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
+
+	"github.com/IBM/sarama"
+)
+
+// A ProducerMessageCarrier injects and extracts traces from a sarama.ProducerMessage.
+type ProducerMessageCarrier struct {
+	msg *sarama.ProducerMessage
+}
+
+var _ interface {
+	tracer.TextMapReader
+	tracer.TextMapWriter
+} = (*ProducerMessageCarrier)(nil)
+
+// ForeachKey iterates over every header.
+func (c ProducerMessageCarrier) ForeachKey(handler func(key, val string) error) error {
+	for _, h := range c.msg.Headers {
+		err := handler(string(h.Key), string(h.Value))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Set sets a header.
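+// Any existing headers with the same key are removed first, so each key
+// appears at most once after the call. A minimal sketch (key and value are
+// illustrative):
+//
+//	carrier := NewProducerMessageCarrier(msg)
+//	carrier.Set("x-example-key", "example-value")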
+func (c ProducerMessageCarrier) Set(key, val string) {
+	// ensure uniqueness of keys
+	for i := 0; i < len(c.msg.Headers); i++ {
+		if string(c.msg.Headers[i].Key) == key {
+			c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
+			i--
+		}
+	}
+	c.msg.Headers = append(c.msg.Headers, sarama.RecordHeader{
+		Key:   []byte(key),
+		Value: []byte(val),
+	})
+}
+
+// NewProducerMessageCarrier creates a new ProducerMessageCarrier.
+func NewProducerMessageCarrier(msg *sarama.ProducerMessage) ProducerMessageCarrier {
+	return ProducerMessageCarrier{msg}
+}
+
+// A ConsumerMessageCarrier injects and extracts traces from a sarama.ConsumerMessage.
+type ConsumerMessageCarrier struct {
+	msg *sarama.ConsumerMessage
+}
+
+var _ interface {
+	tracer.TextMapReader
+	tracer.TextMapWriter
+} = (*ConsumerMessageCarrier)(nil)
+
+// NewConsumerMessageCarrier creates a new ConsumerMessageCarrier.
+func NewConsumerMessageCarrier(msg *sarama.ConsumerMessage) ConsumerMessageCarrier {
+	return ConsumerMessageCarrier{msg}
+}
+
+// ForeachKey iterates over every header.
+func (c ConsumerMessageCarrier) ForeachKey(handler func(key, val string) error) error {
+	for _, h := range c.msg.Headers {
+		if h != nil {
+			err := handler(string(h.Key), string(h.Value))
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// Set sets a header.
+func (c ConsumerMessageCarrier) Set(key, val string) {
+	// ensure uniqueness of keys
+	for i := 0; i < len(c.msg.Headers); i++ {
+		if c.msg.Headers[i] != nil && string(c.msg.Headers[i].Key) == key {
+			c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
+			i--
+		}
+	}
+	c.msg.Headers = append(c.msg.Headers, &sarama.RecordHeader{
+		Key:   []byte(key),
+		Value: []byte(val),
+	})
+}
diff --git a/contrib/IBM/sarama.v1/option.go b/contrib/IBM/sarama.v1/option.go
new file mode 100644
index 0000000000..23461fbad8
--- /dev/null
+++ b/contrib/IBM/sarama.v1/option.go
@@ -0,0 +1,75 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package sarama
+
+import (
+	"math"
+
+	"gopkg.in/DataDog/dd-trace-go.v1/internal"
+	"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
+)
+
+const defaultServiceName = "kafka"
+
+type config struct {
+	consumerServiceName string
+	producerServiceName string
+	consumerSpanName    string
+	producerSpanName    string
+	analyticsRate       float64
+}
+
+func defaults(cfg *config) {
+	cfg.consumerServiceName = namingschema.NewDefaultServiceName(defaultServiceName).GetName()
+	cfg.producerServiceName = namingschema.NewDefaultServiceName(
+		defaultServiceName,
+		namingschema.WithOverrideV0(defaultServiceName),
+	).GetName()
+
+	cfg.consumerSpanName = namingschema.NewKafkaInboundOp().GetName()
+	cfg.producerSpanName = namingschema.NewKafkaOutboundOp().GetName()
+
+	// cfg.analyticsRate = globalconfig.AnalyticsRate()
+	if internal.BoolEnv("DD_TRACE_SARAMA_ANALYTICS_ENABLED", false) {
+		cfg.analyticsRate = 1.0
+	} else {
+		cfg.analyticsRate = math.NaN()
+	}
+}
+
+// An Option is used to customize the config for the sarama tracer.
+type Option func(cfg *config)
+
+// WithServiceName sets the given service name for the intercepted client.
+func WithServiceName(name string) Option {
+	return func(cfg *config) {
+		cfg.consumerServiceName = name
+		cfg.producerServiceName = name
+	}
+}
+
+// WithAnalytics enables Trace Analytics for all started spans.
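+// Passing true is equivalent to WithAnalyticsRate(1.0); passing false resets
+// the rate to math.NaN(), which disables analytics.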
+func WithAnalytics(on bool) Option {
+	return func(cfg *config) {
+		if on {
+			cfg.analyticsRate = 1.0
+		} else {
+			cfg.analyticsRate = math.NaN()
+		}
+	}
+}
+
+// WithAnalyticsRate sets the sampling rate for Trace Analytics events
+// correlated to started spans.
+func WithAnalyticsRate(rate float64) Option {
+	return func(cfg *config) {
+		if rate >= 0.0 && rate <= 1.0 {
+			cfg.analyticsRate = rate
+		} else {
+			cfg.analyticsRate = math.NaN()
+		}
+	}
+}
diff --git a/contrib/IBM/sarama.v1/sarama.go b/contrib/IBM/sarama.v1/sarama.go
new file mode 100644
index 0000000000..946a61ef70
--- /dev/null
+++ b/contrib/IBM/sarama.v1/sarama.go
@@ -0,0 +1,305 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+// Package sarama provides functions to trace the IBM/sarama package (https://github.com/IBM/sarama).
+package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"
+
+import (
+	"math"
+
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
+	"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
+	"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
+
+	"github.com/IBM/sarama"
+)
+
+const componentName = "IBM/sarama"
+
+func init() {
+	telemetry.LoadIntegration(componentName)
+	tracer.MarkIntegrationImported("github.com/IBM/sarama")
+}
+
+type partitionConsumer struct {
+	sarama.PartitionConsumer
+	messages chan *sarama.ConsumerMessage
+}
+
+// Messages returns the read channel for the messages that are returned by
+// the broker.
+func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
+	return pc.messages
+}
+
+// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
+// message to be traced.
+func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
+	cfg := new(config)
+	defaults(cfg)
+	for _, opt := range opts {
+		opt(cfg)
+	}
+	log.Debug("contrib/IBM/sarama: Wrapping Partition Consumer: %#v", cfg)
+	wrapped := &partitionConsumer{
+		PartitionConsumer: pc,
+		messages:          make(chan *sarama.ConsumerMessage),
+	}
+	go func() {
+		msgs := pc.Messages()
+		var prev ddtrace.Span
+		for msg := range msgs {
+			// create the next span from the message
+			opts := []tracer.StartSpanOption{
+				tracer.ServiceName(cfg.consumerServiceName),
+				tracer.ResourceName("Consume Topic " + msg.Topic),
+				tracer.SpanType(ext.SpanTypeMessageConsumer),
+				tracer.Tag(ext.MessagingKafkaPartition, msg.Partition),
+				tracer.Tag("offset", msg.Offset),
+				tracer.Tag(ext.Component, componentName),
+				tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
+				tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
+				tracer.Measured(),
+			}
+			if !math.IsNaN(cfg.analyticsRate) {
+				opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
+			}
+			// kafka supports headers, so try to extract a span context
+			carrier := NewConsumerMessageCarrier(msg)
+			if spanctx, err := tracer.Extract(carrier); err == nil {
+				opts = append(opts, tracer.ChildOf(spanctx))
+			}
+			next := tracer.StartSpan(cfg.consumerSpanName, opts...)
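+			// the span is deliberately left open here; it is finished once the
+			// following message arrives (or the channel closes), since there is
+			// no explicit signal that the consumer is done with this message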
+			// reinject the span context so consumers can pick it up
+			tracer.Inject(next.Context(), carrier)
+
+			wrapped.messages <- msg
+
+			// if the next message was received, finish the previous span
+			if prev != nil {
+				prev.Finish()
+			}
+			prev = next
+		}
+		// finish any remaining span
+		if prev != nil {
+			prev.Finish()
+		}
+		close(wrapped.messages)
+	}()
+	return wrapped
+}
+
+type consumer struct {
+	sarama.Consumer
+	opts []Option
+}
+
+// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
+// PartitionConsumer.
+func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
+	pc, err := c.Consumer.ConsumePartition(topic, partition, offset)
+	if err != nil {
+		return pc, err
+	}
+	return WrapPartitionConsumer(pc, c.opts...), nil
+}
+
+// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
+// via Consumer.ConsumePartition.
+func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer {
+	return &consumer{
+		Consumer: c,
+		opts:     opts,
+	}
+}
+
+type syncProducer struct {
+	sarama.SyncProducer
+	version sarama.KafkaVersion
+	cfg     *config
+}
+
+// SendMessage calls sarama.SyncProducer.SendMessage and traces the request.
+func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
+	span := startProducerSpan(p.cfg, p.version, msg)
+	partition, offset, err = p.SyncProducer.SendMessage(msg)
+	finishProducerSpan(span, partition, offset, err)
+	return partition, offset, err
+}
+
+// SendMessages calls sarama.SyncProducer.SendMessages and traces the requests.
+func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
+	// although there's only one call made to the SyncProducer, the messages are
+	// treated individually, so we create a span for each one
+	spans := make([]ddtrace.Span, len(msgs))
+	for i, msg := range msgs {
+		spans[i] = startProducerSpan(p.cfg, p.version, msg)
+	}
+	err := p.SyncProducer.SendMessages(msgs)
+	for i, span := range spans {
+		finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err)
+	}
+	return err
+}
+
+// WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages
+// are traced.
+func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer {
+	cfg := new(config)
+	defaults(cfg)
+	for _, opt := range opts {
+		opt(cfg)
+	}
+	log.Debug("contrib/IBM/sarama: Wrapping Sync Producer: %#v", cfg)
+	if saramaConfig == nil {
+		saramaConfig = sarama.NewConfig()
+	}
+	return &syncProducer{
+		SyncProducer: producer,
+		version:      saramaConfig.Version,
+		cfg:          cfg,
+	}
+}
+
+type asyncProducer struct {
+	sarama.AsyncProducer
+	input     chan *sarama.ProducerMessage
+	successes chan *sarama.ProducerMessage
+	errors    chan *sarama.ProducerError
+}
+
+// Input returns the input channel.
+func (p *asyncProducer) Input() chan<- *sarama.ProducerMessage {
+	return p.input
+}
+
+// Successes returns the successes channel.
+func (p *asyncProducer) Successes() <-chan *sarama.ProducerMessage {
+	return p.successes
+}
+
+// Errors returns the errors channel.
+func (p *asyncProducer) Errors() <-chan *sarama.ProducerError {
+	return p.errors
+}
+
+// WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages
+// are traced. It requires the underlying sarama Config so we can know whether
+// successes will be returned. Tracing requires sarama version V0_11_0_0 or
+// newer, the first version that supports headers.
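+// If saramaConfig is nil, a default config with Version sarama.V0_11_0_0 is
+// used; if an older version is configured, an error is logged and span
+// contexts cannot be propagated through message headers.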
+// Only spans of successfully published messages have partition and offset
+// tags set.
+func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
+	cfg := new(config)
+	defaults(cfg)
+	for _, opt := range opts {
+		opt(cfg)
+	}
+	log.Debug("contrib/IBM/sarama: Wrapping Async Producer: %#v", cfg)
+	if saramaConfig == nil {
+		saramaConfig = sarama.NewConfig()
+		saramaConfig.Version = sarama.V0_11_0_0
+	} else if !saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) {
+		log.Error("Tracing Sarama async producer requires at least sarama.V0_11_0_0 version")
+	}
+	wrapped := &asyncProducer{
+		AsyncProducer: p,
+		input:         make(chan *sarama.ProducerMessage),
+		successes:     make(chan *sarama.ProducerMessage),
+		errors:        make(chan *sarama.ProducerError),
+	}
+	go func() {
+		spans := make(map[uint64]ddtrace.Span)
+		defer close(wrapped.input)
+		defer close(wrapped.successes)
+		defer close(wrapped.errors)
+		for {
+			select {
+			case msg := <-wrapped.input:
+				span := startProducerSpan(cfg, saramaConfig.Version, msg)
+				p.Input() <- msg
+				if saramaConfig.Producer.Return.Successes {
+					spanID := span.Context().SpanID()
+					spans[spanID] = span
+				} else {
+					// if returning successes isn't enabled, we just finish the
+					// span right away because there's no way to know when it will
+					// be done
+					span.Finish()
+				}
+			case msg, ok := <-p.Successes():
+				if !ok {
+					// producer was closed, so exit
+					return
+				}
+				if spanctx, spanFound := getSpanContext(msg); spanFound {
+					spanID := spanctx.SpanID()
+					if span, ok := spans[spanID]; ok {
+						delete(spans, spanID)
+						finishProducerSpan(span, msg.Partition, msg.Offset, nil)
+					}
+				}
+				wrapped.successes <- msg
+			case err, ok := <-p.Errors():
+				if !ok {
+					// producer was closed
+					return
+				}
+				if spanctx, spanFound := getSpanContext(err.Msg); spanFound {
+					spanID := spanctx.SpanID()
+					if span, ok := spans[spanID]; ok {
+						delete(spans, spanID)
+						span.Finish(tracer.WithError(err))
+					}
+				}
+				wrapped.errors <- err
+			}
+		}
+	}()
+	return wrapped
+}
+
+func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.ProducerMessage) ddtrace.Span {
+	carrier := NewProducerMessageCarrier(msg)
+	opts := []tracer.StartSpanOption{
+		tracer.ServiceName(cfg.producerServiceName),
+		tracer.ResourceName("Produce Topic " + msg.Topic),
+		tracer.SpanType(ext.SpanTypeMessageProducer),
+		tracer.Tag(ext.Component, componentName),
+		tracer.Tag(ext.SpanKind, ext.SpanKindProducer),
+		tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
+	}
+	if !math.IsNaN(cfg.analyticsRate) {
+		opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
+	}
+	// if there's a span context in the headers, use that as the parent
+	if spanctx, err := tracer.Extract(carrier); err == nil {
+		opts = append(opts, tracer.ChildOf(spanctx))
+	}
+	span := tracer.StartSpan(cfg.producerSpanName, opts...)
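+	// header injection below is gated on the Kafka version: record headers
+	// only exist from 0.11.0.0 onwards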
+	if version.IsAtLeast(sarama.V0_11_0_0) {
+		// re-inject the span context so consumers can pick it up
+		tracer.Inject(span.Context(), carrier)
+	}
+	return span
+}
+
+func finishProducerSpan(span ddtrace.Span, partition int32, offset int64, err error) {
+	span.SetTag(ext.MessagingKafkaPartition, partition)
+	span.SetTag("offset", offset)
+	span.Finish(tracer.WithError(err))
+}
+
+func getSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) {
+	carrier := NewProducerMessageCarrier(msg)
+	spanctx, err := tracer.Extract(carrier)
+	if err != nil {
+		return nil, false
+	}
+
+	return spanctx, true
+}
diff --git a/contrib/IBM/sarama.v1/sarama_test.go b/contrib/IBM/sarama.v1/sarama_test.go
new file mode 100644
index 0000000000..f759640410
--- /dev/null
+++ b/contrib/IBM/sarama.v1/sarama_test.go
@@ -0,0 +1,393 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package sarama
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest"
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
+
+	"github.com/IBM/sarama"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span {
+	var opts []Option
+	if serviceOverride != "" {
+		opts = append(opts, WithServiceName(serviceOverride))
+	}
+	mt := mocktracer.Start()
+	defer mt.Stop()
+
+	broker := sarama.NewMockBroker(t, 1)
+	defer broker.Close()
+
+	broker.SetHandlerByMap(map[string]sarama.MockResponse{
+		"MetadataRequest": sarama.NewMockMetadataResponse(t).
+			SetBroker(broker.Addr(), broker.BrokerID()).
+			SetLeader("test-topic", 0, broker.BrokerID()),
+		"OffsetRequest": sarama.NewMockOffsetResponse(t).
+			SetOffset("test-topic", 0, sarama.OffsetOldest, 0).
+			SetOffset("test-topic", 0, sarama.OffsetNewest, 1),
+		"FetchRequest": sarama.NewMockFetchResponse(t, 1).
+			SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")),
+		"ProduceRequest": sarama.NewMockProduceResponse(t).
+			SetError("test-topic", 0, sarama.ErrNoError),
+	})
+	cfg := sarama.NewConfig()
+	cfg.Version = sarama.MinVersion
+	cfg.Producer.Return.Successes = true
+	cfg.Producer.Flush.Messages = 1
+
+	producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, cfg)
+	require.NoError(t, err)
+	producer = WrapSyncProducer(cfg, producer, opts...)
+
+	c, err := sarama.NewConsumer([]string{broker.Addr()}, cfg)
+	require.NoError(t, err)
+	defer func(c sarama.Consumer) {
+		err := c.Close()
+		require.NoError(t, err)
+	}(c)
+	c = WrapConsumer(c, opts...)
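+	// produce a single message and consume it, so exactly one producer span
+	// and one consumer span are generated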
+
+	msg1 := &sarama.ProducerMessage{
+		Topic:    "test-topic",
+		Value:    sarama.StringEncoder("test 1"),
+		Metadata: "test",
+	}
+	_, _, err = producer.SendMessage(msg1)
+	require.NoError(t, err)
+
+	pc, err := c.ConsumePartition("test-topic", 0, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	<-pc.Messages()
+	err = pc.Close()
+	require.NoError(t, err)
+	// wait for the channel to be closed
+	<-pc.Messages()
+
+	spans := mt.FinishedSpans()
+	require.Len(t, spans, 2)
+	return spans
+}
+
+func TestConsumer(t *testing.T) {
+	mt := mocktracer.Start()
+	defer mt.Stop()
+
+	broker := sarama.NewMockBroker(t, 0)
+	defer broker.Close()
+
+	broker.SetHandlerByMap(map[string]sarama.MockResponse{
+		"MetadataRequest": sarama.NewMockMetadataResponse(t).
+			SetBroker(broker.Addr(), broker.BrokerID()).
+			SetLeader("test-topic", 0, broker.BrokerID()),
+		"OffsetRequest": sarama.NewMockOffsetResponse(t).
+			SetOffset("test-topic", 0, sarama.OffsetOldest, 0).
+			SetOffset("test-topic", 0, sarama.OffsetNewest, 1),
+		"FetchRequest": sarama.NewMockFetchResponse(t, 1).
+			SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")).
+			SetMessage("test-topic", 0, 1, sarama.StringEncoder("world")),
+	})
+	cfg := sarama.NewConfig()
+	cfg.Version = sarama.MinVersion
+	client, err := sarama.NewClient([]string{broker.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumer, err := sarama.NewConsumerFromClient(client)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	consumer = WrapConsumer(consumer)
+
+	partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	msg1 := <-partitionConsumer.Messages()
+	msg2 := <-partitionConsumer.Messages()
+	partitionConsumer.Close()
+	// wait for the channel to be closed
+	<-partitionConsumer.Messages()
+
+	spans := mt.FinishedSpans()
+	assert.Len(t, spans, 2)
+	{
+		s := spans[0]
+		spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1))
+		assert.NoError(t, err)
+		assert.Equal(t, spanctx.TraceID(), s.TraceID(),
+			"span context should be injected into the consumer message headers")
+
+		assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
+		assert.Equal(t, int64(0), s.Tag("offset"))
+		assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
+		assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
+		assert.Equal(t, "queue", s.Tag(ext.SpanType))
+		assert.Equal(t, "kafka.consume", s.OperationName())
+		assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
+		assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
+		assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
+	}
+	{
+		s := spans[1]
+		spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg2))
+		assert.NoError(t, err)
+		assert.Equal(t, spanctx.TraceID(), s.TraceID(),
+			"span context should be injected into the consumer message headers")
+
+		assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
+		assert.Equal(t, int64(1), s.Tag("offset"))
+		assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
+		assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName))
+		assert.Equal(t, "queue", s.Tag(ext.SpanType))
+		assert.Equal(t, "kafka.consume", s.OperationName())
+		assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
+		assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
+		assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
+	}
+}
+
+func TestSyncProducer(t *testing.T) {
+	mt := mocktracer.Start()
+	defer mt.Stop()
+
+	seedBroker := sarama.NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	leader := sarama.NewMockBroker(t, 2)
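+	// the seed broker serves only the metadata request; the leader broker
+	// answers the produce request itself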
+	defer leader.Close()
+
+	metadataResponse := new(sarama.MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	prodSuccess := new(sarama.ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
+	leader.Returns(prodSuccess)
+
+	cfg := sarama.NewConfig()
+	cfg.Version = sarama.MinVersion
+	cfg.Producer.Return.Successes = true
+
+	producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	producer = WrapSyncProducer(cfg, producer)
+
+	msg1 := &sarama.ProducerMessage{
+		Topic:    "my_topic",
+		Value:    sarama.StringEncoder("test 1"),
+		Metadata: "test",
+	}
+	producer.SendMessage(msg1)
+
+	spans := mt.FinishedSpans()
+	assert.Len(t, spans, 1)
+	{
+		s := spans[0]
+		assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
+		assert.Equal(t, "queue", s.Tag(ext.SpanType))
+		assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
+		assert.Equal(t, "kafka.produce", s.OperationName())
+		assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
+		assert.Equal(t, int64(0), s.Tag("offset"))
+		assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
+		assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
+		assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
+	}
+}
+
+func TestSyncProducerSendMessages(t *testing.T) {
+	mt := mocktracer.Start()
+	defer mt.Stop()
+
+	seedBroker := sarama.NewMockBroker(t, 1)
+	defer seedBroker.Close()
+	leader := sarama.NewMockBroker(t, 2)
+	defer leader.Close()
+
+	metadataResponse := new(sarama.MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	prodSuccess := new(sarama.ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
+	leader.Returns(prodSuccess)
+
+	cfg := sarama.NewConfig()
+	cfg.Version = sarama.MinVersion
+	cfg.Producer.Return.Successes = true
+	cfg.Producer.Flush.Messages = 2
+
+	producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	producer = WrapSyncProducer(cfg, producer)
+
+	msg1 := &sarama.ProducerMessage{
+		Topic:    "my_topic",
+		Value:    sarama.StringEncoder("test 1"),
+		Metadata: "test",
+	}
+	msg2 := &sarama.ProducerMessage{
+		Topic:    "my_topic",
+		Value:    sarama.StringEncoder("test 2"),
+		Metadata: "test",
+	}
+	producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2})
+	spans := mt.FinishedSpans()
+	assert.Len(t, spans, 2)
+	for _, s := range spans {
+		assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
+		assert.Equal(t, "queue", s.Tag(ext.SpanType))
+		assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
+		assert.Equal(t, "kafka.produce", s.OperationName())
+		assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
+		assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
+		assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
+		assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
+	}
+}
+
+func TestAsyncProducer(t *testing.T) {
+	// the default for producers is a fire-and-forget model that doesn't return
+	// successes
+	t.Run("Without Successes", func(t *testing.T) {
+		t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " +
"https://github.com/IBM/sarama/issues/1665") + mt := mocktracer.Start() + defer mt.Stop() + + broker := newMockBroker(t) + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 + producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + producer = WrapAsyncProducer(nil, producer) + + msg1 := &sarama.ProducerMessage{ + Topic: "my_topic", + Value: sarama.StringEncoder("test 1"), + } + producer.Input() <- msg1 + + waitForSpans(mt, 1) + + spans := mt.FinishedSpans() + assert.Len(t, spans, 1) + { + s := spans[0] + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.produce", s.OperationName()) + assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) + assert.Equal(t, int64(0), s.Tag("offset")) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + } + }) + + t.Run("With Successes", func(t *testing.T) { + t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " + + "https://github.com/IBM/sarama/issues/1665") + mt := mocktracer.Start() + defer mt.Stop() + + broker := newMockBroker(t) + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 + cfg.Producer.Return.Successes = true + + producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + producer = WrapAsyncProducer(cfg, producer) + + msg1 := &sarama.ProducerMessage{ + Topic: "my_topic", + Value: sarama.StringEncoder("test 1"), + } + producer.Input() <- msg1 + <-producer.Successes() + + spans := mt.FinishedSpans() + assert.Len(t, spans, 1) + { + s := spans[0] + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.produce", s.OperationName()) + assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) + assert.Equal(t, int64(0), s.Tag("offset")) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + } + }) +} + +func TestNamingSchema(t *testing.T) { + namingschematest.NewKafkaTest(genTestSpans)(t) +} + +func newMockBroker(t *testing.T) *sarama.MockBroker { + broker := sarama.NewMockBroker(t, 1) + + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, sarama.ErrNoError) + broker.Returns(metadataResponse) + + prodSuccess := new(sarama.ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) + for i := 0; i < 10; i++ { + broker.Returns(prodSuccess) + } + return broker +} + +// waitForSpans polls the mock tracer until the expected number of spans +// appear +func waitForSpans(mt mocktracer.Tracer, sz int) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + for len(mt.FinishedSpans()) < sz { + select { + case <-ctx.Done(): + return + default: + } + time.Sleep(time.Millisecond * 100) + } +} diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index 4c57cb8db5..8dda18ad0f 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -85,7 +85,8 @@ var 
 	"gopkg.in/olivere/elastic.v3":   {"Elasticsearch v3", false},
 	"github.com/redis/go-redis/v9":  {"Redis v9", false},
 	"github.com/segmentio/kafka-go": {"Kafka v0", false},
-	"github.com/Shopify/sarama":     {"Kafka (sarama)", false},
+	"github.com/IBM/sarama":         {"IBM sarama", false},
+	"github.com/Shopify/sarama":     {"Shopify sarama", false},
 	"github.com/sirupsen/logrus":    {"Logrus", false},
 	"github.com/syndtr/goleveldb":   {"LevelDB", false},
 	"github.com/tidwall/buntdb":     {"BuntDB", false},
diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go
index 26c5d8ffe4..02e7133aac 100644
--- a/ddtrace/tracer/option_test.go
+++ b/ddtrace/tracer/option_test.go
@@ -246,7 +246,7 @@ func TestAgentIntegration(t *testing.T) {
 		defer srv.Close()
 		cfg := newConfig(WithAgentAddr(strings.TrimPrefix(srv.URL, "http://")))
 		assert.NotNil(t, cfg.integrations)
-		assert.Equal(t, len(cfg.integrations), 53)
+		assert.Equal(t, len(cfg.integrations), 54)
 	})
 
 	t.Run("uninstrumented", func(t *testing.T) {
diff --git a/go.mod b/go.mod
index 1bbbf94148..55f6e7b0a6 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/DataDog/go-libddwaf v1.5.0
 	github.com/DataDog/gostackparse v0.6.0
 	github.com/DataDog/sketches-go v1.4.2
+	github.com/IBM/sarama v1.40.0
 	github.com/Shopify/sarama v1.38.1
 	github.com/aws/aws-sdk-go v1.44.327
 	github.com/aws/aws-sdk-go-v2 v1.20.3
diff --git a/go.sum b/go.sum
index dbcf26bfa4..ea74e09093 100644
--- a/go.sum
+++ b/go.sum
@@ -642,6 +642,8 @@ github.com/DataDog/gostackparse v0.6.0 h1:egCGQviIabPwsyoWpGvIBGrEnNWez35aEO7OJ1
 github.com/DataDog/gostackparse v0.6.0/go.mod h1:lTfqcJKqS9KnXQGnyQMCugq3u1FP6UZMfWR0aitKFMM=
 github.com/DataDog/sketches-go v1.4.2 h1:gppNudE9d19cQ98RYABOetxIhpTCl4m7CnbRZjvVA/o=
 github.com/DataDog/sketches-go v1.4.2/go.mod h1:xJIXldczJyyjnbDop7ZZcLxJdV3+7Kra7H1KMgpgkLk=
+github.com/IBM/sarama v1.40.0 h1:QTVmX+gMKye52mT5x+Ve/Bod2D0Gy7ylE2Wslv+RHtc=
+github.com/IBM/sarama v1.40.0/go.mod h1:6pBloAs1WanL/vsq5qFTyTGulJUntZHhMLOUYEIs9mg=
 github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
 github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
 github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=