feat: support confluent-kafka-go v1 and v2 #320

Open
wants to merge 18 commits into base: main

Changes from 17 commits

11 changes: 6 additions & 5 deletions _integration-tests/go.mod
@@ -4,6 +4,8 @@ go 1.22.6

replace github.com/DataDog/orchestrion => ../

+replace gopkg.in/DataDog/dd-trace-go.v1 => github.com/DataDog/dd-trace-go v1.39.0-alpha.1.0.20241007155017-aad992f3461a
+
require (
cloud.google.com/go/pubsub v1.42.0
github.com/99designs/gqlgen v0.17.36
@@ -15,6 +17,8 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.30
github.com/aws/aws-sdk-go-v2/credentials v1.17.29
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.6
+github.com/confluentinc/confluent-kafka-go v1.9.2
+github.com/confluentinc/confluent-kafka-go/v2 v2.5.4
github.com/dave/jennifer v1.7.0
github.com/docker/go-connections v0.5.0
github.com/gin-gonic/gin v1.10.0
@@ -30,7 +34,7 @@ require (
github.com/graph-gophers/graphql-go v1.5.0
github.com/graphql-go/graphql v0.8.1
github.com/graphql-go/handler v0.2.3
-github.com/hashicorp/vault/api v1.14.0
+github.com/hashicorp/vault/api v1.15.0
github.com/jinzhu/gorm v1.9.16
github.com/labstack/echo/v4 v4.12.0
github.com/mattn/go-sqlite3 v1.14.22
@@ -65,7 +69,6 @@ require (
cloud.google.com/go/iam v1.2.0 // indirect
cloud.google.com/go/storage v1.43.0 // indirect
dario.cat/mergo v1.0.1 // indirect
-github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
@@ -75,7 +78,7 @@
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/BurntSushi/locker v0.0.0-20171006230638-a6e239ea1c69 // indirect
-github.com/DataDog/appsec-internal-go v1.7.0 // indirect
+github.com/DataDog/appsec-internal-go v1.8.0 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.56.0 // indirect
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.57.0 // indirect
github.com/DataDog/datadog-go/v5 v5.5.0 // indirect
@@ -134,7 +137,6 @@
github.com/bep/tmc v0.5.1 // indirect
github.com/bytedance/sonic v1.12.1 // indirect
github.com/bytedance/sonic/loader v0.2.0 // indirect
-github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/charmbracelet/lipgloss v0.13.0 // indirect
@@ -336,7 +338,6 @@
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
-go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
211 changes: 199 additions & 12 deletions _integration-tests/go.sum

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v1/gen_test.go

Some generated files are not rendered by default.

145 changes: 145 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v1/kafka.go
@@ -0,0 +1,145 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build integration

package kafka

import (
"context"
"strings"
"testing"
"time"

"orchestrion/integration/utils"
"orchestrion/integration/validator/trace"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/require"
kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka"
)

var (
topic = "gotest"
consumerGroup = "gotest"
partition = int32(0)
)

type TestCase struct {
container *kafkatest.KafkaContainer
addr []string
}

func (tc *TestCase) Setup(t *testing.T) {
container, addr := utils.StartKafkaTestContainer(t)
tc.container = container
tc.addr = []string{addr}
}

func (tc *TestCase) Run(t *testing.T) {
tc.produceMessage(t)
tc.consumeMessage(t)
}

func (tc *TestCase) kafkaBootstrapServers() string {
return strings.Join(tc.addr, ",")
}

func (tc *TestCase) produceMessage(t *testing.T) {
t.Helper()

cfg := &kafka.ConfigMap{
"bootstrap.servers": tc.kafkaBootstrapServers(),
"go.delivery.reports": true,
}
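// go.delivery.reports makes the producer emit a delivery event for every
// produced message on the channel passed to Produce.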
delivery := make(chan kafka.Event, 1)

producer, err := kafka.NewProducer(cfg)
require.NoError(t, err, "failed to create producer")
defer func() {
<-delivery
producer.Close()
}()

err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: partition,
},
Key: []byte("key2"),
Value: []byte("value2"),
}, delivery)
require.NoError(t, err, "failed to send message")
}

func (tc *TestCase) consumeMessage(t *testing.T) {
t.Helper()

cfg := &kafka.ConfigMap{
"group.id": consumerGroup,
"bootstrap.servers": tc.kafkaBootstrapServers(),
"fetch.wait.max.ms": 500,
"socket.timeout.ms": 1500,
"session.timeout.ms": 1500,
"enable.auto.offset.store": false,
}
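// enable.auto.offset.store is disabled above, so the offset is only recorded
// by the explicit CommitMessage call below; the short timeouts keep the test
// from hanging if the broker is unreachable.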
c, err := kafka.NewConsumer(cfg)
require.NoError(t, err, "failed to create consumer")
defer c.Close()

// Assign the test partition directly rather than subscribing to the topic,
// so no consumer group rebalance is needed before reading the message.
err = c.Assign([]kafka.TopicPartition{
{Topic: &topic, Partition: partition},
})
require.NoError(t, err)

m, err := c.ReadMessage(3000 * time.Millisecond)
require.NoError(t, err)

_, err = c.CommitMessage(m)
require.NoError(t, err)

require.Equal(t, "key2", string(m.Key))
}

func (tc *TestCase) Teardown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

require.NoError(t, tc.container.Terminate(ctx))
}

func (*TestCase) ExpectedTraces() trace.Traces {
return trace.Traces{
{
Tags: map[string]any{
"name": "kafka.produce",
"type": "queue",
"service": "kafka",
"resource": "Produce Topic gotest",
},
Meta: map[string]string{
"span.kind": "producer",
"component": "confluentinc/confluent-kafka-go/kafka",
"messaging.system": "kafka",
},
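// The consumer span is expected as a child of the producer span, which relies
// on the tracer propagating the span context through the Kafka message headers.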
Children: trace.Traces{
{
Tags: map[string]any{
"name": "kafka.consume",
"type": "queue",
"service": "kafka",
"resource": "Consume Topic gotest",
},
Meta: map[string]string{
"span.kind": "consumer",
"component": "confluentinc/confluent-kafka-go/kafka",
"messaging.system": "kafka",
"messaging.kafka.bootstrap.servers": "localhost",
},
},
},
},
}
}
19 changes: 19 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v2/gen_test.go

Some generated files are not rendered by default.

145 changes: 145 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v2/kafka.go
@@ -0,0 +1,145 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build integration

package kafka

import (
"context"
"strings"
"testing"
"time"

"orchestrion/integration/utils"
"orchestrion/integration/validator/trace"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/require"
kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka"
)

var (
topic = "gotest"
consumerGroup = "gotest"
partition = int32(0)
)

type TestCase struct {
container *kafkatest.KafkaContainer
addr []string
}

func (tc *TestCase) Setup(t *testing.T) {
container, addr := utils.StartKafkaTestContainer(t)
tc.container = container
tc.addr = []string{addr}
}

func (tc *TestCase) Run(t *testing.T) {
tc.produceMessage(t)
tc.consumeMessage(t)
}

func (tc *TestCase) kafkaBootstrapServers() string {
return strings.Join(tc.addr, ",")
}

func (tc *TestCase) produceMessage(t *testing.T) {
t.Helper()

cfg := &kafka.ConfigMap{
"bootstrap.servers": tc.kafkaBootstrapServers(),
"go.delivery.reports": true,
}
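// go.delivery.reports makes the producer emit a delivery event for every
// produced message on the channel passed to Produce.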
delivery := make(chan kafka.Event, 1)

producer, err := kafka.NewProducer(cfg)
require.NoError(t, err, "failed to create producer")
defer func() {
<-delivery
producer.Close()
}()

err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: partition,
},
Key: []byte("key2"),
Value: []byte("value2"),
}, delivery)
require.NoError(t, err, "failed to send message")
}

func (tc *TestCase) consumeMessage(t *testing.T) {
t.Helper()

cfg := &kafka.ConfigMap{
"group.id": consumerGroup,
"bootstrap.servers": tc.kafkaBootstrapServers(),
"fetch.wait.max.ms": 500,
"socket.timeout.ms": 1500,
"session.timeout.ms": 1500,
"enable.auto.offset.store": false,
}
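// enable.auto.offset.store is disabled above, so the offset is only recorded
// by the explicit CommitMessage call below; the short timeouts keep the test
// from hanging if the broker is unreachable.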
c, err := kafka.NewConsumer(cfg)
require.NoError(t, err, "failed to create consumer")
defer c.Close()

// Assign the test partition directly rather than subscribing to the topic,
// so no consumer group rebalance is needed before reading the message.
err = c.Assign([]kafka.TopicPartition{
{Topic: &topic, Partition: partition},
})
require.NoError(t, err)

m, err := c.ReadMessage(3000 * time.Millisecond)
require.NoError(t, err)

_, err = c.CommitMessage(m)
require.NoError(t, err)

require.Equal(t, "key2", string(m.Key))
}

func (tc *TestCase) Teardown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

require.NoError(t, tc.container.Terminate(ctx))
}

func (*TestCase) ExpectedTraces() trace.Traces {
return trace.Traces{
{
Tags: map[string]any{
"name": "kafka.produce",
"type": "queue",
"service": "kafka",
"resource": "Produce Topic gotest",
},
Meta: map[string]string{
"span.kind": "producer",
"component": "confluentinc/confluent-kafka-go/kafka.v2",
"messaging.system": "kafka",
},
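// The consumer span is expected as a child of the producer span, which relies
// on the tracer propagating the span context through the Kafka message headers.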
Children: trace.Traces{
{
Tags: map[string]any{
"name": "kafka.consume",
"type": "queue",
"service": "kafka",
"resource": "Consume Topic gotest",
},
Meta: map[string]string{
"span.kind": "consumer",
"component": "confluentinc/confluent-kafka-go/kafka.v2",
"messaging.system": "kafka",
"messaging.kafka.bootstrap.servers": "localhost",
},
},
},
},
}
}