Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/segmentio/kafka.go.v0: refactor tracing code #2885

Merged
merged 13 commits into from
Oct 16, 2024
Merged
28 changes: 16 additions & 12 deletions .github/workflows/unit-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,24 @@ jobs:
image: memcached:1.5.9
ports:
- 11211:11211
zookeeper:
image: bitnami/zookeeper:latest
env:
ALLOW_ANONYMOUS_LOGIN: "yes"
ports:
- 2181:2181
kafka:
image: darccio/kafka:2.13-2.8.1
image: confluentinc/confluent-local:7.5.0
env:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_CREATE_TOPICS: gotest:1:1,gosegtest:1:1
KAFKA_BROKER_ID: 1
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9093,BROKER://localhost:9092"
KAFKA_REST_BOOTSTRAP_SERVERS: "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9094"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_BROKER_ID: "1"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: "1"
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
KAFKA_NODE_ID: "1"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
ports:
- 9092:9092
localstack:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

// Package tracing contains tracing logic for the cloud.google.com/go/pubsub.v1 instrumentation.
//
// WARNING: this package SHOULD NOT import cloud.google.com/go/pubsub.
//
// The motivation of this package is to support orchestrion, which cannot use the main package because it imports
// the cloud.google.com/go/pubsub package, and since orchestrion modifies the library code itself,
// this would cause an import cycle.
package tracing

import (
Expand Down
24 changes: 0 additions & 24 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,30 +179,6 @@ func TestConsumerChannel(t *testing.T) {
}
}

/*
to run the integration test locally:

docker network create confluent

docker run --rm \
--name zookeeper \
--network confluent \
-p 2181:2181 \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:5.0.0

docker run --rm \
--name kafka \
--network confluent \
-p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_CREATE_TOPICS=gotest:1:1 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.0.0
*/

func TestConsumerFunctional(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down
24 changes: 0 additions & 24 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,30 +179,6 @@ func TestConsumerChannel(t *testing.T) {
}
}

/*
to run the integration test locally:

docker network create confluent

docker run --rm \
--name zookeeper \
--network confluent \
-p 2181:2181 \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:5.0.0

docker run --rm \
--name kafka \
--network confluent \
-p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_CREATE_TOPICS=gotest:1:1 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.0.0
*/

func TestConsumerFunctional(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion contrib/segmentio/kafka.go.v0/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go"
)

func ExampleWriter() {
Expand Down
43 changes: 4 additions & 39 deletions contrib/segmentio/kafka.go.v0/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,14 @@
package kafka

import (
"github.com/segmentio/kafka-go"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/segmentio/kafka-go"
)

// A messageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
type messageCarrier struct {
msg *kafka.Message
}

var _ interface {
tracer.TextMapReader
tracer.TextMapWriter
} = (*messageCarrier)(nil)

// ForeachKey conforms to the TextMapReader interface.
func (c messageCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.msg.Headers {
err := handler(h.Key, string(h.Value))
if err != nil {
return err
}
}
return nil
}

// Set implements TextMapWriter
func (c messageCarrier) Set(key, val string) {
// ensure uniqueness of keys
for i := 0; i < len(c.msg.Headers); i++ {
if string(c.msg.Headers[i].Key) == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
i--
}
}
c.msg.Headers = append(c.msg.Headers, kafka.Header{
Key: key,
Value: []byte(val),
})
}

// ExtractSpanContext retrieves the SpanContext from a kafka.Message
func ExtractSpanContext(msg kafka.Message) (ddtrace.SpanContext, error) {
return tracer.Extract(messageCarrier{&msg})
return tracer.Extract(tracing.NewMessageCarrier(wrapMessage(&msg)))
}
86 changes: 86 additions & 0 deletions contrib/segmentio/kafka.go.v0/internal/tracing/dsm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracing

import (
"context"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message) {
if !tr.dataStreamsEnabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.GetTopic(), "type:kafka"}
if tr.kafkaCfg.ConsumerGroupID != "" {
edges = append(edges, "group:"+tr.kafkaCfg.ConsumerGroupID)
}
carrier := NewMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)},
edges...,
)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
if tr.kafkaCfg.ConsumerGroupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider that messages read are committed right away.
tracer.TrackKafkaCommitOffset(tr.kafkaCfg.ConsumerGroupID, msg.GetTopic(), int32(msg.GetPartition()), msg.GetOffset())
}
}

func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer) {
if !tr.dataStreamsEnabled || msg == nil {
return
}

var topic string
if writer.GetTopic() != "" {
topic = writer.GetTopic()
} else {
topic = msg.GetTopic()
}

edges := []string{"direction:out", "topic:" + topic, "type:kafka"}
carrier := MessageCarrier{msg}
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)},
edges...,
)
if !ok {
return
}

// Headers will be dropped if the current protocol does not support them
datastreams.InjectToBase64Carrier(ctx, carrier)
}

func getProducerMsgSize(msg Message) (size int64) {
for _, header := range msg.GetHeaders() {
size += int64(len(header.GetKey()) + len(header.GetValue()))
}
if msg.GetValue() != nil {
size += int64(len(msg.GetValue()))
}
if msg.GetKey() != nil {
size += int64(len(msg.GetKey()))
}
return size
}

func getConsumerMsgSize(msg Message) (size int64) {
for _, header := range msg.GetHeaders() {
size += int64(len(header.GetKey()) + len(header.GetValue()))
}
return size + int64(len(msg.GetValue())+len(msg.GetKey()))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracing

import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message
type MessageCarrier struct {
msg Message
}

var _ interface {
tracer.TextMapReader
tracer.TextMapWriter
} = (*MessageCarrier)(nil)

// ForeachKey conforms to the TextMapReader interface.
func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.msg.GetHeaders() {
err := handler(h.GetKey(), string(h.GetValue()))
if err != nil {
return err
}
}
return nil
}

// Set implements TextMapWriter
func (c MessageCarrier) Set(key, val string) {
headers := c.msg.GetHeaders()
// ensure uniqueness of keys
for i := 0; i < len(headers); i++ {
if headers[i].GetKey() == key {
headers = append(headers[:i], headers[i+1:]...)
i--
}
}
headers = append(headers, KafkaHeader{
Key: key,
Value: []byte(val),
})
c.msg.SetHeaders(headers)
}

func NewMessageCarrier(msg Message) MessageCarrier {
return MessageCarrier{msg: msg}
}
Loading
Loading