Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support rocketmq #1412

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions brave-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@
<artifactId>brave-instrumentation-okhttp3</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
JoeKerouac marked this conversation as resolved.
Show resolved Hide resolved
<artifactId>brave-instrumentation-rocketmq-clients</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rpc</artifactId>
Expand Down
1 change: 1 addition & 0 deletions instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Here's a brief overview of what's packaged here:
* [mysql8](mysql8/README.md) - Tracing MySQL v8 statement interceptor
* [netty-codec-http](netty-codec-http/README.md) - Tracing handler for [Netty](http://netty.io/) 4.x http servers
* [okhttp3](okhttp3/README.md) - Tracing decorators for [OkHttp](https://github.com/square/okhttp) 3.x
* [rocketmq-client](rocketmq-client/README.md) - Tracing decorators for RocketMQ producers and consumers.
* [servlet](servlet/README.md) - Tracing filter for Servlet 2.5+ (including Async)
* [spring-rabbit](spring-rabbit/README.md) - Tracing MessagePostProcessor and ListenerAdvice for [Spring Rabbit](https://spring.io/guides/gs/messaging-rabbitmq/)
* [spring-web](spring-web/README.md) - Tracing interceptor for [Spring RestTemplate](https://spring.io/guides/gs/consuming-rest/)
Expand Down
1 change: 1 addition & 0 deletions instrumentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<module>mysql6</module>
<module>mysql8</module>
<module>okhttp3</module>
<module>rocketmq-client</module>
<module>rpc</module>
<module>servlet</module>
<module>servlet-jakarta</module>
Expand Down
110 changes: 110 additions & 0 deletions instrumentation/rocketmq-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# brave-instrumentation-rocketmq-client

## Tracing for RocketMQ Client

This module provides instrumentation for RocketMQ based services.

## example

### producer

The key is to register our hook to the producer

```java
package brave.rocketmq.client;

import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.sampler.SamplerFunction;
import brave.sampler.SamplerFunctions;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ProducerExample {

public static void main(String[] args) throws Exception {
// todo Replaced with actual tracing construct
Tracing tracing = Tracing.newBuilder().build();
SamplerFunction<MessagingRequest> producerSampler = SamplerFunctions.deferDecision();
RocketMQTracing producerTracing = RocketMQTracing.create(
MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build());

String topic = "testSend";
Message message = new Message(topic, "JoeKerouac", "hello".getBytes());
DefaultMQProducer producer = new DefaultMQProducer("testSend");
// todo This is the key, register the hook to the producer
producer.getDefaultMQProducerImpl()
.registerSendMessageHook(new SendMessageBraveHookImpl(producerTracing));
// Replace with actual address
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.send(message);

producer.shutdown();
}
}

```

### consumer

wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently`
using `brave.rocketmq.client.RocketMQTracing.wrap(long, org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)`,
or alternatively, wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
using `brave.rocketmq.client.RocketMQTracing.wrap(int, org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently)`;

```java
package brave.rocketmq.client;

import java.util.List;
import java.util.Optional;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.rocketmq.client.RocketMQTracing;
import brave.sampler.SamplerFunction;
import brave.sampler.SamplerFunctions;

public class ProducerExample {

public static void main(String[] args) throws Exception {
// todo Replaced with actual tracing construct
Tracing tracing = Tracing.newBuilder().build();
SamplerFunction<MessagingRequest> producerSampler = SamplerFunctions.deferDecision();
RocketMQTracing rocketMQTracing = RocketMQTracing.create(
MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build());

String topic = "testPushConsumer";
String nameserverAddr = "127.0.0.1:9876";

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");
consumer.setNamesrvAddr(nameserverAddr);
consumer.subscribe(topic, "*");
MessageListenerConcurrently messageListenerConcurrently = rocketMQTracing.wrap(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
// do something
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.registerMessageListener(messageListenerConcurrently);

consumer.start();
}

}

```

6 changes: 6 additions & 0 deletions instrumentation/rocketmq-client/bnd.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# We use brave.internal.Nullable, but it is not used at runtime.
Import-Package: \
!brave.internal*,\
*
Export-Package: \
brave.rocketmq.client
68 changes: 68 additions & 0 deletions instrumentation/rocketmq-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0"?>
<!--

Copyright 2013-2024 The OpenZipkin Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-parent</artifactId>
<version>6.0.1-SNAPSHOT</version>
</parent>

<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<name>Brave Instrumentation: RocketMQ Client</name>

<properties>
<!-- Matches Export-Package in bnd.bnd -->
<module.name>brave.rocketmq.client</module.name>

<main.basedir>${project.basedir}/../..</main.basedir>

<rocketmq.version>5.1.4</rocketmq.version>

<maven-failsafe-plugin.argLine>--add-opens java.base/java.nio=ALL-UNNAMED</maven-failsafe-plugin.argLine>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-messaging</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-tests</artifactId>
<scope>test</scope>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.rocketmq.client;

public class RocketMQTags {

public static final String TO_PREFIX = "To_";
public static final String FROM_PREFIX = "From_";
public static final String ROCKETMQ_SERVICE = "rocketmq";

// TODO: maybe like HttpTags.PATH if we support extended tags on first version
public static final String ROCKETMQ_TAGS = "rocketmq.tags";
public static final String ROCKETMQ_TOPIC = "rocketmq.topic";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete this file as we will only start with rocketmq.topic

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.propagation.Propagation;
import brave.propagation.TraceContext.Extractor;
import brave.propagation.TraceContext.Injector;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.SamplerFunction;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;

import java.util.Map;

public class RocketMQTracing {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class RocketMQTracing {
/**
* Use this class to decorate your RocketMQ consumer / producer and enable Tracing.
*
* @since 6.1
*/
public final class RocketMQTracing {


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note this won't be public, and this is like jms, rabbit, etc.

Suggested change
static final String ROCKETMQ_TOPIC = "rocketmq.topic";

private static final long defaultSuspendCurrentQueueTimeMillis = 1000;
private static final int defaultDelayLevelWhenNextConsume = 0;

public static RocketMQTracing create(Tracing tracing) {
return new RocketMQTracing(MessagingTracing.create(tracing), RocketMQTags.ROCKETMQ_SERVICE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't pass around this constant.. you'll see why, but it is simpler and less parameters

}

public static RocketMQTracing create(MessagingTracing messagingTracing) {
return new RocketMQTracing(messagingTracing, RocketMQTags.ROCKETMQ_SERVICE);
}

public static RocketMQTracing create(MessagingTracing messagingTracing,
String remoteServiceName) {
return new RocketMQTracing(messagingTracing, remoteServiceName);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main thing is to be similar to other code, look at jms and you'll see this.. adopt that and adapt it for rabbit. This makes it simpler for configuration tools who don't need to look for different conventions depending on the library.

  public static JmsTracing create(Tracing tracing) {
    return newBuilder(tracing).build();
  }

  public static JmsTracing create(MessagingTracing messagingTracing) {
    return newBuilder(messagingTracing).build();
  }

  public static Builder newBuilder(Tracing tracing) {
    return newBuilder(MessagingTracing.create(tracing));
  }

  public static Builder newBuilder(MessagingTracing messagingTracing) {
    return new Builder(messagingTracing);
  }

  public static final class Builder {
    final MessagingTracing messagingTracing;
    String remoteServiceName = "jms";

    Builder(MessagingTracing messagingTracing) {

final Tracing tracing;
final Tracer tracer;
final Extractor<TracingProducerRequest> producerExtractor;
final Extractor<TracingConsumerRequest> consumerExtractor;
final Injector<TracingProducerRequest> producerInjector;
final Injector<TracingConsumerRequest> consumerInjector;
final String[] traceIdHeaders;
final SamplerFunction<MessagingRequest> producerSampler, consumerSampler;
final String remoteServiceName;

RocketMQTracing(MessagingTracing messagingTracing,
String remoteServiceName) { // intentionally hidden constructor
this.tracing = messagingTracing.tracing();
this.tracer = tracing.tracer();
Propagation<String> propagation = messagingTracing.propagation();
this.producerExtractor = propagation.extractor(TracingProducerRequest.GETTER);
this.consumerExtractor = propagation.extractor(TracingConsumerRequest.GETTER);
this.producerInjector = propagation.injector(TracingProducerRequest.SETTER);
this.consumerInjector = propagation.injector(TracingConsumerRequest.SETTER);
this.producerSampler = messagingTracing.producerSampler();
this.consumerSampler = messagingTracing.consumerSampler();
this.remoteServiceName = remoteServiceName;

// We clear the trace ID headers, so that a stale consumer span is not preferred over current
// listener. We intentionally don't clear BaggagePropagation.allKeyNames as doing so will
// application fields "user_id" or "country_code"
this.traceIdHeaders = propagation.keys().toArray(new String[0]);
}

<R> TraceContextOrSamplingFlags extractAndClearTraceIdHeaders(Extractor<R> extractor,
R request,
Map<String, String> properties) {
TraceContextOrSamplingFlags extracted = extractor.extract(request);
// Clear any propagation keys present in the headers
if (extracted.samplingFlags() == null) { // then trace IDs were extracted
if (properties != null) {
clearTraceIdHeaders(properties);
}
}
return extracted;
}

/** Creates a potentially noop remote span representing this request. */
Span nextMessagingSpan(SamplerFunction<MessagingRequest> sampler, MessagingRequest request,
TraceContextOrSamplingFlags extracted) {
Boolean sampled = extracted.sampled();
// only recreate the context if the messaging sampler made a decision
if (sampled == null && (sampled = sampler.trySample(request)) != null) {
extracted = extracted.sampled(sampled);
}
return tracer.nextSpan(extracted);
}

// We can't just skip clearing headers we use because we might inject B3 single, yet have stale B3
// multi, or vice versa.
void clearTraceIdHeaders(Map<String, String> headers) {
for (String traceIDHeader : traceIdHeaders)
headers.remove(traceIDHeader);
}

public Tracing tracing() {
JoeKerouac marked this conversation as resolved.
Show resolved Hide resolved
return tracing;
}

public Tracer tracer() {
JoeKerouac marked this conversation as resolved.
Show resolved Hide resolved
return tracer;
}

public MessageListenerOrderly wrap(MessageListenerOrderly messageListenerOrderly) {
JoeKerouac marked this conversation as resolved.
Show resolved Hide resolved
return new TracingMessageListenerOrderly(defaultSuspendCurrentQueueTimeMillis, this, messageListenerOrderly);
}

public MessageListenerOrderly wrap(long suspendCurrentQueueTimeMillis, MessageListenerOrderly messageListenerOrderly) {
return new TracingMessageListenerOrderly(suspendCurrentQueueTimeMillis, this, messageListenerOrderly);
}

public MessageListenerConcurrently wrap(MessageListenerConcurrently messageListenerConcurrently) {
return new TracingMessageListenerConcurrently(defaultDelayLevelWhenNextConsume, this, messageListenerConcurrently);
}

public MessageListenerConcurrently wrap(int delayLevelWhenNextConsume, MessageListenerConcurrently messageListenerConcurrently) {
return new TracingMessageListenerConcurrently(delayLevelWhenNextConsume, this, messageListenerConcurrently);
}

public MessageListenerOrderly unwrap(MessageListenerOrderly messageListenerOrderly) {
JoeKerouac marked this conversation as resolved.
Show resolved Hide resolved
if (messageListenerOrderly instanceof TracingMessageListenerOrderly) {
return ((TracingMessageListenerOrderly)messageListenerOrderly).messageListenerOrderly;
}
return null;
}

public MessageListenerConcurrently unwrap(MessageListenerConcurrently messageListenerConcurrently) {
JoeKerouac marked this conversation as resolved.
Show resolved Hide resolved
if (messageListenerConcurrently instanceof TracingMessageListenerConcurrently) {
return ((TracingMessageListenerConcurrently)messageListenerConcurrently).messageListenerConcurrently;
}
return null;
}

}
Loading
Loading