Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support segmentio/kafka.go.v0 #293

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
6 changes: 5 additions & 1 deletion _integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ go 1.22.6

replace github.com/DataDog/orchestrion => ../

// TODO: remove
replace gopkg.in/DataDog/dd-trace-go.v1 => gopkg.in/DataDog/dd-trace-go.v1 v1.39.0-alpha.1.0.20241008195158-49ec2955c1d0

require (
cloud.google.com/go/pubsub v1.42.0
github.com/DataDog/orchestrion v0.7.4
Expand All @@ -30,6 +33,7 @@ require (
github.com/labstack/echo/v4 v4.12.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/redis/go-redis/v9 v9.6.1
github.com/segmentio/kafka-go v0.4.42
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.33.0
github.com/testcontainers/testcontainers-go/modules/cassandra v0.33.0
Expand Down Expand Up @@ -69,7 +73,7 @@ require (
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/BurntSushi/locker v0.0.0-20171006230638-a6e239ea1c69 // indirect
github.com/DataDog/appsec-internal-go v1.7.0 // indirect
github.com/DataDog/appsec-internal-go v1.8.0 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.56.0 // indirect
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.57.0 // indirect
github.com/DataDog/datadog-go/v5 v5.5.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions _integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -657,8 +657,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/appsec-internal-go v1.7.0 h1:iKRNLih83dJeVya3IoUfK+6HLD/hQsIbyBlfvLmAeb0=
github.com/DataDog/appsec-internal-go v1.7.0/go.mod h1:wW0cRfWBo4C044jHGwYiyh5moQV2x0AhnwqMuiX7O/g=
github.com/DataDog/appsec-internal-go v1.8.0 h1:1Tfn3LEogntRqZtf88twSApOCAAO3V+NILYhuQIo4J4=
github.com/DataDog/appsec-internal-go v1.8.0/go.mod h1:wW0cRfWBo4C044jHGwYiyh5moQV2x0AhnwqMuiX7O/g=
github.com/DataDog/datadog-agent/pkg/obfuscate v0.56.0 h1:UdBCkJ1a4uxQNzggUEEbPylagIUaCWvFDxuf9QKWMXE=
github.com/DataDog/datadog-agent/pkg/obfuscate v0.56.0/go.mod h1:NHgTieB5DpTc4AZrzx1xE+tPCWTJ7Hw3TVRiWuK505U=
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.57.0 h1:LplNAmMgZvGU7kKA0+4c1xWOjz828xweW5TCi8Mw9Q0=
Expand Down Expand Up @@ -1512,6 +1512,8 @@ github.com/sanity-io/litter v1.5.5 h1:iE+sBxPBzoK6uaEP5Lt3fHNgpKcHXc/A2HGETy0uJQ
github.com/sanity-io/litter v1.5.5/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF7bU2UI5U=
github.com/secure-systems-lab/go-securesystemslib v0.8.0 h1:mr5An6X45Kb2nddcFlbmfHkLguCE9laoZCUzEEpIZXA=
github.com/secure-systems-lab/go-securesystemslib v0.8.0/go.mod h1:UH2VZVuJfCYR8WgMlCU1uFsOUU+KeyrTWcSS73NBOzU=
github.com/segmentio/kafka-go v0.4.42 h1:qffhBZCz4WcWyNuHEclHjIMLs2slp6mZO8px+5W5tfU=
github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
Expand Down Expand Up @@ -2372,8 +2374,8 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/DataDog/dd-trace-go.v1 v1.68.0 h1:8WPoOHJcMAtcxTVKM0DYnFweBjxxfNit3Sjo/rf+Hkw=
gopkg.in/DataDog/dd-trace-go.v1 v1.68.0/go.mod h1:mkZpWVLO/ERW5NqlW+w5d8waQKNvMSTUQLJfoI0vlvw=
gopkg.in/DataDog/dd-trace-go.v1 v1.39.0-alpha.1.0.20241008195158-49ec2955c1d0 h1:rRceEz0Vi5laWcU1xcn7X8Mgo9vHOGgAsKQN5HfA1vk=
gopkg.in/DataDog/dd-trace-go.v1 v1.39.0-alpha.1.0.20241008195158-49ec2955c1d0/go.mod h1:U9AOeBHNAL95JXcd/SPf4a7O5GNeF/yD13sJtli/yaU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
19 changes: 19 additions & 0 deletions _integration-tests/tests/segmentio_kafka.v0/gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

242 changes: 242 additions & 0 deletions _integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

package segmentio_kafka_v0

import (
"context"
"errors"
"net"
"strconv"
"testing"
"time"

"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"orchestrion/integration/utils"
"orchestrion/integration/validator/trace"
)

const (
topicA = "topic-A"
topicB = "topic-B"
consumerGroup = "group-A"
)

type TestCase struct {
kafka *kafkatest.KafkaContainer
addr string
writer *kafka.Writer
}

func (tc *TestCase) Setup(t *testing.T) {
tc.kafka, tc.addr = utils.StartKafkaTestContainer(t)

tc.writer = &kafka.Writer{
Addr: kafka.TCP(tc.addr),
Balancer: &kafka.LeastBytes{},
}
tc.createTopic(t)
}

func (tc *TestCase) newReader(topic string) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{tc.addr},
GroupID: consumerGroup,
Topic: topic,
MaxWait: 10 * time.Millisecond,
MaxBytes: 10e6, // 10MB
})
}

