From 3b0e7b8ef2afbccfd6393a03b5a71ed08306888b Mon Sep 17 00:00:00 2001 From: "shalk(xiao kun)" Date: Wed, 30 Oct 2024 23:29:57 +0800 Subject: [PATCH] convert kafka stream test from groovy to java (#12437) Co-authored-by: Jay DeLuca Co-authored-by: Lauri Tulmin --- .../test/groovy/KafkaStreamsBaseTest.groovy | 122 -------- .../groovy/KafkaStreamsDefaultTest.groovy | 232 -------------- ...afkaStreamsSuppressReceiveSpansTest.groovy | 186 ------------ .../kafkastreams/KafkaStreamsBaseTest.java | 183 ++++++++++++ .../kafkastreams/KafkaStreamsDefaultTest.java | 282 ++++++++++++++++++ .../KafkaStreamsReflectionUtil.java | 105 +++++++ .../KafkaStreamsSuppressReceiveSpansTest.java | 212 +++++++++++++ 7 files changed, 782 insertions(+), 540 deletions(-) delete mode 100644 instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsBaseTest.groovy delete mode 100644 instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy delete mode 100644 instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy create mode 100644 instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java create mode 100644 instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java create mode 100644 instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsReflectionUtil.java create mode 100644 instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsBaseTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsBaseTest.groovy deleted file mode 100644 index 96b1827d03f0..000000000000 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsBaseTest.groovy +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import org.apache.kafka.clients.admin.AdminClient -import org.apache.kafka.clients.admin.NewTopic -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.Producer -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.serialization.IntegerDeserializer -import org.apache.kafka.common.serialization.IntegerSerializer -import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.testcontainers.containers.output.Slf4jLogConsumer -import org.testcontainers.containers.wait.strategy.Wait -import org.testcontainers.kafka.KafkaContainer -import org.testcontainers.utility.DockerImageName -import spock.lang.Shared - -import java.time.Duration -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -class KafkaStreamsBaseTest extends AgentInstrumentationSpecification { - private static final Logger logger = LoggerFactory.getLogger("io.opentelemetry.KafkaStreamsBaseTest") - - protected static final STREAM_PENDING = "test.pending" - protected static final STREAM_PROCESSED = "test.processed" - - @Shared - static KafkaContainer kafka - @Shared - static Producer producer - @Shared - static Consumer consumer - @Shared - static CountDownLatch consumerReady = new CountDownLatch(1) - - def setupSpec() { - kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:3.8.0")) - .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m") - .withLogConsumer(new Slf4jLogConsumer(logger)) - .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1)) - .withStartupTimeout(Duration.ofMinutes(1)) - kafka.start() - - // create test topic - AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin -> - admin.createTopics([ - new NewTopic(STREAM_PENDING, 1, (short) 1), - new NewTopic(STREAM_PROCESSED, 1, (short) 1), - ]).all().get(10, TimeUnit.SECONDS) - } - - producer = new KafkaProducer<>(producerProps(kafka.bootstrapServers)) - - def consumerProps = [ - "bootstrap.servers" : kafka.bootstrapServers, - "group.id" : "test", - "enable.auto.commit" : "true", - "auto.commit.interval.ms": "10", - "session.timeout.ms" : "30000", - "key.deserializer" : IntegerDeserializer, - "value.deserializer" : StringDeserializer - ] - consumer = new KafkaConsumer<>(consumerProps) - - consumer.subscribe([STREAM_PROCESSED], new ConsumerRebalanceListener() { - @Override - void onPartitionsRevoked(Collection collection) { - } - - @Override - void onPartitionsAssigned(Collection collection) { - consumerReady.countDown() - } - }) - } - - def cleanupSpec() { - consumer?.close() - producer?.close() - kafka.stop() - } - - static Map producerProps(String servers) { - // values copied from spring's KafkaTestUtils - return [ - "bootstrap.servers": servers, - "retries" : 0, - "batch.size" : "16384", - "linger.ms" : 1, - "buffer.memory" : "33554432", - "key.serializer" : IntegerSerializer, - "value.serializer" : StringSerializer - ] - } - - // Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until it gets properly assigned a topic partition - static void awaitUntilConsumerIsReady() { - if (consumerReady.await(0, TimeUnit.SECONDS)) { - return - } - for (i in 0..<10) { - consumer.poll(0) - if (consumerReady.await(1, TimeUnit.SECONDS)) { - break - } - } - if (consumerReady.getCount() != 0) { - throw new AssertionError("Consumer wasn't assigned any partitions!") - } - consumer.seekToBeginning([]) - } -} diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy deleted file mode 100644 index e15b14301097..000000000000 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsDefaultTest.groovy +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator -import io.opentelemetry.context.Context -import io.opentelemetry.context.propagation.TextMapGetter -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.KafkaStreams -import org.apache.kafka.streams.StreamsConfig -import org.apache.kafka.streams.kstream.KStream -import org.apache.kafka.streams.kstream.ValueMapper - -import java.time.Duration - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { - - def "test kafka produce and consume with streams in-between"() { - setup: - def config = new Properties() - config.putAll(producerProps(KafkaStreamsBaseTest.kafka.bootstrapServers)) - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application") - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()) - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) - - // CONFIGURE PROCESSOR - def builder - try { - // Different class names for test and latestDepTest. - builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance() - } catch (ClassNotFoundException | NoClassDefFoundError e) { - builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance() - } - KStream textLines = builder.stream(STREAM_PENDING) - def values = textLines - .mapValues(new ValueMapper() { - @Override - String apply(String textLine) { - Span.current().setAttribute("asdf", "testing") - return textLine.toLowerCase() - } - }) - - KafkaStreams streams - try { - // Different api for test and latestDepTest. - values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED) - streams = new KafkaStreams(builder, config) - } catch (MissingMethodException e) { - def producer = Class.forName("org.apache.kafka.streams.kstream.Produced") - .with(Serdes.Integer(), Serdes.String()) - values.to(STREAM_PROCESSED, producer) - streams = new KafkaStreams(builder.build(), config) - } - streams.start() - - when: - String greeting = "TESTING TESTING 123!" - KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting)) - - then: - awaitUntilConsumerIsReady() - def records = KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis()) - Headers receivedHeaders = null - for (record in records) { - Span.current().setAttribute("testing", 123) - - assert record.key() == 10 - assert record.value() == greeting.toLowerCase() - - if (receivedHeaders == null) { - receivedHeaders = record.headers() - } - } - - assertTraces(3) { - traces.sort(orderByRootSpanName( - STREAM_PENDING + " publish", - STREAM_PENDING + " receive", - STREAM_PROCESSED + " receive")) - - SpanData producerPending, producerProcessed - - trace(0, 1) { - // kafka-clients PRODUCER - span(0) { - name STREAM_PENDING + " publish" - kind PRODUCER - hasNoParent() - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" - "messaging.client_id" { it.startsWith("producer") } - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" - } - } - - producerPending = span(0) - } - trace(1, 3) { - // kafka-clients CONSUMER receive - span(0) { - name STREAM_PENDING + " receive" - kind CONSUMER - hasNoParent() - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "receive" - "messaging.client_id" { it.endsWith("consumer") } - "$MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT" 1 - if (Boolean.getBoolean("testLatestDeps")) { - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application" - } - } - } - // kafka-stream CONSUMER - span(1) { - name STREAM_PENDING + " process" - kind CONSUMER - childOf span(0) - hasLink(producerPending) - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "messaging.client_id" { it.endsWith("consumer") } - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" - "kafka.record.queue_time_ms" { it >= 0 } - "asdf" "testing" - if (Boolean.getBoolean("testLatestDeps")) { - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application" - } - } - } - // kafka-clients PRODUCER - span(2) { - name STREAM_PROCESSED + " publish" - kind PRODUCER - childOf span(1) - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" - "messaging.client_id" { it.endsWith("producer") } - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 - } - } - - producerProcessed = span(2) - } - trace(2, 2) { - // kafka-clients CONSUMER receive - span(0) { - name STREAM_PROCESSED + " receive" - kind CONSUMER - hasNoParent() - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "receive" - "messaging.client_id" { it.startsWith("consumer") } - "$MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT" 1 - if (Boolean.getBoolean("testLatestDeps")) { - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test" - } - } - } - // kafka-clients CONSUMER process - span(1) { - name STREAM_PROCESSED + " process" - kind CONSUMER - childOf span(0) - hasLink producerProcessed - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "messaging.client_id" { it.startsWith("consumer") } - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" - if (Boolean.getBoolean("testLatestDeps")) { - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test" - } - "kafka.record.queue_time_ms" { it >= 0 } - "testing" 123 - } - } - } - } - - receivedHeaders.iterator().hasNext() - def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value()) - Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter() { - @Override - Iterable keys(String carrier) { - return Collections.singleton("traceparent") - } - - @Override - String get(String carrier, String key) { - if (key == "traceparent") { - return traceparent - } - return null - } - }) - def spanContext = Span.fromContext(context).getSpanContext() - def streamTrace = traces.find { it.size() == 3 } - def streamSendSpan = streamTrace[2] - spanContext.traceId == streamSendSpan.traceId - spanContext.spanId == streamSendSpan.spanId - } -} diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy deleted file mode 100644 index 3e6422053d99..000000000000 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsSuppressReceiveSpansTest.groovy +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator -import io.opentelemetry.context.Context -import io.opentelemetry.context.propagation.TextMapGetter -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.KafkaStreams -import org.apache.kafka.streams.StreamsConfig -import org.apache.kafka.streams.kstream.KStream -import org.apache.kafka.streams.kstream.ValueMapper - -import java.time.Duration - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { - - def "test kafka produce and consume with streams in-between"() { - setup: - def config = new Properties() - config.putAll(producerProps(KafkaStreamsBaseTest.kafka.bootstrapServers)) - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application") - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()) - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) - - // CONFIGURE PROCESSOR - def builder - try { - // Different class names for test and latestDepTest. - builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance() - } catch (ClassNotFoundException | NoClassDefFoundError e) { - builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance() - } - KStream textLines = builder.stream(STREAM_PENDING) - def values = textLines - .mapValues(new ValueMapper() { - @Override - String apply(String textLine) { - Span.current().setAttribute("asdf", "testing") - return textLine.toLowerCase() - } - }) - - KafkaStreams streams - try { - // Different api for test and latestDepTest. - values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED) - streams = new KafkaStreams(builder, config) - } catch (MissingMethodException e) { - def producer = Class.forName("org.apache.kafka.streams.kstream.Produced") - .with(Serdes.Integer(), Serdes.String()) - values.to(STREAM_PROCESSED, producer) - streams = new KafkaStreams(builder.build(), config) - } - streams.start() - - when: - String greeting = "TESTING TESTING 123!" - KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting)) - - then: - // check that the message was received - def records = KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis()) - Headers receivedHeaders = null - for (record in records) { - Span.current().setAttribute("testing", 123) - - assert record.key() == 10 - assert record.value() == greeting.toLowerCase() - - if (receivedHeaders == null) { - receivedHeaders = record.headers() - } - } - - SpanData streamSendSpan - - assertTraces(1) { - trace(0, 4) { - // kafka-clients PRODUCER - span(0) { - name STREAM_PENDING + " publish" - kind PRODUCER - hasNoParent() - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" - "messaging.client_id" "producer-1" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" - } - } - // kafka-stream CONSUMER - span(1) { - name STREAM_PENDING + " process" - kind CONSUMER - childOf span(0) - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "messaging.client_id" { it.endsWith("consumer") } - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" - "kafka.record.queue_time_ms" { it >= 0 } - "asdf" "testing" - if (Boolean.getBoolean("testLatestDeps")) { - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application" - } - } - } - - streamSendSpan = span(2) - - // kafka-clients PRODUCER - span(2) { - name STREAM_PROCESSED + " publish" - kind PRODUCER - childOf span(1) - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" - "messaging.client_id" String - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 - } - } - // kafka-clients CONSUMER process - span(3) { - name STREAM_PROCESSED + " process" - kind CONSUMER - childOf span(2) - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "messaging.client_id" { it.startsWith("consumer") } - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0 - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10" - if (Boolean.getBoolean("testLatestDeps")) { - "$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test" - } - "kafka.record.queue_time_ms" { it >= 0 } - "testing" 123 - } - } - } - } - - receivedHeaders.iterator().hasNext() - def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value()) - Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter() { - @Override - Iterable keys(String carrier) { - return Collections.singleton("traceparent") - } - - @Override - String get(String carrier, String key) { - if (key == "traceparent") { - return traceparent - } - return null - } - }) - def spanContext = Span.fromContext(context).getSpanContext() - spanContext.traceId == streamSendSpan.traceId - spanContext.spanId == streamSendSpan.spanId - } -} diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java new file mode 100644 index 000000000000..854fee4f10bd --- /dev/null +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java @@ -0,0 +1,183 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkastreams; + +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; + +import com.google.common.collect.ImmutableMap; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.kafka.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +abstract class KafkaStreamsBaseTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + protected static final AttributeKey MESSAGING_CLIENT_ID = + AttributeKey.stringKey("messaging.client_id"); + protected static final String STREAM_PENDING = "test.pending"; + protected static final String STREAM_PROCESSED = "test.processed"; + + static KafkaContainer kafka; + static Producer producer; + static Consumer consumer; + static CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + static void setup() throws ExecutionException, InterruptedException, TimeoutException { + kafka = + new KafkaContainer(DockerImageName.parse("apache/kafka:3.8.0")) + .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m") + .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1)) + .withStartupTimeout(Duration.ofMinutes(1)); + kafka.start(); + + // create test topic + try (AdminClient adminClient = + AdminClient.create(ImmutableMap.of("bootstrap.servers", kafka.getBootstrapServers()))) { + adminClient + .createTopics( + asList( + new NewTopic(STREAM_PENDING, 1, (short) 1), + new NewTopic(STREAM_PROCESSED, 1, (short) 1))) + .all() + .get(10, TimeUnit.SECONDS); + } + + producer = new KafkaProducer<>(producerProps(kafka.getBootstrapServers())); + + Map consumerProps = + ImmutableMap.of( + "bootstrap.servers", + kafka.getBootstrapServers(), + "group.id", + "test", + "enable.auto.commit", + "true", + "auto.commit.interval.ms", + "10", + "session.timeout.ms", + "30000", + "key.deserializer", + IntegerDeserializer.class, + "value.deserializer", + StringDeserializer.class); + consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe( + singleton(STREAM_PROCESSED), + new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection collection) {} + + @Override + public void onPartitionsAssigned(Collection collection) { + consumerReady.countDown(); + } + }); + } + + @AfterAll + static void cleanup() { + consumer.close(); + producer.close(); + kafka.stop(); + } + + static Map producerProps(String servers) { + // values copied from spring's KafkaTestUtils + return ImmutableMap.of( + "bootstrap.servers", + servers, + "retries", + 0, + "batch.size", + "16384", + "linger.ms", + 1, + "buffer.memory", + "33554432", + "key.serializer", + IntegerSerializer.class, + "value.serializer", + StringSerializer.class); + } + + // Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until it + // gets properly assigned a topic partition + @SuppressWarnings("PreferJavaTimeOverload") + static void awaitUntilConsumerIsReady() throws InterruptedException { + if (consumerReady.await(0, TimeUnit.SECONDS)) { + return; + } + for (int i = 0; i < 10; i++) { + consumer.poll(0); + if (consumerReady.await(1, TimeUnit.SECONDS)) { + break; + } + } + if (consumerReady.getCount() != 0) { + throw new AssertionError("Consumer wasn't assigned any partitions!"); + } + consumer.seekToBeginning(Collections.emptyList()); + } + + static Context getContext(Headers headers) { + String traceparent = + new String( + headers.headers("traceparent").iterator().next().value(), StandardCharsets.UTF_8); + return W3CTraceContextPropagator.getInstance() + .extract( + Context.root(), + "", + new TextMapGetter() { + @Override + public String get(String carrier, String key) { + if ("traceparent".equals(key)) { + return traceparent; + } + return null; + } + + @Override + public Iterable keys(String carrier) { + return Collections.singleton("traceparent"); + } + }); + } +} diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java new file mode 100644 index 000000000000..46caa0a78c90 --- /dev/null +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java @@ -0,0 +1,282 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkastreams; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest { + + @SuppressWarnings("deprecation") // using deprecated semconv + @DisplayName("test kafka produce and consume with streams in-between") + @Test + void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { + Properties config = new Properties(); + config.putAll(producerProps(KafkaStreamsBaseTest.kafka.getBootstrapServers())); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); + config.put( + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + + // CONFIGURE PROCESSOR + KafkaStreamsReflectionUtil.StreamBuilder streamBuilder = + KafkaStreamsReflectionUtil.createBuilder(); + KStream textLines = streamBuilder.stream(STREAM_PENDING); + KStream values = + textLines.mapValues( + textLine -> { + Span.current().setAttribute("asdf", "testing"); + return textLine.toLowerCase(Locale.ROOT); + }); + + KafkaStreams streams = streamBuilder.createStreams(values, config, STREAM_PROCESSED); + streams.start(); + + String greeting = "TESTING TESTING 123!"; + KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting)); + + awaitUntilConsumerIsReady(); + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords records = + KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis()); + Headers receivedHeaders = null; + for (ConsumerRecord record : records) { + Span.current().setAttribute("testing", 123); + + assertThat(record.key()).isEqualTo(10); + assertThat(record.value()).isEqualTo(greeting.toLowerCase(Locale.ROOT)); + + if (receivedHeaders == null) { + receivedHeaders = record.headers(); + } + } + assertThat(receivedHeaders).isNotEmpty(); + SpanContext receivedContext = Span.fromContext(getContext(receivedHeaders)).getSpanContext(); + + AtomicReference producerPendingRef = new AtomicReference<>(); + AtomicReference producerProcessedRef = new AtomicReference<>(); + + // Add your assertTraces logic here + testing.waitAndAssertSortedTraces( + TelemetryDataUtil.orderByRootSpanName( + STREAM_PENDING + " publish", + STREAM_PENDING + " receive", + STREAM_PROCESSED + " receive"), + trace -> { + trace.hasSpansSatisfyingExactly( + // kafka-clients PRODUCER + span -> + span.hasName(STREAM_PENDING + " publish") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PENDING), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), + satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("producer")), + satisfies( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + k -> k.isInstanceOf(String.class)), + equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"))); + producerPendingRef.set(trace.getSpan(0)); + }, + trace -> { + trace.hasSpansSatisfyingExactly( + // kafka-clients CONSUMER receive + span -> { + List assertions = + new ArrayList<>( + asList( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues + .KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PENDING), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"), + satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")), + equalTo( + MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1))); + if (Boolean.getBoolean("testLatestDeps")) { + assertions.add( + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, + "test-application")); + } + span.hasName(STREAM_PENDING + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(assertions); + }, + // kafka-stream CONSUMER + span -> { + List assertions = + new ArrayList<>( + asList( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues + .KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PENDING), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")), + satisfies( + MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, + k -> k.isInstanceOf(Long.class)), + satisfies( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + k -> k.isInstanceOf(String.class)), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + longKey("kafka.record.queue_time_ms"), + k -> k.isGreaterThanOrEqualTo(0)), + equalTo(stringKey("asdf"), "testing"))); + if (Boolean.getBoolean("testLatestDeps")) { + assertions.add( + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, + "test-application")); + } + span.hasName(STREAM_PENDING + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerPendingRef.get().getSpanContext())) + .hasAttributesSatisfyingExactly(assertions); + }, + // kafka-clients PRODUCER + span -> + span.hasName(STREAM_PROCESSED + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(1)) + .hasTraceId(receivedContext.getTraceId()) + .hasSpanId(receivedContext.getSpanId()) + .hasAttributesSatisfyingExactly( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PROCESSED), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), + satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("producer")), + satisfies( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + k -> k.isInstanceOf(String.class)), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0))); + + producerProcessedRef.set(trace.getSpan(2)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + // kafka-clients CONSUMER receive + span -> { + List assertions = + new ArrayList<>( + asList( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues + .KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PROCESSED), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"), + satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")), + equalTo( + MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1))); + if (Boolean.getBoolean("testLatestDeps")) { + assertions.add( + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + } + span.hasName(STREAM_PROCESSED + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(assertions); + }, + // kafka-clients CONSUMER process + span -> { + List assertions = + new ArrayList<>( + asList( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues + .KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PROCESSED), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")), + satisfies( + MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, + k -> k.isInstanceOf(Long.class)), + satisfies( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + k -> k.isInstanceOf(String.class)), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + longKey("kafka.record.queue_time_ms"), + k -> k.isGreaterThanOrEqualTo(0)), + equalTo(longKey("testing"), 123))); + if (Boolean.getBoolean("testLatestDeps")) { + assertions.add( + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + } + span.hasName(STREAM_PROCESSED + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerProcessedRef.get().getSpanContext())) + .hasAttributesSatisfyingExactly(assertions); + })); + } +} diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsReflectionUtil.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsReflectionUtil.java new file mode 100644 index 000000000000..d4a1a5fd1039 --- /dev/null +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsReflectionUtil.java @@ -0,0 +1,105 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkastreams; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.Properties; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.kstream.KStream; + +/** + * Kafka streams reflection util which is used to be compatible with different versions of kafka + * streams. + */ +class KafkaStreamsReflectionUtil { + + private KafkaStreamsReflectionUtil() {} + + static class StreamBuilder { + private final Object builder; + + StreamBuilder(Object builder) { + this.builder = builder; + } + + @SuppressWarnings("unchecked") + KStream stream(String topic) + throws Exception { // Different api for test and latestDepTest. + Method method; + Object[] arguments; + try { + // equivalent to: + // ((org.apache.kafka.streams.kstream.KStreamBuilder)builder).stream(STREAM_PENDING); + method = builder.getClass().getMethod("stream", String[].class); + String[] topics = new String[] {topic}; + arguments = new Object[] {topics}; + } catch (Exception exception) { + // equivalent to: + // ((org.apache.kafka.streams.StreamsBuilder)builder).stream(STREAM_PENDING); + method = builder.getClass().getMethod("stream", String.class); + arguments = new Object[] {topic}; + } + + return (KStream) method.invoke(builder, arguments); + } + + KafkaStreams createStreams(KStream values, Properties config, String topic) + throws Exception { + Constructor constructor; + // Different api for test and latestDepTest. + try { + // equivalent to: + // values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED); + // return new KafkaStreams(builder, config); + KStream.class + .getMethod("to", Serde.class, Serde.class, String.class) + .invoke(values, Serdes.Integer(), Serdes.String(), topic); + + Class topologyBuilderClass = + Class.forName("org.apache.kafka.streams.processor.TopologyBuilder"); + constructor = KafkaStreams.class.getConstructor(topologyBuilderClass, Properties.class); + } catch (Exception exception) { + constructor = null; + } + if (constructor != null) { + return (KafkaStreams) constructor.newInstance(builder, config); + } + + // equivalent to: + // Produced produced = Produced.with(Serdes.Integer(), Serdes.String()); + // values.to(STREAM_PROCESSED, produced); + // + // Topology topology = builder.build(); + // new KafkaStreams(topology, props); + Class producedClass = Class.forName("org.apache.kafka.streams.kstream.Produced"); + Method producedWith = producedClass.getMethod("with", Serde.class, Serde.class); + Object producer = producedWith.invoke(null, Serdes.Integer(), Serdes.String()); + + KStream.class.getMethod("to", String.class, producedClass).invoke(values, topic, producer); + + Object topology = builder.getClass().getMethod("build").invoke(builder); + + Class topologyClass = Class.forName("org.apache.kafka.streams.Topology"); + constructor = KafkaStreams.class.getConstructor(topologyClass, Properties.class); + + return (KafkaStreams) constructor.newInstance(topology, config); + } + } + + static StreamBuilder createBuilder() throws Exception { + Class builderClass; + try { + // Different class names for test and latestDepTest. + builderClass = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder"); + } catch (Exception e) { + builderClass = Class.forName("org.apache.kafka.streams.StreamsBuilder"); + } + return new StreamBuilder(builderClass.getConstructor().newInstance()); + } +} diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java new file mode 100644 index 000000000000..ceb2b10c03c4 --- /dev/null +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java @@ -0,0 +1,212 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkastreams; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest { + + @SuppressWarnings("deprecation") // using deprecated semconv + @DisplayName("test kafka produce and consume with streams in-between") + @Test + void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { + Properties config = new Properties(); + config.putAll(producerProps(KafkaStreamsBaseTest.kafka.getBootstrapServers())); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); + config.put( + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + + // CONFIGURE PROCESSOR + KafkaStreamsReflectionUtil.StreamBuilder streamBuilder = + KafkaStreamsReflectionUtil.createBuilder(); + KStream textLines = streamBuilder.stream(STREAM_PENDING); + KStream values = + textLines.mapValues( + textLine -> { + Span.current().setAttribute("asdf", "testing"); + return textLine.toLowerCase(Locale.ROOT); + }); + + KafkaStreams streams = streamBuilder.createStreams(values, config, STREAM_PROCESSED); + streams.start(); + + String greeting = "TESTING TESTING 123!"; + KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting)); + + // check that the message was received + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords records = + KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis()); + Headers receivedHeaders = null; + for (ConsumerRecord record : records) { + Span.current().setAttribute("testing", 123); + + assertThat(record.key()).isEqualTo(10); + assertThat(record.value()).isEqualTo(greeting.toLowerCase(Locale.ROOT)); + + if (receivedHeaders == null) { + receivedHeaders = record.headers(); + } + } + assertThat(receivedHeaders).isNotEmpty(); + SpanContext receivedContext = Span.fromContext(getContext(receivedHeaders)).getSpanContext(); + + AtomicReference streamSendSpanRef = new AtomicReference<>(); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + // kafka-clients PRODUCER + span -> + span.hasName(STREAM_PENDING + " publish") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues + .KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PENDING), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), + equalTo(MESSAGING_CLIENT_ID, "producer-1"), + satisfies( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + k -> k.isInstanceOf(String.class)), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10")), + // kafka-stream CONSUMER + span -> { + List assertions = + new ArrayList<>( + asList( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues + .KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PENDING), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")), + satisfies( + MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, + k -> k.isInstanceOf(Long.class)), + satisfies( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + k -> k.isInstanceOf(String.class)), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + longKey("kafka.record.queue_time_ms"), + k -> k.isGreaterThanOrEqualTo(0)), + equalTo(stringKey("asdf"), "testing"))); + if (Boolean.getBoolean("testLatestDeps")) { + assertions.add( + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, + "test-application")); + } + span.hasName(STREAM_PENDING + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(assertions); + }, + // kafka-clients PRODUCER + span -> { + streamSendSpanRef.set(trace.getSpan(2)); + span.hasName(STREAM_PROCESSED + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(1)) + .hasTraceId(receivedContext.getTraceId()) + .hasSpanId(receivedContext.getSpanId()) + .hasAttributesSatisfyingExactly( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PROCESSED), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), + satisfies(MESSAGING_CLIENT_ID, k -> k.isInstanceOf(String.class)), + satisfies( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + k -> k.isInstanceOf(String.class)), + equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0)); + }, + // kafka-clients CONSUMER process + span -> { + List assertions = + new ArrayList<>( + asList( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MessagingSystemIncubatingValues + .KAFKA), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + STREAM_PROCESSED), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")), + satisfies( + MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, + k -> k.isInstanceOf(Long.class)), + satisfies( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, + k -> k.isInstanceOf(String.class)), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0), + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"), + satisfies( + longKey("kafka.record.queue_time_ms"), + k -> k.isGreaterThanOrEqualTo(0)), + equalTo(longKey("testing"), 123))); + if (Boolean.getBoolean("testLatestDeps")) { + assertions.add( + equalTo( + MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test")); + } + span.hasName(STREAM_PROCESSED + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly(assertions); + })); + } +}