From 35452f885521015abe8d7a62a932733f4b4f1f2b Mon Sep 17 00:00:00 2001 From: JoeKerouac Date: Thu, 12 Jan 2023 16:21:31 +0800 Subject: [PATCH 1/7] adds RocketMQTracing Signed-off-by: Adrian Cole --- brave-bom/pom.xml | 10 + instrumentation/README.md | 1 + instrumentation/pom.xml | 1 + instrumentation/rocketmq-client/README.md | 100 +++++++ instrumentation/rocketmq-client/bnd.bnd | 6 + instrumentation/rocketmq-client/pom.xml | 68 +++++ .../rocketmq/client/RocketMQTracing.java | 109 ++++++++ .../java/brave/rocketmq/client/SpanUtil.java | 49 ++++ .../brave/rocketmq/client/StringUtils.java | 26 ++ .../brave/rocketmq/client/TraceConstants.java | 26 ++ .../client/TracingConsumerRequest.java | 80 ++++++ .../TracingMessageListenerConcurrently.java | 71 +++++ .../client/TracingMessageListenerOrderly.java | 70 +++++ .../client/TracingProducerRequest.java | 79 ++++++ .../client/TracingSendMessageHook.java | 76 +++++ .../client/ITRocketMQTracingTest.java | 260 ++++++++++++++++++ .../rocketmq/client/RocketMQContainer.java | 66 +++++ .../src/test/resources/broker.conf | 3 + .../src/test/resources/log4j2.properties | 9 + .../src/test/resources/start.sh | 30 ++ 20 files changed, 1140 insertions(+) create mode 100644 instrumentation/rocketmq-client/README.md create mode 100644 instrumentation/rocketmq-client/bnd.bnd create mode 100644 instrumentation/rocketmq-client/pom.xml create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java create mode 100644 instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java create mode 100644 instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java create mode 100644 instrumentation/rocketmq-client/src/test/resources/broker.conf create mode 100644 instrumentation/rocketmq-client/src/test/resources/log4j2.properties create mode 100644 instrumentation/rocketmq-client/src/test/resources/start.sh diff --git a/brave-bom/pom.xml b/brave-bom/pom.xml index bba48e8cd9..4769371f91 100644 --- a/brave-bom/pom.xml +++ b/brave-bom/pom.xml @@ -226,6 +226,16 @@ brave-instrumentation-okhttp3 ${project.version} + + ${project.groupId} + brave-instrumentation-rocketmq-client + ${project.version} + + + ${project.groupId} + brave-instrumentation-rocketmq-clients + ${project.version} + ${project.groupId} brave-instrumentation-rpc diff --git a/instrumentation/README.md b/instrumentation/README.md index 8e76945ad6..ae07ffd68e 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -20,6 +20,7 @@ Here's a brief overview of what's packaged here: * [mysql8](mysql8/README.md) - Tracing MySQL v8 statement interceptor * [netty-codec-http](netty-codec-http/README.md) - Tracing handler for [Netty](http://netty.io/) 4.x http servers * [okhttp3](okhttp3/README.md) - Tracing decorators for [OkHttp](https://github.com/square/okhttp) 3.x +* [rocketmq-client](rocketmq-client/README.md) - Tracing decorators for RocketMQ producers and consumers. * [servlet](servlet/README.md) - Tracing filter for Servlet 2.5+ (including Async) * [spring-rabbit](spring-rabbit/README.md) - Tracing MessagePostProcessor and ListenerAdvice for [Spring Rabbit](https://spring.io/guides/gs/messaging-rabbitmq/) * [spring-web](spring-web/README.md) - Tracing interceptor for [Spring RestTemplate](https://spring.io/guides/gs/consuming-rest/) diff --git a/instrumentation/pom.xml b/instrumentation/pom.xml index c0e2f5a4d4..c2dd48809d 100644 --- a/instrumentation/pom.xml +++ b/instrumentation/pom.xml @@ -51,6 +51,7 @@ mysql6 mysql8 okhttp3 + rocketmq-client rpc servlet servlet-jakarta diff --git a/instrumentation/rocketmq-client/README.md b/instrumentation/rocketmq-client/README.md new file mode 100644 index 0000000000..c447f15103 --- /dev/null +++ b/instrumentation/rocketmq-client/README.md @@ -0,0 +1,100 @@ +# brave-instrumentation-rocketmq-client + +## Tracing for RocketMQ Client + +This module provides instrumentation for RocketMQ based services. + +## example + +### producer + +The key is to register our hook to the producer + +```java +package brave.rocketmq.client; + +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; + +public class ProducerExample { + + public static void main(String[] args) throws Exception { + // todo Replaced with actual tracing construct + Tracing tracing = Tracing.newBuilder().build(); + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + RocketMQTracing producerTracing = RocketMQTracing.create( + MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build()); + + String topic = "testSend"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSend"); + // todo This is the key, register the hook to the producer + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new SendMessageBraveHookImpl(producerTracing)); + // Replace with actual address + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + producer.send(message); + + producer.shutdown(); + } +} + +``` + +### consumer + +Replace `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently` +with `brave.rocketmq.client.TracingMessageListenerConcurrently` +or `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly` +with `brave.rocketmq.client.TracingMessageListenerOrderly`; + +```java +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.Optional; + +public class ProducerExample { + + public static void main(String[] args) throws Exception { + // todo Replaced with actual tracing construct + Tracing tracing = Tracing.newBuilder().build(); + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + RocketMQTracing producerTracing = RocketMQTracing.create( + MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build()); + + String topic = "testPushConsumer"; + String nameserverAddr = "127.0.0.1:9876"; + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + consumer.registerMessageListener(new TraceableMessageListenerConcurrently(0, producerTracing) { + @Override + protected void handleMessage(MessageExt messageExt) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + // do something + } + }); + consumer.start(); + } +} + +``` + diff --git a/instrumentation/rocketmq-client/bnd.bnd b/instrumentation/rocketmq-client/bnd.bnd new file mode 100644 index 0000000000..ff1f9dad33 --- /dev/null +++ b/instrumentation/rocketmq-client/bnd.bnd @@ -0,0 +1,6 @@ +# We use brave.internal.Nullable, but it is not used at runtime. +Import-Package: \ + !brave.internal*,\ + * +Export-Package: \ + brave.rocketmq.client diff --git a/instrumentation/rocketmq-client/pom.xml b/instrumentation/rocketmq-client/pom.xml new file mode 100644 index 0000000000..3b62306596 --- /dev/null +++ b/instrumentation/rocketmq-client/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + + io.zipkin.brave + brave-instrumentation-parent + 6.0.1-SNAPSHOT + + + brave-instrumentation-rocketmq-client + Brave Instrumentation: RocketMQ Client + + + + brave.rocketmq.client + + ${project.basedir}/../.. + + 4.7.0 + + --add-opens java.base/java.nio=ALL-UNNAMED + + + + + ${project.groupId} + brave-instrumentation-messaging + ${project.version} + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + provided + + + + ${project.groupId} + brave-tests + test + ${project.version} + + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java new file mode 100644 index 0000000000..ca107c98d2 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java @@ -0,0 +1,109 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.propagation.Propagation; +import brave.propagation.TraceContext.Extractor; +import brave.propagation.TraceContext.Injector; +import brave.propagation.TraceContextOrSamplingFlags; +import brave.sampler.SamplerFunction; +import java.util.Map; + +public class RocketMQTracing { + public static RocketMQTracing create(Tracing tracing) { + return new RocketMQTracing(MessagingTracing.create(tracing), TraceConstants.ROCKETMQ_SERVICE); + } + + public static RocketMQTracing create(MessagingTracing messagingTracing) { + return new RocketMQTracing(messagingTracing, TraceConstants.ROCKETMQ_SERVICE); + } + + public static RocketMQTracing create(MessagingTracing messagingTracing, + String remoteServiceName) { + return new RocketMQTracing(messagingTracing, remoteServiceName); + } + + final Tracing tracing; + final Tracer tracer; + final Extractor producerExtractor; + final Extractor consumerExtractor; + final Injector producerInjector; + final Injector consumerInjector; + final String[] traceIdHeaders; + final SamplerFunction producerSampler, consumerSampler; + final String remoteServiceName; + + RocketMQTracing(MessagingTracing messagingTracing, + String remoteServiceName) { // intentionally hidden constructor + this.tracing = messagingTracing.tracing(); + this.tracer = tracing.tracer(); + Propagation propagation = messagingTracing.propagation(); + this.producerExtractor = propagation.extractor(TracingProducerRequest.GETTER); + this.consumerExtractor = propagation.extractor(TracingConsumerRequest.GETTER); + this.producerInjector = propagation.injector(TracingProducerRequest.SETTER); + this.consumerInjector = propagation.injector(TracingConsumerRequest.SETTER); + this.producerSampler = messagingTracing.producerSampler(); + this.consumerSampler = messagingTracing.consumerSampler(); + this.remoteServiceName = remoteServiceName; + + // We clear the trace ID headers, so that a stale consumer span is not preferred over current + // listener. We intentionally don't clear BaggagePropagation.allKeyNames as doing so will + // application fields "user_id" or "country_code" + this.traceIdHeaders = propagation.keys().toArray(new String[0]); + } + + TraceContextOrSamplingFlags extractAndClearTraceIdHeaders(Extractor extractor, + R request, + Map properties) { + TraceContextOrSamplingFlags extracted = extractor.extract(request); + // Clear any propagation keys present in the headers + if (extracted.samplingFlags() == null) { // then trace IDs were extracted + if (properties != null) { + clearTraceIdHeaders(properties); + } + } + return extracted; + } + + /** Creates a potentially noop remote span representing this request. */ + Span nextMessagingSpan(SamplerFunction sampler, MessagingRequest request, + TraceContextOrSamplingFlags extracted) { + Boolean sampled = extracted.sampled(); + // only recreate the context if the messaging sampler made a decision + if (sampled == null && (sampled = sampler.trySample(request)) != null) { + extracted = extracted.sampled(sampled); + } + return tracer.nextSpan(extracted); + } + + // We can't just skip clearing headers we use because we might inject B3 single, yet have stale B3 + // multi, or vice versa. + void clearTraceIdHeaders(Map headers) { + for (String traceIDHeader : traceIdHeaders) + headers.remove(traceIDHeader); + } + + public Tracing tracing() { + return tracing; + } + + public Tracer tracer() { + return tracer; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java new file mode 100644 index 0000000000..18fd080925 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.messaging.MessagingRequest; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; +import brave.propagation.TraceContextOrSamplingFlags; +import brave.sampler.SamplerFunction; +import java.util.Map; + +class SpanUtil { + static Span createAndStartSpan(RocketMQTracing tracing, + TraceContext.Extractor extractor, SamplerFunction sampler, T request, + Map props) { + Tracer tracer = tracing.tracer; + CurrentTraceContext currentTraceContext = tracing.tracing.currentTraceContext(); + TraceContext traceContext = currentTraceContext.get(); + Span span; + + if (traceContext == null) { + TraceContextOrSamplingFlags extracted = + tracing.extractAndClearTraceIdHeaders(extractor, request, props); + span = tracing.nextMessagingSpan(sampler, request, extracted); + } else { + span = tracer.newChild(traceContext); + } + + span.kind(request.spanKind()); + span.remoteServiceName(tracing.remoteServiceName); + span.tag(TraceConstants.ROCKETMQ_TOPIC, request.channelName()); + long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); + span.start(timestamp); + return span; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java new file mode 100644 index 0000000000..9d03d1d3df --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java @@ -0,0 +1,26 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +class StringUtils { + + // TODO: we shouldn't add tags with empty values! + static String getOrEmpty(String obj) { + if (obj == null) { + return ""; + } else { + return obj; + } + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java new file mode 100644 index 0000000000..fec6c3904d --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java @@ -0,0 +1,26 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package brave.rocketmq.client; + +public class TraceConstants { + + public static final String TO_PREFIX = "To_"; + public static final String FROM_PREFIX = "From_"; + public static final String ROCKETMQ_SERVICE = "rocketmq"; + + // TODO: maybe like HttpTags.PATH if we support extended tags on first version + public static final String ROCKETMQ_TAGS = "rocketmq.tags"; + public static final String ROCKETMQ_TOPIC = "rocketmq.topic"; +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java new file mode 100644 index 0000000000..b63886aab3 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.messaging.ConsumerRequest; +import brave.propagation.Propagation.RemoteGetter; +import brave.propagation.Propagation.RemoteSetter; +import org.apache.rocketmq.common.message.MessageExt; + +final class TracingConsumerRequest extends ConsumerRequest { + + static final RemoteGetter GETTER = + new RemoteGetter() { + @Override public Span.Kind spanKind() { + return Span.Kind.CONSUMER; + } + + @Override public String get(TracingConsumerRequest request, String name) { + return request.delegate.getUserProperty(name); + } + + @Override public String toString() { + return "Message::getUserProperty"; + } + }; + + static final RemoteSetter SETTER = + new RemoteSetter() { + @Override public Span.Kind spanKind() { + return Span.Kind.CONSUMER; + } + + @Override public void put(TracingConsumerRequest request, String name, String value) { + request.delegate.putUserProperty(name, value); + } + + @Override public String toString() { + return "Message::putUserProperty"; + } + }; + + final MessageExt delegate; + + TracingConsumerRequest(MessageExt delegate) { + if (delegate == null) throw new NullPointerException("delegate == null"); + this.delegate = delegate; + } + + @Override public MessageExt unwrap() { + return delegate; + } + + @Override public String operation() { + return "receive"; + } + + @Override public String channelKind() { + return "topic"; + } + + @Override public String channelName() { + return delegate.getTopic(); + } + + @Override public String messageId() { + return delegate.getMsgId(); + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java new file mode 100644 index 0000000000..3e60625c8b --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import java.util.List; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; + +// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can +// do custom tagging via their own MessageListenerConcurrently. +// Maybe expose RocketMQTracing.messageListenerConcurrently() to wrap theirs or make spans default +// and not expose this. +public abstract class TracingMessageListenerConcurrently implements MessageListenerConcurrently { + + private final int delayLevelWhenNextConsume; + + private final RocketMQTracing tracing; + + public TracingMessageListenerConcurrently(int delayLevelWhenNextConsume, + RocketMQTracing tracing) { + this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + this.tracing = tracing; + } + + @Override + public final ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + for (MessageExt msg : msgs) { + TracingConsumerRequest request = new TracingConsumerRequest(msg); + Span span = + SpanUtil.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, + request, msg.getProperties()); + span.name(TraceConstants.FROM_PREFIX + msg.getTopic()); + + ConsumeConcurrentlyStatus result; + try (Tracer.SpanInScope scope = tracing.tracer().withSpanInScope(span)) { + result = handleMessage(msg, context); + } catch (Exception e) { + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); + result = ConsumeConcurrentlyStatus.RECONSUME_LATER; + } finally { + long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); + span.finish(timestamp); + } + + if (result != ConsumeConcurrentlyStatus.CONSUME_SUCCESS) { + return result; + } + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + protected abstract ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, + ConsumeConcurrentlyContext context); +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java new file mode 100644 index 0000000000..472ff7d6ee --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java @@ -0,0 +1,70 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import java.util.List; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; + +// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can +// do custom tagging via their own MessageListenerOrderly. +// Maybe expose RocketMQTracing.messageListenerOrderly() to wrap theirs or make spans default +// and not expose this. +public abstract class TracingMessageListenerOrderly implements MessageListenerOrderly { + private final long suspendCurrentQueueTimeMillis; + private final RocketMQTracing tracing; + + public TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis, + RocketMQTracing tracing) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + this.tracing = tracing; + } + + @Override + public final ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { + for (MessageExt msg : msgs) { + TracingConsumerRequest request = new TracingConsumerRequest(msg); + Span span = + SpanUtil.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, + request, msg.getProperties()); + span.name(TraceConstants.FROM_PREFIX + msg.getTopic()); + + ConsumeOrderlyStatus result; + try (Tracer.SpanInScope scope = tracing.tracer().withSpanInScope(span)) { + result = handleMessage(msg, context); + } catch (Exception e) { + span.error(e); + context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); + result = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } finally { + long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); + span.finish(timestamp); + } + + if (result != ConsumeOrderlyStatus.SUCCESS) { + return result; + } + } + + return ConsumeOrderlyStatus.SUCCESS; + } + + protected abstract ConsumeOrderlyStatus handleMessage(MessageExt messageExt, + ConsumeOrderlyContext context); +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java new file mode 100644 index 0000000000..62695d7d4c --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span.Kind; +import brave.messaging.ProducerRequest; +import brave.propagation.Propagation.RemoteGetter; +import brave.propagation.Propagation.RemoteSetter; +import org.apache.rocketmq.common.message.Message; + +final class TracingProducerRequest extends ProducerRequest { + static final RemoteGetter GETTER = + new RemoteGetter() { + @Override public Kind spanKind() { + return Kind.PRODUCER; + } + + @Override public String get(TracingProducerRequest request, String name) { + return request.delegate.getUserProperty(name); + } + + @Override public String toString() { + return "Message::getUserProperty"; + } + }; + + static final RemoteSetter SETTER = + new RemoteSetter() { + @Override public Kind spanKind() { + return Kind.PRODUCER; + } + + @Override public void put(TracingProducerRequest request, String name, String value) { + request.delegate.putUserProperty(name, value); + } + + @Override public String toString() { + return "Message::putUserProperty"; + } + }; + + final Message delegate; + + TracingProducerRequest(Message delegate) { + if (delegate == null) throw new NullPointerException("delegate == null"); + this.delegate = delegate; + } + + @Override public Message unwrap() { + return delegate; + } + + @Override public String operation() { + return "send"; + } + + @Override public String channelKind() { + return "topic"; + } + + @Override public String channelName() { + return delegate.getTopic(); + } + + @Override public String messageId() { + return null; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java new file mode 100644 index 0000000000..a11e21a6e2 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java @@ -0,0 +1,76 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; + +final class TracingSendMessageHook implements SendMessageHook { + final RocketMQTracing tracing; + + public TracingSendMessageHook(RocketMQTracing tracing) { + this.tracing = tracing; + } + + @Override + public String hookName() { + return "SendMessageBraveHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + if (context == null || context.getMessage() == null) { + return; + } + + Message msg = context.getMessage(); + TracingProducerRequest request = new TracingProducerRequest(msg); + Span span = + SpanUtil.createAndStartSpan(tracing, tracing.producerExtractor, tracing.producerSampler, + request, + msg.getProperties()); + span.name(TraceConstants.TO_PREFIX + msg.getTopic()); + span.tag(TraceConstants.ROCKETMQ_TAGS, StringUtils.getOrEmpty(msg.getTags())); + context.setMqTraceContext(span); + tracing.producerInjector.inject(span.context(), request); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + if (context == null || context.getMessage() == null || context.getMqTraceContext() == null) { + return; + } + + SendResult sendResult = context.getSendResult(); + Span span = (Span) context.getMqTraceContext(); + TracingProducerRequest request = new TracingProducerRequest(context.getMessage()); + + long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); + if (sendResult == null) { + if (context.getCommunicationMode() == CommunicationMode.ASYNC) { + return; + } + span.finish(timestamp); + tracing.producerInjector.inject(span.context(), request); + return; + } + + tracing.producerInjector.inject(span.context(), request); + span.finish(timestamp); + } +} diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java new file mode 100644 index 0000000000..7c749f322b --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java @@ -0,0 +1,260 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.handler.MutableSpan; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.Sampler; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import brave.test.ITRemote; +import brave.test.IntegrationTestSpanHandler; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.assertj.core.api.Assertions.assertThat; + +@Tag("docker") +@Testcontainers(disabledWithoutDocker = true) +@Timeout(60) +class ITRocketMQTracingTest extends ITRemote { + static final String TOPIC_PREFIX = "JoeKerouac_Test_"; + + @Container static RocketMQContainer rocketMQ = new RocketMQContainer(); + + IntegrationTestSpanHandler producerSpanHandler = new IntegrationTestSpanHandler(); + IntegrationTestSpanHandler consumerSpanHandler = new IntegrationTestSpanHandler(); + + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + SamplerFunction consumerSampler = SamplerFunctions.deferDecision(); + + RocketMQTracing producerTracing = + RocketMQTracing.create(MessagingTracing + .newBuilder( + tracingBuilder(Sampler.ALWAYS_SAMPLE).localServiceName("producer").clearSpanHandlers() + .addSpanHandler(producerSpanHandler).build()) + .producerSampler(r -> producerSampler.trySample(r)).build()); + + RocketMQTracing consumerTracing = + RocketMQTracing.create(MessagingTracing + .newBuilder( + tracingBuilder(Sampler.ALWAYS_SAMPLE).localServiceName("consumer").clearSpanHandlers() + .addSpanHandler(consumerSpanHandler).build()) + .consumerSampler(r -> consumerSampler.trySample(r)).build()); + + @Test void send() throws Exception { + String topic = TOPIC_PREFIX + "testSend"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSend"); + // TODO: what is this deprecated in favor of? + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + producer.send(message); + + producer.shutdown(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void sendOneway() throws Exception { + String topic = TOPIC_PREFIX + "testSendOneway"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSendOneway"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + producer.sendOneway(message); + + producer.shutdown(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void sendAsync() throws Exception { + String topic = TOPIC_PREFIX + "testSendAsync"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSendAsync"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + CountDownLatch latch = new CountDownLatch(1); + producer.send(message, new SendCallback() { + @Override public void onSuccess(SendResult sendResult) { + latch.countDown(); + } + + @Override public void onException(Throwable e) { + + } + }); + + assertThat(latch.await(3000, TimeUnit.MILLISECONDS)).isTrue(); + producer.shutdown(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void tracingMessageListenerConcurrently() throws Exception { + String topic = TOPIC_PREFIX + "tracingMessageListenerConcurrently"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("tracingMessageListenerConcurrently"); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = + new DefaultMQPushConsumer("tracingMessageListenerConcurrently"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + consumer.registerMessageListener(new TracingMessageListenerConcurrently(0, consumerTracing) { + @Override + protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, + ConsumeConcurrentlyContext context) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + reference.set(span); + latch.countDown(); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + producer.send(message); + consumer.start(); + + boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(flag).isTrue(); + assertThat(reference.get()).isNotNull(); + + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + assertThat(consumerSpan.parentId()).isNull(); + } + + @Test void tracingMessageListenerOrderly() throws Exception { + String topic = TOPIC_PREFIX + "tracingMessageListenerOrderly"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("tracingMessageListenerOrderly"); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = + new DefaultMQPushConsumer("tracingMessageListenerOrderly"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) { + @Override + protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, + ConsumeOrderlyContext context) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + reference.set(span); + latch.countDown(); + return ConsumeOrderlyStatus.SUCCESS; + } + }); + producer.send(message); + consumer.start(); + + boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(flag).isTrue(); + assertThat(reference.get()).isNotNull(); + + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + assertThat(consumerSpan.parentId()).isNull(); + } + + @Test void all() throws Exception { + String topic = TOPIC_PREFIX + "testAll"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("testAll"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testAll"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) { + @Override + protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, + ConsumeOrderlyContext context) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + reference.set(span); + latch.countDown(); + return ConsumeOrderlyStatus.SUCCESS; + } + }); + + producer.send(message); + consumer.start(); + + boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(flag).isTrue(); + assertThat(reference.get()).isNotNull(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + assertThat(consumerSpan.parentId()).isNotNull(); + assertChildOf(consumerSpan, producerSpan); + } +} diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java new file mode 100644 index 0000000000..6fb01079d2 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java @@ -0,0 +1,66 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +final class RocketMQContainer extends GenericContainer { + static final int NAMESERVER_PORT = 9876; + static final int BROKER_PORT = 10911; + + RocketMQContainer() { + super(DockerImageName.parse("apache/rocketmq:5.1.4")); + List portBindings = new ArrayList<>(); + portBindings.add(String.format("%d:%d", NAMESERVER_PORT, NAMESERVER_PORT)); + portBindings.add(String.format("%d:%d", BROKER_PORT, BROKER_PORT)); + setPortBindings(portBindings); + + // do not publish all ports + withCreateContainerCmdModifier(cmd -> { + if (cmd.getHostConfig() != null) { + cmd.getHostConfig().withPublishAllPorts(false); + } + }); + + setCommand("sh /start.sh"); + this.waitStrategy = + Wait.forLogMessage(".*boot success.*", 1).withStartupTimeout(Duration.ofSeconds(60)); + + mount("broker.conf"); + mount("start.sh"); + } + + private void mount(String fileName) { + URL confUrl = getClass().getClassLoader().getResource(fileName); + try { + withFileSystemBind(new File(confUrl.toURI()).getAbsolutePath(), "/" + fileName, + BindMode.READ_ONLY); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + public String getNamesrvAddr() { + return getHost() + ":" + NAMESERVER_PORT; + } +} diff --git a/instrumentation/rocketmq-client/src/test/resources/broker.conf b/instrumentation/rocketmq-client/src/test/resources/broker.conf new file mode 100644 index 0000000000..4948380251 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/broker.conf @@ -0,0 +1,3 @@ +brokerName=JoeKerouac-Test +brokerIP1=127.0.0.1 +autoCreateTopicEnable=true diff --git a/instrumentation/rocketmq-client/src/test/resources/log4j2.properties b/instrumentation/rocketmq-client/src/test/resources/log4j2.properties new file mode 100644 index 0000000000..e6988c4497 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/log4j2.properties @@ -0,0 +1,9 @@ +appenders=console +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n +rootLogger.level=warn +rootLogger.appenderRefs=stdout +rootLogger.appenderRef.stdout.ref=STDOUT + diff --git a/instrumentation/rocketmq-client/src/test/resources/start.sh b/instrumentation/rocketmq-client/src/test/resources/start.sh new file mode 100644 index 0000000000..0a0dba7b76 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/start.sh @@ -0,0 +1,30 @@ +function check() { + for i in {1..10} + do + if grep -q "$1" $2; then + break + else + sleep 1 + fi + done +} + +cleanup() { + jps | grep -v Jps | awk '{print $1}' | xargs -I {} kill {} + exit 0 +} + +trap cleanup SIGINT SIGTERM + +sh /home/rocketmq/rocketmq-4.6.0/bin/mqnamesrv > ~/ns.log & +check "The Name Server boot success" ~/ns.log + +sh /home/rocketmq/rocketmq-4.6.0/bin/mqbroker -n 127.0.0.1:9876 -c /broker.conf > ~/broker.log & +check "boot success" ~/broker.log + +echo "boot success" + +while true +do + sleep 1 +done From d07e7a0d18ca68928f9685082455c48933aa9055 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Thu, 1 Feb 2024 08:33:40 +0700 Subject: [PATCH 2/7] fix version and touch files to get 2024 license Signed-off-by: Adrian Cole --- instrumentation/rocketmq-client/pom.xml | 2 +- ...{TraceConstants.java => RocketMQTags.java} | 3 +-- .../rocketmq/client/RocketMQTracing.java | 4 +-- .../brave/rocketmq/client/StringUtils.java | 26 ------------------- .../client/TracingConsumerRequest.java | 6 ++--- .../TracingMessageListenerConcurrently.java | 7 ++--- .../client/TracingMessageListenerOrderly.java | 7 ++--- .../client/TracingProducerRequest.java | 1 + .../client/TracingSendMessageHook.java | 8 +++--- .../client/{SpanUtil.java => Util.java} | 13 ++++++++-- .../client/ITRocketMQTracingTest.java | 14 +++++----- .../rocketmq/client/RocketMQContainer.java | 4 +-- 12 files changed, 39 insertions(+), 56 deletions(-) rename instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/{TraceConstants.java => RocketMQTags.java} (97%) delete mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java rename instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/{SpanUtil.java => Util.java} (87%) diff --git a/instrumentation/rocketmq-client/pom.xml b/instrumentation/rocketmq-client/pom.xml index 3b62306596..59535a04c5 100644 --- a/instrumentation/rocketmq-client/pom.xml +++ b/instrumentation/rocketmq-client/pom.xml @@ -33,7 +33,7 @@ ${project.basedir}/../.. - 4.7.0 + 5.1.4 --add-opens java.base/java.nio=ALL-UNNAMED diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTags.java similarity index 97% rename from instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java rename to instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTags.java index fec6c3904d..20d51d825a 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTags.java @@ -11,10 +11,9 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package brave.rocketmq.client; -public class TraceConstants { +public class RocketMQTags { public static final String TO_PREFIX = "To_"; public static final String FROM_PREFIX = "From_"; diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java index ca107c98d2..d9a13e7405 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java @@ -27,11 +27,11 @@ public class RocketMQTracing { public static RocketMQTracing create(Tracing tracing) { - return new RocketMQTracing(MessagingTracing.create(tracing), TraceConstants.ROCKETMQ_SERVICE); + return new RocketMQTracing(MessagingTracing.create(tracing), RocketMQTags.ROCKETMQ_SERVICE); } public static RocketMQTracing create(MessagingTracing messagingTracing) { - return new RocketMQTracing(messagingTracing, TraceConstants.ROCKETMQ_SERVICE); + return new RocketMQTracing(messagingTracing, RocketMQTags.ROCKETMQ_SERVICE); } public static RocketMQTracing create(MessagingTracing messagingTracing, diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java deleted file mode 100644 index 9d03d1d3df..0000000000 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2013-2024 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package brave.rocketmq.client; - -class StringUtils { - - // TODO: we shouldn't add tags with empty values! - static String getOrEmpty(String obj) { - if (obj == null) { - return ""; - } else { - return obj; - } - } -} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java index b63886aab3..708e94b29d 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java @@ -19,8 +19,8 @@ import brave.propagation.Propagation.RemoteSetter; import org.apache.rocketmq.common.message.MessageExt; +// intentionally not yet public until we add tag parsing functionality final class TracingConsumerRequest extends ConsumerRequest { - static final RemoteGetter GETTER = new RemoteGetter() { @Override public Span.Kind spanKind() { @@ -32,7 +32,7 @@ final class TracingConsumerRequest extends ConsumerRequest { } @Override public String toString() { - return "Message::getUserProperty"; + return "MessageExt::getUserProperty"; } }; @@ -47,7 +47,7 @@ final class TracingConsumerRequest extends ConsumerRequest { } @Override public String toString() { - return "Message::putUserProperty"; + return "MessageExt::putUserProperty"; } }; diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java index 3e60625c8b..8ce15d24e0 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java @@ -15,6 +15,7 @@ import brave.Span; import brave.Tracer; +import brave.Tracer.SpanInScope; import java.util.List; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -43,12 +44,12 @@ public final ConsumeConcurrentlyStatus consumeMessage(List msgs, for (MessageExt msg : msgs) { TracingConsumerRequest request = new TracingConsumerRequest(msg); Span span = - SpanUtil.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, + Util.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, request, msg.getProperties()); - span.name(TraceConstants.FROM_PREFIX + msg.getTopic()); + span.name(RocketMQTags.FROM_PREFIX + msg.getTopic()); ConsumeConcurrentlyStatus result; - try (Tracer.SpanInScope scope = tracing.tracer().withSpanInScope(span)) { + try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) { result = handleMessage(msg, context); } catch (Exception e) { context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java index 472ff7d6ee..915baa46b5 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java @@ -15,6 +15,7 @@ import brave.Span; import brave.Tracer; +import brave.Tracer.SpanInScope; import java.util.List; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; @@ -41,12 +42,12 @@ public final ConsumeOrderlyStatus consumeMessage(List msgs, for (MessageExt msg : msgs) { TracingConsumerRequest request = new TracingConsumerRequest(msg); Span span = - SpanUtil.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, + Util.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, request, msg.getProperties()); - span.name(TraceConstants.FROM_PREFIX + msg.getTopic()); + span.name(RocketMQTags.FROM_PREFIX + msg.getTopic()); ConsumeOrderlyStatus result; - try (Tracer.SpanInScope scope = tracing.tracer().withSpanInScope(span)) { + try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) { result = handleMessage(msg, context); } catch (Exception e) { span.error(e); diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java index 62695d7d4c..1e0767741d 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java @@ -19,6 +19,7 @@ import brave.propagation.Propagation.RemoteSetter; import org.apache.rocketmq.common.message.Message; +// intentionally not yet public until we add tag parsing functionality final class TracingProducerRequest extends ProducerRequest { static final RemoteGetter GETTER = new RemoteGetter() { diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java index a11e21a6e2..7258446d73 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java @@ -29,7 +29,7 @@ public TracingSendMessageHook(RocketMQTracing tracing) { @Override public String hookName() { - return "SendMessageBraveHook"; + return "TracingSendMessageHook"; } @Override @@ -41,11 +41,11 @@ public void sendMessageBefore(SendMessageContext context) { Message msg = context.getMessage(); TracingProducerRequest request = new TracingProducerRequest(msg); Span span = - SpanUtil.createAndStartSpan(tracing, tracing.producerExtractor, tracing.producerSampler, + Util.createAndStartSpan(tracing, tracing.producerExtractor, tracing.producerSampler, request, msg.getProperties()); - span.name(TraceConstants.TO_PREFIX + msg.getTopic()); - span.tag(TraceConstants.ROCKETMQ_TAGS, StringUtils.getOrEmpty(msg.getTags())); + span.name(RocketMQTags.TO_PREFIX + msg.getTopic()); + span.tag(RocketMQTags.ROCKETMQ_TAGS, Util.getOrEmpty(msg.getTags())); context.setMqTraceContext(span); tracing.producerInjector.inject(span.context(), request); } diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java similarity index 87% rename from instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java rename to instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java index 18fd080925..4c50f13134 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java @@ -22,7 +22,7 @@ import brave.sampler.SamplerFunction; import java.util.Map; -class SpanUtil { +class Util { static Span createAndStartSpan(RocketMQTracing tracing, TraceContext.Extractor extractor, SamplerFunction sampler, T request, Map props) { @@ -41,9 +41,18 @@ static Span createAndStartSpan(RocketMQTracing trac span.kind(request.spanKind()); span.remoteServiceName(tracing.remoteServiceName); - span.tag(TraceConstants.ROCKETMQ_TOPIC, request.channelName()); + span.tag(RocketMQTags.ROCKETMQ_TOPIC, request.channelName()); long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); span.start(timestamp); return span; } + + // TODO: we shouldn't add tags with empty values! + static String getOrEmpty(String obj) { + if (obj == null) { + return ""; + } else { + return obj; + } + } } diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java index 7c749f322b..28a22325ff 100644 --- a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java @@ -50,8 +50,6 @@ @Testcontainers(disabledWithoutDocker = true) @Timeout(60) class ITRocketMQTracingTest extends ITRemote { - static final String TOPIC_PREFIX = "JoeKerouac_Test_"; - @Container static RocketMQContainer rocketMQ = new RocketMQContainer(); IntegrationTestSpanHandler producerSpanHandler = new IntegrationTestSpanHandler(); @@ -75,7 +73,7 @@ class ITRocketMQTracingTest extends ITRemote { .consumerSampler(r -> consumerSampler.trySample(r)).build()); @Test void send() throws Exception { - String topic = TOPIC_PREFIX + "testSend"; + String topic = "testSend"; Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); DefaultMQProducer producer = new DefaultMQProducer("testSend"); // TODO: what is this deprecated in favor of? @@ -92,7 +90,7 @@ class ITRocketMQTracingTest extends ITRemote { } @Test void sendOneway() throws Exception { - String topic = TOPIC_PREFIX + "testSendOneway"; + String topic = "testSendOneway"; Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); DefaultMQProducer producer = new DefaultMQProducer("testSendOneway"); producer.getDefaultMQProducerImpl() @@ -108,7 +106,7 @@ class ITRocketMQTracingTest extends ITRemote { } @Test void sendAsync() throws Exception { - String topic = TOPIC_PREFIX + "testSendAsync"; + String topic = "testSendAsync"; Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); DefaultMQProducer producer = new DefaultMQProducer("testSendAsync"); producer.getDefaultMQProducerImpl() @@ -134,7 +132,7 @@ class ITRocketMQTracingTest extends ITRemote { } @Test void tracingMessageListenerConcurrently() throws Exception { - String topic = TOPIC_PREFIX + "tracingMessageListenerConcurrently"; + String topic = "tracingMessageListenerConcurrently"; Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); String nameserverAddr = rocketMQ.getNamesrvAddr(); DefaultMQProducer producer = new DefaultMQProducer("tracingMessageListenerConcurrently"); @@ -174,7 +172,7 @@ protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, } @Test void tracingMessageListenerOrderly() throws Exception { - String topic = TOPIC_PREFIX + "tracingMessageListenerOrderly"; + String topic = "tracingMessageListenerOrderly"; Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); String nameserverAddr = rocketMQ.getNamesrvAddr(); DefaultMQProducer producer = new DefaultMQProducer("tracingMessageListenerOrderly"); @@ -214,7 +212,7 @@ protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, } @Test void all() throws Exception { - String topic = TOPIC_PREFIX + "testAll"; + String topic = "testAll"; Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); String nameserverAddr = rocketMQ.getNamesrvAddr(); DefaultMQProducer producer = new DefaultMQProducer("testAll"); diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java index 6fb01079d2..79f969574b 100644 --- a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java @@ -53,8 +53,8 @@ final class RocketMQContainer extends GenericContainer { private void mount(String fileName) { URL confUrl = getClass().getClassLoader().getResource(fileName); try { - withFileSystemBind(new File(confUrl.toURI()).getAbsolutePath(), "/" + fileName, - BindMode.READ_ONLY); + String confPath = new File(confUrl.toURI()).getAbsolutePath(); + withFileSystemBind(confPath, "/" + fileName, BindMode.READ_ONLY); } catch (URISyntaxException e) { throw new RuntimeException(e); } From 9e9d1ed3c95709f6b21433a210369e89ea494733 Mon Sep 17 00:00:00 2001 From: JoeKerouac Date: Sun, 4 Feb 2024 23:39:31 +0800 Subject: [PATCH 3/7] Downgrade the RocketMQ version from 5.1.4 to 4.6.0. --- instrumentation/rocketmq-client/pom.xml | 2 +- .../test/java/brave/rocketmq/client/RocketMQContainer.java | 4 ++-- instrumentation/rocketmq-client/src/test/resources/start.sh | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/rocketmq-client/pom.xml b/instrumentation/rocketmq-client/pom.xml index 59535a04c5..b21dcbd1c3 100644 --- a/instrumentation/rocketmq-client/pom.xml +++ b/instrumentation/rocketmq-client/pom.xml @@ -33,7 +33,7 @@ ${project.basedir}/../.. - 5.1.4 + 4.6.0 --add-opens java.base/java.nio=ALL-UNNAMED diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java index 79f969574b..054860dabe 100644 --- a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java @@ -29,7 +29,7 @@ final class RocketMQContainer extends GenericContainer { static final int BROKER_PORT = 10911; RocketMQContainer() { - super(DockerImageName.parse("apache/rocketmq:5.1.4")); + super(DockerImageName.parse("apache/rocketmq:4.6.0")); List portBindings = new ArrayList<>(); portBindings.add(String.format("%d:%d", NAMESERVER_PORT, NAMESERVER_PORT)); portBindings.add(String.format("%d:%d", BROKER_PORT, BROKER_PORT)); @@ -44,7 +44,7 @@ final class RocketMQContainer extends GenericContainer { setCommand("sh /start.sh"); this.waitStrategy = - Wait.forLogMessage(".*boot success.*", 1).withStartupTimeout(Duration.ofSeconds(60)); + Wait.forLogMessage(".*--JoeKerouac--.*", 1).withStartupTimeout(Duration.ofSeconds(60)); mount("broker.conf"); mount("start.sh"); diff --git a/instrumentation/rocketmq-client/src/test/resources/start.sh b/instrumentation/rocketmq-client/src/test/resources/start.sh index 0a0dba7b76..09f65a0ab3 100644 --- a/instrumentation/rocketmq-client/src/test/resources/start.sh +++ b/instrumentation/rocketmq-client/src/test/resources/start.sh @@ -22,7 +22,7 @@ check "The Name Server boot success" ~/ns.log sh /home/rocketmq/rocketmq-4.6.0/bin/mqbroker -n 127.0.0.1:9876 -c /broker.conf > ~/broker.log & check "boot success" ~/broker.log -echo "boot success" +echo "--JoeKerouac--" while true do From 9c72c70dff3233b3b327bd908ef4f3bb58b1bb32 Mon Sep 17 00:00:00 2001 From: JoeKerouac Date: Tue, 6 Feb 2024 12:56:21 +0800 Subject: [PATCH 4/7] upgrade rocketMQ --- instrumentation/rocketmq-client/pom.xml | 2 +- .../brave/rocketmq/client/RocketMQContainer.java | 2 +- .../rocketmq-client/src/test/resources/start.sh | 15 +++++++++++++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/instrumentation/rocketmq-client/pom.xml b/instrumentation/rocketmq-client/pom.xml index b21dcbd1c3..59535a04c5 100644 --- a/instrumentation/rocketmq-client/pom.xml +++ b/instrumentation/rocketmq-client/pom.xml @@ -33,7 +33,7 @@ ${project.basedir}/../.. - 4.6.0 + 5.1.4 --add-opens java.base/java.nio=ALL-UNNAMED diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java index 054860dabe..5f13d0a55a 100644 --- a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java @@ -29,7 +29,7 @@ final class RocketMQContainer extends GenericContainer { static final int BROKER_PORT = 10911; RocketMQContainer() { - super(DockerImageName.parse("apache/rocketmq:4.6.0")); + super(DockerImageName.parse("apache/rocketmq:5.1.4")); List portBindings = new ArrayList<>(); portBindings.add(String.format("%d:%d", NAMESERVER_PORT, NAMESERVER_PORT)); portBindings.add(String.format("%d:%d", BROKER_PORT, BROKER_PORT)); diff --git a/instrumentation/rocketmq-client/src/test/resources/start.sh b/instrumentation/rocketmq-client/src/test/resources/start.sh index 09f65a0ab3..d1b11643eb 100644 --- a/instrumentation/rocketmq-client/src/test/resources/start.sh +++ b/instrumentation/rocketmq-client/src/test/resources/start.sh @@ -9,6 +9,10 @@ function check() { done } +function createTopic() { + sh /home/rocketmq/rocketmq-5.1.4/bin/mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t $1 +} + cleanup() { jps | grep -v Jps | awk '{print $1}' | xargs -I {} kill {} exit 0 @@ -16,12 +20,19 @@ cleanup() { trap cleanup SIGINT SIGTERM -sh /home/rocketmq/rocketmq-4.6.0/bin/mqnamesrv > ~/ns.log & +sh /home/rocketmq/rocketmq-5.1.4/bin/mqnamesrv > ~/ns.log 2>&1 & check "The Name Server boot success" ~/ns.log -sh /home/rocketmq/rocketmq-4.6.0/bin/mqbroker -n 127.0.0.1:9876 -c /broker.conf > ~/broker.log & +sh /home/rocketmq/rocketmq-5.1.4/bin/mqbroker -n 127.0.0.1:9876 -c /broker.conf > ~/broker.log 2>&1 & check "boot success" ~/broker.log +createTopic testSend +createTopic testSendOneway +createTopic testSendAsync +createTopic tracingMessageListenerConcurrently +createTopic tracingMessageListenerOrderly +createTopic testAll + echo "--JoeKerouac--" while true From 054034ea68b4c095b5b0167dbd9c0616b7d2603f Mon Sep 17 00:00:00 2001 From: JoeKerouac Date: Tue, 6 Feb 2024 14:19:32 +0800 Subject: [PATCH 5/7] fix todo --- instrumentation/rocketmq-client/README.md | 32 ++++++++++------ .../rocketmq/client/RocketMQTracing.java | 38 +++++++++++++++++++ .../TracingMessageListenerConcurrently.java | 26 ++++++------- .../client/TracingMessageListenerOrderly.java | 25 ++++++------ .../client/TracingSendMessageHook.java | 4 +- .../main/java/brave/rocketmq/client/Util.java | 8 ---- .../client/ITRocketMQTracingTest.java | 22 ++++++----- 7 files changed, 97 insertions(+), 58 deletions(-) diff --git a/instrumentation/rocketmq-client/README.md b/instrumentation/rocketmq-client/README.md index c447f15103..b20c69a629 100644 --- a/instrumentation/rocketmq-client/README.md +++ b/instrumentation/rocketmq-client/README.md @@ -49,25 +49,31 @@ public class ProducerExample { ### consumer -Replace `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently` -with `brave.rocketmq.client.TracingMessageListenerConcurrently` -or `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly` -with `brave.rocketmq.client.TracingMessageListenerOrderly`; +wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently` +using `brave.rocketmq.client.RocketMQTracing.wrap(long, org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)`, +or alternatively, wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly` +using `brave.rocketmq.client.RocketMQTracing.wrap(int, org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently)`; ```java package brave.rocketmq.client; +import java.util.List; +import java.util.Optional; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; + import brave.Span; import brave.Tracer; import brave.Tracing; import brave.messaging.MessagingRequest; import brave.messaging.MessagingTracing; +import brave.rocketmq.client.RocketMQTracing; import brave.sampler.SamplerFunction; import brave.sampler.SamplerFunctions; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.common.message.MessageExt; - -import java.util.Optional; public class ProducerExample { @@ -75,7 +81,7 @@ public class ProducerExample { // todo Replaced with actual tracing construct Tracing tracing = Tracing.newBuilder().build(); SamplerFunction producerSampler = SamplerFunctions.deferDecision(); - RocketMQTracing producerTracing = RocketMQTracing.create( + RocketMQTracing rocketMQTracing = RocketMQTracing.create( MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build()); String topic = "testPushConsumer"; @@ -84,16 +90,20 @@ public class ProducerExample { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer"); consumer.setNamesrvAddr(nameserverAddr); consumer.subscribe(topic, "*"); - consumer.registerMessageListener(new TraceableMessageListenerConcurrently(0, producerTracing) { + MessageListenerConcurrently messageListenerConcurrently = rocketMQTracing.wrap(new MessageListenerConcurrently() { @Override - protected void handleMessage(MessageExt messageExt) { + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { Span span = Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); // do something + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); + consumer.registerMessageListener(messageListenerConcurrently); + consumer.start(); } + } ``` diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java index d9a13e7405..d26d520013 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java @@ -23,9 +23,16 @@ import brave.propagation.TraceContext.Injector; import brave.propagation.TraceContextOrSamplingFlags; import brave.sampler.SamplerFunction; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; + import java.util.Map; public class RocketMQTracing { + + private static final long defaultSuspendCurrentQueueTimeMillis = 1000; + private static final int defaultDelayLevelWhenNextConsume = 0; + public static RocketMQTracing create(Tracing tracing) { return new RocketMQTracing(MessagingTracing.create(tracing), RocketMQTags.ROCKETMQ_SERVICE); } @@ -106,4 +113,35 @@ public Tracing tracing() { public Tracer tracer() { return tracer; } + + public MessageListenerOrderly wrap(MessageListenerOrderly messageListenerOrderly) { + return new TracingMessageListenerOrderly(defaultSuspendCurrentQueueTimeMillis, this, messageListenerOrderly); + } + + public MessageListenerOrderly wrap(long suspendCurrentQueueTimeMillis, MessageListenerOrderly messageListenerOrderly) { + return new TracingMessageListenerOrderly(suspendCurrentQueueTimeMillis, this, messageListenerOrderly); + } + + public MessageListenerConcurrently wrap(MessageListenerConcurrently messageListenerConcurrently) { + return new TracingMessageListenerConcurrently(defaultDelayLevelWhenNextConsume, this, messageListenerConcurrently); + } + + public MessageListenerConcurrently wrap(int delayLevelWhenNextConsume, MessageListenerConcurrently messageListenerConcurrently) { + return new TracingMessageListenerConcurrently(delayLevelWhenNextConsume, this, messageListenerConcurrently); + } + + public MessageListenerOrderly unwrap(MessageListenerOrderly messageListenerOrderly) { + if (messageListenerOrderly instanceof TracingMessageListenerOrderly) { + return ((TracingMessageListenerOrderly)messageListenerOrderly).messageListenerOrderly; + } + return null; + } + + public MessageListenerConcurrently unwrap(MessageListenerConcurrently messageListenerConcurrently) { + if (messageListenerConcurrently instanceof TracingMessageListenerConcurrently) { + return ((TracingMessageListenerConcurrently)messageListenerConcurrently).messageListenerConcurrently; + } + return null; + } + } diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java index 8ce15d24e0..f47fd2770d 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java @@ -13,29 +13,28 @@ */ package brave.rocketmq.client; -import brave.Span; -import brave.Tracer; -import brave.Tracer.SpanInScope; +import java.util.Collections; import java.util.List; + import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; -// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can -// do custom tagging via their own MessageListenerConcurrently. -// Maybe expose RocketMQTracing.messageListenerConcurrently() to wrap theirs or make spans default -// and not expose this. -public abstract class TracingMessageListenerConcurrently implements MessageListenerConcurrently { +import brave.Span; +import brave.Tracer.SpanInScope; + +class TracingMessageListenerConcurrently implements MessageListenerConcurrently { private final int delayLevelWhenNextConsume; - private final RocketMQTracing tracing; + final MessageListenerConcurrently messageListenerConcurrently; - public TracingMessageListenerConcurrently(int delayLevelWhenNextConsume, - RocketMQTracing tracing) { + TracingMessageListenerConcurrently(int delayLevelWhenNextConsume, + RocketMQTracing tracing, MessageListenerConcurrently messageListenerConcurrently) { this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; this.tracing = tracing; + this.messageListenerConcurrently = messageListenerConcurrently; } @Override @@ -50,7 +49,7 @@ public final ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyStatus result; try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) { - result = handleMessage(msg, context); + result = messageListenerConcurrently.consumeMessage(Collections.singletonList(msg), context); } catch (Exception e) { context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); result = ConsumeConcurrentlyStatus.RECONSUME_LATER; @@ -66,7 +65,4 @@ public final ConsumeConcurrentlyStatus consumeMessage(List msgs, return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } - - protected abstract ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, - ConsumeConcurrentlyContext context); } diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java index 915baa46b5..48ea34a69f 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java @@ -13,27 +13,27 @@ */ package brave.rocketmq.client; -import brave.Span; -import brave.Tracer; -import brave.Tracer.SpanInScope; +import java.util.Collections; import java.util.List; + import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.message.MessageExt; -// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can -// do custom tagging via their own MessageListenerOrderly. -// Maybe expose RocketMQTracing.messageListenerOrderly() to wrap theirs or make spans default -// and not expose this. -public abstract class TracingMessageListenerOrderly implements MessageListenerOrderly { +import brave.Span; +import brave.Tracer.SpanInScope; + +class TracingMessageListenerOrderly implements MessageListenerOrderly { private final long suspendCurrentQueueTimeMillis; private final RocketMQTracing tracing; + final MessageListenerOrderly messageListenerOrderly; - public TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis, - RocketMQTracing tracing) { + TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis, + RocketMQTracing tracing, MessageListenerOrderly messageListenerOrderly) { this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; this.tracing = tracing; + this.messageListenerOrderly = messageListenerOrderly; } @Override @@ -48,7 +48,7 @@ public final ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyStatus result; try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) { - result = handleMessage(msg, context); + result = messageListenerOrderly.consumeMessage(Collections.singletonList(msg), context); } catch (Exception e) { span.error(e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); @@ -65,7 +65,4 @@ public final ConsumeOrderlyStatus consumeMessage(List msgs, return ConsumeOrderlyStatus.SUCCESS; } - - protected abstract ConsumeOrderlyStatus handleMessage(MessageExt messageExt, - ConsumeOrderlyContext context); } diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java index 7258446d73..a3f268038c 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java @@ -45,7 +45,9 @@ public void sendMessageBefore(SendMessageContext context) { request, msg.getProperties()); span.name(RocketMQTags.TO_PREFIX + msg.getTopic()); - span.tag(RocketMQTags.ROCKETMQ_TAGS, Util.getOrEmpty(msg.getTags())); + if (msg.getTags() != null && !msg.getTags().isEmpty()) { + span.tag(RocketMQTags.ROCKETMQ_TAGS, msg.getTags()); + } context.setMqTraceContext(span); tracing.producerInjector.inject(span.context(), request); } diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java index 4c50f13134..0b98893e19 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java @@ -47,12 +47,4 @@ static Span createAndStartSpan(RocketMQTracing trac return span; } - // TODO: we shouldn't add tags with empty values! - static String getOrEmpty(String obj) { - if (obj == null) { - return ""; - } else { - return obj; - } - } } diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java index 28a22325ff..1726c7c98a 100644 --- a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java @@ -24,6 +24,8 @@ import brave.sampler.SamplerFunctions; import brave.test.ITRemote; import brave.test.IntegrationTestSpanHandler; + +import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,6 +35,8 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; @@ -145,10 +149,9 @@ class ITRocketMQTracingTest extends ITRemote { consumer.subscribe(topic, "*"); CountDownLatch latch = new CountDownLatch(1); AtomicReference reference = new AtomicReference<>(); - consumer.registerMessageListener(new TracingMessageListenerConcurrently(0, consumerTracing) { + MessageListenerConcurrently messageListenerConcurrently = consumerTracing.wrap(new MessageListenerConcurrently() { @Override - protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, - ConsumeConcurrentlyContext context) { + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { Span span = Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); reference.set(span); @@ -156,6 +159,7 @@ protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); + consumer.registerMessageListener(messageListenerConcurrently); producer.send(message); consumer.start(); @@ -185,10 +189,9 @@ protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, consumer.subscribe(topic, "*"); CountDownLatch latch = new CountDownLatch(1); AtomicReference reference = new AtomicReference<>(); - consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) { + MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() { @Override - protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, - ConsumeOrderlyContext context) { + public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { Span span = Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); reference.set(span); @@ -196,6 +199,7 @@ protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, return ConsumeOrderlyStatus.SUCCESS; } }); + consumer.registerMessageListener(messageListenerOrderly); producer.send(message); consumer.start(); @@ -226,10 +230,9 @@ protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, consumer.subscribe(topic, "*"); CountDownLatch latch = new CountDownLatch(1); AtomicReference reference = new AtomicReference<>(); - consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) { + MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() { @Override - protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, - ConsumeOrderlyContext context) { + public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { Span span = Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); reference.set(span); @@ -237,6 +240,7 @@ protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, return ConsumeOrderlyStatus.SUCCESS; } }); + consumer.registerMessageListener(messageListenerOrderly); producer.send(message); consumer.start(); From 13e3a091ed52ef42591f53e9c4694923a9bf274f Mon Sep 17 00:00:00 2001 From: JoeKerouac Date: Wed, 7 Feb 2024 11:04:21 +0800 Subject: [PATCH 6/7] fix test --- .../client/ITRocketMQTracingTest.java | 71 +++++-------------- 1 file changed, 16 insertions(+), 55 deletions(-) diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java index 1726c7c98a..46ded2b2e9 100644 --- a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java @@ -13,23 +13,10 @@ */ package brave.rocketmq.client; -import brave.Span; -import brave.Tracer; -import brave.Tracing; -import brave.handler.MutableSpan; -import brave.messaging.MessagingRequest; -import brave.messaging.MessagingTracing; -import brave.sampler.Sampler; -import brave.sampler.SamplerFunction; -import brave.sampler.SamplerFunctions; -import brave.test.ITRemote; -import brave.test.IntegrationTestSpanHandler; +import static org.assertj.core.api.Assertions.assertThat; import java.util.List; -import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; + import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -48,7 +35,15 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import static org.assertj.core.api.Assertions.assertThat; +import brave.Span; +import brave.handler.MutableSpan; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.Sampler; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import brave.test.ITRemote; +import brave.test.IntegrationTestSpanHandler; @Tag("docker") @Testcontainers(disabledWithoutDocker = true) @@ -117,10 +112,8 @@ class ITRocketMQTracingTest extends ITRemote { .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); producer.start(); - CountDownLatch latch = new CountDownLatch(1); producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - latch.countDown(); } @Override public void onException(Throwable e) { @@ -128,10 +121,8 @@ class ITRocketMQTracingTest extends ITRemote { } }); - assertThat(latch.await(3000, TimeUnit.MILLISECONDS)).isTrue(); - producer.shutdown(); - MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + producer.shutdown(); assertThat(producerSpan.parentId()).isNull(); } @@ -147,15 +138,9 @@ class ITRocketMQTracingTest extends ITRemote { new DefaultMQPushConsumer("tracingMessageListenerConcurrently"); consumer.setNamesrvAddr(nameserverAddr); consumer.subscribe(topic, "*"); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); MessageListenerConcurrently messageListenerConcurrently = consumerTracing.wrap(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { - Span span = - Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); - reference.set(span); - latch.countDown(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); @@ -163,15 +148,11 @@ public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeCo producer.send(message); consumer.start(); - boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); producer.shutdown(); consumer.shutdown(); - assertThat(flag).isTrue(); - assertThat(reference.get()).isNotNull(); - - MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); assertThat(consumerSpan.parentId()).isNull(); } @@ -187,15 +168,9 @@ public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeCo new DefaultMQPushConsumer("tracingMessageListenerOrderly"); consumer.setNamesrvAddr(nameserverAddr); consumer.subscribe(topic, "*"); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { - Span span = - Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); - reference.set(span); - latch.countDown(); return ConsumeOrderlyStatus.SUCCESS; } }); @@ -203,15 +178,11 @@ public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderly producer.send(message); consumer.start(); - boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); producer.shutdown(); consumer.shutdown(); - assertThat(flag).isTrue(); - assertThat(reference.get()).isNotNull(); - - MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); assertThat(consumerSpan.parentId()).isNull(); } @@ -228,15 +199,9 @@ public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderly DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testAll"); consumer.setNamesrvAddr(nameserverAddr); consumer.subscribe(topic, "*"); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { - Span span = - Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); - reference.set(span); - latch.countDown(); return ConsumeOrderlyStatus.SUCCESS; } }); @@ -245,17 +210,13 @@ public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderly producer.send(message); consumer.start(); - boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); producer.shutdown(); consumer.shutdown(); - assertThat(flag).isTrue(); - assertThat(reference.get()).isNotNull(); - - MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); assertThat(producerSpan.parentId()).isNull(); - MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); assertThat(consumerSpan.parentId()).isNotNull(); assertChildOf(consumerSpan, producerSpan); } From 98f6d8a4cb363aa1d1623f83e08056296321fda8 Mon Sep 17 00:00:00 2001 From: JoeKerouac Date: Mon, 26 Feb 2024 22:35:59 +0800 Subject: [PATCH 7/7] fix code review --- brave-bom/pom.xml | 5 -- .../rocketmq/client/RocketMQTracing.java | 60 +++++++------------ .../TracingMessageListenerConcurrently.java | 28 ++++----- .../client/TracingMessageListenerOrderly.java | 26 ++++---- .../client/TracingSendMessageHook.java | 5 +- .../main/java/brave/rocketmq/client/Util.java | 3 +- .../client/ITRocketMQTracingTest.java | 6 +- 7 files changed, 48 insertions(+), 85 deletions(-) diff --git a/brave-bom/pom.xml b/brave-bom/pom.xml index 4769371f91..565334d756 100644 --- a/brave-bom/pom.xml +++ b/brave-bom/pom.xml @@ -231,11 +231,6 @@ brave-instrumentation-rocketmq-client ${project.version} - - ${project.groupId} - brave-instrumentation-rocketmq-clients - ${project.version} - ${project.groupId} brave-instrumentation-rpc diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java index d26d520013..c3249cca50 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java @@ -42,7 +42,7 @@ public static RocketMQTracing create(MessagingTracing messagingTracing) { } public static RocketMQTracing create(MessagingTracing messagingTracing, - String remoteServiceName) { + String remoteServiceName) { return new RocketMQTracing(messagingTracing, remoteServiceName); } @@ -57,7 +57,7 @@ public static RocketMQTracing create(MessagingTracing messagingTracing, final String remoteServiceName; RocketMQTracing(MessagingTracing messagingTracing, - String remoteServiceName) { // intentionally hidden constructor + String remoteServiceName) { // intentionally hidden constructor this.tracing = messagingTracing.tracing(); this.tracer = tracing.tracer(); Propagation propagation = messagingTracing.propagation(); @@ -76,8 +76,8 @@ public static RocketMQTracing create(MessagingTracing messagingTracing, } TraceContextOrSamplingFlags extractAndClearTraceIdHeaders(Extractor extractor, - R request, - Map properties) { + R request, + Map properties) { TraceContextOrSamplingFlags extracted = extractor.extract(request); // Clear any propagation keys present in the headers if (extracted.samplingFlags() == null) { // then trace IDs were extracted @@ -88,9 +88,11 @@ TraceContextOrSamplingFlags extractAndClearTraceIdHeaders(Extractor extra return extracted; } - /** Creates a potentially noop remote span representing this request. */ + /** + * Creates a potentially noop remote span representing this request. + */ Span nextMessagingSpan(SamplerFunction sampler, MessagingRequest request, - TraceContextOrSamplingFlags extracted) { + TraceContextOrSamplingFlags extracted) { Boolean sampled = extracted.sampled(); // only recreate the context if the messaging sampler made a decision if (sampled == null && (sampled = sampler.trySample(request)) != null) { @@ -106,42 +108,20 @@ void clearTraceIdHeaders(Map headers) { headers.remove(traceIDHeader); } - public Tracing tracing() { - return tracing; + /** + * Extracts or creates a {@link Span.Kind#CONSUMER} span for each message received. This span is + * injected onto each message so it becomes the parent when a processor later calls {@link Tracer#nextSpan(TraceContextOrSamplingFlags)}. + */ + public MessageListenerOrderly messageListenerOrderly(MessageListenerOrderly messageListenerOrderly) { + return new TracingMessageListenerOrderly(this, messageListenerOrderly); } - public Tracer tracer() { - return tracer; - } - - public MessageListenerOrderly wrap(MessageListenerOrderly messageListenerOrderly) { - return new TracingMessageListenerOrderly(defaultSuspendCurrentQueueTimeMillis, this, messageListenerOrderly); - } - - public MessageListenerOrderly wrap(long suspendCurrentQueueTimeMillis, MessageListenerOrderly messageListenerOrderly) { - return new TracingMessageListenerOrderly(suspendCurrentQueueTimeMillis, this, messageListenerOrderly); - } - - public MessageListenerConcurrently wrap(MessageListenerConcurrently messageListenerConcurrently) { - return new TracingMessageListenerConcurrently(defaultDelayLevelWhenNextConsume, this, messageListenerConcurrently); - } - - public MessageListenerConcurrently wrap(int delayLevelWhenNextConsume, MessageListenerConcurrently messageListenerConcurrently) { - return new TracingMessageListenerConcurrently(delayLevelWhenNextConsume, this, messageListenerConcurrently); - } - - public MessageListenerOrderly unwrap(MessageListenerOrderly messageListenerOrderly) { - if (messageListenerOrderly instanceof TracingMessageListenerOrderly) { - return ((TracingMessageListenerOrderly)messageListenerOrderly).messageListenerOrderly; - } - return null; - } - - public MessageListenerConcurrently unwrap(MessageListenerConcurrently messageListenerConcurrently) { - if (messageListenerConcurrently instanceof TracingMessageListenerConcurrently) { - return ((TracingMessageListenerConcurrently)messageListenerConcurrently).messageListenerConcurrently; - } - return null; + /** + * Extracts or creates a {@link Span.Kind#CONSUMER} span for each message received. This span is + * injected onto each message so it becomes the parent when a processor later calls {@link Tracer#nextSpan(TraceContextOrSamplingFlags)}. + */ + public MessageListenerConcurrently messageListenerConcurrently(MessageListenerConcurrently messageListenerConcurrently) { + return new TracingMessageListenerConcurrently(this, messageListenerConcurrently); } } diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java index f47fd2770d..0012496b5d 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java @@ -13,33 +13,30 @@ */ package brave.rocketmq.client; -import java.util.Collections; -import java.util.List; - +import brave.Span; +import brave.Tracer.SpanInScope; +import brave.Tracing; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; -import brave.Span; -import brave.Tracer.SpanInScope; +import java.util.Collections; +import java.util.List; class TracingMessageListenerConcurrently implements MessageListenerConcurrently { - private final int delayLevelWhenNextConsume; private final RocketMQTracing tracing; final MessageListenerConcurrently messageListenerConcurrently; - TracingMessageListenerConcurrently(int delayLevelWhenNextConsume, - RocketMQTracing tracing, MessageListenerConcurrently messageListenerConcurrently) { - this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + TracingMessageListenerConcurrently(RocketMQTracing tracing, MessageListenerConcurrently messageListenerConcurrently) { this.tracing = tracing; this.messageListenerConcurrently = messageListenerConcurrently; } @Override public final ConsumeConcurrentlyStatus consumeMessage(List msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { TracingConsumerRequest request = new TracingConsumerRequest(msg); Span span = @@ -48,14 +45,13 @@ public final ConsumeConcurrentlyStatus consumeMessage(List msgs, span.name(RocketMQTags.FROM_PREFIX + msg.getTopic()); ConsumeConcurrentlyStatus result; - try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) { + try (SpanInScope scope = tracing.tracer.withSpanInScope(span)) { result = messageListenerConcurrently.consumeMessage(Collections.singletonList(msg), context); - } catch (Exception e) { - context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); - result = ConsumeConcurrentlyStatus.RECONSUME_LATER; + } catch (Throwable e) { + span.error(e); + throw e; } finally { - long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); - span.finish(timestamp); + span.finish(); } if (result != ConsumeConcurrentlyStatus.CONSUME_SUCCESS) { diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java index 48ea34a69f..884c6ce8a7 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java @@ -13,32 +13,28 @@ */ package brave.rocketmq.client; -import java.util.Collections; -import java.util.List; - +import brave.Span; +import brave.Tracer.SpanInScope; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.message.MessageExt; -import brave.Span; -import brave.Tracer.SpanInScope; +import java.util.Collections; +import java.util.List; class TracingMessageListenerOrderly implements MessageListenerOrderly { - private final long suspendCurrentQueueTimeMillis; private final RocketMQTracing tracing; final MessageListenerOrderly messageListenerOrderly; - TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis, - RocketMQTracing tracing, MessageListenerOrderly messageListenerOrderly) { - this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + TracingMessageListenerOrderly(RocketMQTracing tracing, MessageListenerOrderly messageListenerOrderly) { this.tracing = tracing; this.messageListenerOrderly = messageListenerOrderly; } @Override public final ConsumeOrderlyStatus consumeMessage(List msgs, - ConsumeOrderlyContext context) { + ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { TracingConsumerRequest request = new TracingConsumerRequest(msg); Span span = @@ -47,15 +43,13 @@ public final ConsumeOrderlyStatus consumeMessage(List msgs, span.name(RocketMQTags.FROM_PREFIX + msg.getTopic()); ConsumeOrderlyStatus result; - try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) { + try (SpanInScope scope = tracing.tracer.withSpanInScope(span)) { result = messageListenerOrderly.consumeMessage(Collections.singletonList(msg), context); - } catch (Exception e) { + } catch (Throwable e) { span.error(e); - context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); - result = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + throw e; } finally { - long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); - span.finish(timestamp); + span.finish(); } if (result != ConsumeOrderlyStatus.SUCCESS) { diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java index a3f268038c..1de6d7bb1d 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java @@ -62,17 +62,16 @@ public void sendMessageAfter(SendMessageContext context) { Span span = (Span) context.getMqTraceContext(); TracingProducerRequest request = new TracingProducerRequest(context.getMessage()); - long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); if (sendResult == null) { if (context.getCommunicationMode() == CommunicationMode.ASYNC) { return; } - span.finish(timestamp); + span.finish(); tracing.producerInjector.inject(span.context(), request); return; } tracing.producerInjector.inject(span.context(), request); - span.finish(timestamp); + span.finish(); } } diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java index 0b98893e19..ed4476051a 100644 --- a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/Util.java @@ -42,8 +42,7 @@ static Span createAndStartSpan(RocketMQTracing trac span.kind(request.spanKind()); span.remoteServiceName(tracing.remoteServiceName); span.tag(RocketMQTags.ROCKETMQ_TOPIC, request.channelName()); - long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); - span.start(timestamp); + span.start(); return span; } diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java index 46ded2b2e9..235f71e5a6 100644 --- a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java @@ -138,7 +138,7 @@ class ITRocketMQTracingTest extends ITRemote { new DefaultMQPushConsumer("tracingMessageListenerConcurrently"); consumer.setNamesrvAddr(nameserverAddr); consumer.subscribe(topic, "*"); - MessageListenerConcurrently messageListenerConcurrently = consumerTracing.wrap(new MessageListenerConcurrently() { + MessageListenerConcurrently messageListenerConcurrently = consumerTracing.messageListenerConcurrently(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; @@ -168,7 +168,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeCo new DefaultMQPushConsumer("tracingMessageListenerOrderly"); consumer.setNamesrvAddr(nameserverAddr); consumer.subscribe(topic, "*"); - MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() { + MessageListenerOrderly messageListenerOrderly = consumerTracing.messageListenerOrderly(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { return ConsumeOrderlyStatus.SUCCESS; @@ -199,7 +199,7 @@ public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderly DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testAll"); consumer.setNamesrvAddr(nameserverAddr); consumer.subscribe(topic, "*"); - MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() { + MessageListenerOrderly messageListenerOrderly = consumerTracing.messageListenerOrderly(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { return ConsumeOrderlyStatus.SUCCESS;