func (tc *TestCase) createTopic(t *testing.T) {
conn, err := kafka.Dial("tcp", tc.addr)
require.NoError(t, err)
defer conn.Close()

controller, err := conn.Controller()
require.NoError(t, err)

controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
require.NoError(t, err)
defer controllerConn.Close()

topicConfigs := []kafka.TopicConfig{
{
Topic: topicA,
NumPartitions: 1,
ReplicationFactor: 1,
},
{
Topic: topicB,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
require.NoError(t, err)
}

func (tc *TestCase) Run(t *testing.T) {
tc.produce(t)
tc.consume(t)
}

func (tc *TestCase) produce(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

span, ctx := tracer.StartSpanFromContext(ctx, "test.root")
defer span.Finish()

messages := []kafka.Message{
{
Topic: topicA,
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
{
Topic: topicB,
Key: []byte("Key-A"),
Value: []byte("Second message"),
},
{
Topic: topicB,
Key: []byte("Key-A"),
Value: []byte("Third message"),
},
}
const (
maxRetries = 10
retryDelay = 100 * time.Millisecond
)
var (
retryCount int
err error
)
for retryCount < maxRetries {
err = tc.writer.WriteMessages(ctx, messages...)
if err == nil {
break
}
// This error happens sometimes with brand-new topics, as there is a delay between when the topic is created
// on the broker, and when the topic can actually be written to.
if errors.Is(err, kafka.UnknownTopicOrPartition) {
retryCount++
t.Logf("failed to produce kafka messages, will retry in %s (retryCount: %d)", retryDelay, retryCount)
time.Sleep(retryDelay)
}
}
require.NoError(t, err)
require.NoError(t, tc.writer.Close())
}

func (tc *TestCase) consume(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

readerA := tc.newReader(topicA)
m, err := readerA.ReadMessage(ctx)
require.NoError(t, err)
assert.Equal(t, "Hello World!", string(m.Value))
assert.Equal(t, "Key-A", string(m.Key))
require.NoError(t, readerA.Close())

readerB := tc.newReader(topicB)
m, err = readerB.FetchMessage(ctx)
require.NoError(t, err)
assert.Equal(t, "Second message", string(m.Value))
assert.Equal(t, "Key-A", string(m.Key))
err = readerB.CommitMessages(ctx, m)
require.NoError(t, err)
require.NoError(t, readerB.Close())
}

func (tc *TestCase) Teardown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
require.NoError(t, tc.kafka.Terminate(ctx))
}

func (*TestCase) ExpectedTraces() trace.Traces {
return trace.Traces{
{
Tags: map[string]any{
"name": "test.root",
},
Children: trace.Traces{
{
Tags: map[string]any{
"name": "kafka.produce",
"type": "queue",
"service": "kafka",
"resource": "Produce Topic topic-A",
},
Meta: map[string]string{
"span.kind": "producer",
"component": "segmentio/kafka.go.v0",
},
Children: trace.Traces{
{
Tags: map[string]any{
"name": "kafka.consume",
"type": "queue",
"service": "kafka",
"resource": "Consume Topic topic-A",
},
Meta: map[string]string{
"span.kind": "consumer",
"component": "segmentio/kafka.go.v0",
},
},
},
},
{
Tags: map[string]any{
"name": "kafka.produce",
"type": "queue",
"service": "kafka",
"resource": "Produce Topic topic-B",
},
Meta: map[string]string{
"span.kind": "producer",
"component": "segmentio/kafka.go.v0",
},
Children: trace.Traces{
{
Tags: map[string]any{
"name": "kafka.consume",
"type": "queue",
"service": "kafka",
"resource": "Consume Topic topic-B",
},
Meta: map[string]string{
"span.kind": "consumer",
"component": "segmentio/kafka.go.v0",
},
},
},
},
{
Tags: map[string]any{
"name": "kafka.produce",
"type": "queue",
"service": "kafka",
"resource": "Produce Topic topic-B",
},
Meta: map[string]string{
"span.kind": "producer",
"component": "segmentio/kafka.go.v0",
},
Children: nil,
},
},
},
}
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ module github.com/DataDog/orchestrion

go 1.22.6

// TODO: remove
replace gopkg.in/DataDog/dd-trace-go.v1 => gopkg.in/DataDog/dd-trace-go.v1 v1.39.0-alpha.1.0.20241008195158-49ec2955c1d0

require (
github.com/charmbracelet/lipgloss v0.13.0
github.com/dave/dst v0.27.3
Expand Down Expand Up @@ -45,7 +48,7 @@ require (
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/BurntSushi/locker v0.0.0-20171006230638-a6e239ea1c69 // indirect
github.com/DataDog/appsec-internal-go v1.7.0 // indirect
github.com/DataDog/appsec-internal-go v1.8.0 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.56.0 // indirect
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.57.0 // indirect
github.com/DataDog/datadog-go/v5 v5.5.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/appsec-internal-go v1.7.0 h1:iKRNLih83dJeVya3IoUfK+6HLD/hQsIbyBlfvLmAeb0=
github.com/DataDog/appsec-internal-go v1.7.0/go.mod h1:wW0cRfWBo4C044jHGwYiyh5moQV2x0AhnwqMuiX7O/g=
github.com/DataDog/appsec-internal-go v1.8.0 h1:1Tfn3LEogntRqZtf88twSApOCAAO3V+NILYhuQIo4J4=
github.com/DataDog/appsec-internal-go v1.8.0/go.mod h1:wW0cRfWBo4C044jHGwYiyh5moQV2x0AhnwqMuiX7O/g=
github.com/DataDog/datadog-agent/pkg/obfuscate v0.56.0 h1:UdBCkJ1a4uxQNzggUEEbPylagIUaCWvFDxuf9QKWMXE=
github.com/DataDog/datadog-agent/pkg/obfuscate v0.56.0/go.mod h1:NHgTieB5DpTc4AZrzx1xE+tPCWTJ7Hw3TVRiWuK505U=
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.57.0 h1:LplNAmMgZvGU7kKA0+4c1xWOjz828xweW5TCi8Mw9Q0=
Expand Down Expand Up @@ -1272,8 +1272,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/DataDog/dd-trace-go.v1 v1.68.0 h1:8WPoOHJcMAtcxTVKM0DYnFweBjxxfNit3Sjo/rf+Hkw=
gopkg.in/DataDog/dd-trace-go.v1 v1.68.0/go.mod h1:mkZpWVLO/ERW5NqlW+w5d8waQKNvMSTUQLJfoI0vlvw=
gopkg.in/DataDog/dd-trace-go.v1 v1.39.0-alpha.1.0.20241008195158-49ec2955c1d0 h1:rRceEz0Vi5laWcU1xcn7X8Mgo9vHOGgAsKQN5HfA1vk=
gopkg.in/DataDog/dd-trace-go.v1 v1.39.0-alpha.1.0.20241008195158-49ec2955c1d0/go.mod h1:U9AOeBHNAL95JXcd/SPf4a7O5GNeF/yD13sJtli/yaU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
Loading