Skip to content

Commit

Permalink
adding an option to make routing-key part of RabbitMQ transaction names
Browse files Browse the repository at this point in the history
address PR comments - add the same logic to the sender side
  • Loading branch information
Deniza Topalova authored and deni-topalova committed Jun 3, 2024
1 parent 0c45a57 commit 68e0ca2
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
===== Bug fixes
* Restore compatibility with Java 7 - {pull}3657[#3657]
[float]
===== Features
* Added option to make routing-key part of RabbitMQ transaction/span names - {pull}3636[#3636]
[[release-notes-1.50.0]]
==== 1.50.0 - 2024/05/28
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
import co.elastic.apm.agent.tracer.ElasticContext;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
Expand Down Expand Up @@ -137,10 +138,9 @@ public static Object[] onBasicPublish(@Advice.This Channel channel,
if (exitSpan != null) {

exchange = normalizeExchangeName(exchange);

String transactionNameSuffix = normalizeExchangeName(resolveTransactionNameSuffix(exchange, routingKey));
exitSpan.withAction("send")
.withName("RabbitMQ SEND to ").appendToName(exchange);

.withName("RabbitMQ SEND to ").appendToName(transactionNameSuffix);
}

properties = propagateTraceContext(tracer.currentContext(), properties);
Expand Down Expand Up @@ -174,6 +174,15 @@ private static AMQP.BasicProperties propagateTraceContext(ElasticContext<?> toPr
return properties.builder().headers(headersWithContext).build();
}

private static String resolveTransactionNameSuffix(String exchange, String routingKey) {

if (MessagingConfiguration.RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
return routingKey;
} else {
return exchange;
}
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
public static void afterBasicPublish(@Advice.Enter @Nullable Object[] enterArray,
@Advice.Thrown @Nullable Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import co.elastic.apm.agent.tracer.Transaction;
import co.elastic.apm.agent.rabbitmq.header.RabbitMQTextHeaderGetter;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode;
import co.elastic.apm.agent.tracer.metadata.Message;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -106,8 +107,9 @@ public static Object onHandleDelivery(@Advice.Origin Class<?> originClazz,
return null;
}

String transactionNameSuffix = getExchangeOrRoutingKey(envelope);
transaction.withType("messaging")
.withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(exchange));
.withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(transactionNameSuffix));

transaction.setFrameworkName("RabbitMQ");

Expand All @@ -129,5 +131,17 @@ public static void afterHandleDelivery(@Advice.Enter @Nullable final Object tran
.end();
}
}

private static String getExchangeOrRoutingKey(Envelope envelope) {
if (null == envelope) {
return null;
}

if (RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
return envelope.getRoutingKey();
} else {
return envelope.getExchange();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.common.util.WildcardMatcher;
import co.elastic.apm.agent.testutils.TestContainersUtils;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
Expand Down Expand Up @@ -98,7 +99,7 @@ public class RabbitMQIT extends AbstractInstrumentationTest {
private static final String IMAGE = "rabbitmq:3.7-management-alpine";
private static final RabbitMQContainer container = new RabbitMQContainer(IMAGE);

private static final String ROUTING_KEY = "test.key";
private static final String TEST_ROUTING_KEY = "test.key";

private static final byte[] MSG = "Testing APM!".getBytes();

Expand Down Expand Up @@ -203,7 +204,8 @@ void headersCaptureSanitize() throws IOException, InterruptedException {
), true);
}

private void testHeadersCapture(Map<String, String> headersMap, Map<String, String> expectedHeaders, boolean expectTracingHeaders) throws IOException, InterruptedException {
private void testHeadersCapture(Map<String, String> headersMap, Map<String, String> expectedHeaders,
boolean expectTracingHeaders) throws IOException, InterruptedException {
performTest(
propertiesMap(headersMap),
false,
Expand All @@ -228,6 +230,15 @@ void ignoreExchangeName() throws IOException, InterruptedException {
});
}

@Test
void routingKeyInTransactionName() throws IOException, InterruptedException {
MessagingConfiguration messagingConfiguration = config.getConfig(MessagingConfiguration.class);
doReturn(RabbitMQNamingMode.ROUTING_KEY).when(messagingConfiguration).getRabbitMQNamingMode();

performTest(emptyProperties(), false, randString("exchange"), "different-routing-key",
(mt, ms) -> {});
}

private void performTest(@Nullable AMQP.BasicProperties properties) throws IOException, InterruptedException {
performTest(properties, false, randString("exchange"), (mt, ms) -> {
});
Expand All @@ -238,17 +249,27 @@ private void performTest(@Nullable AMQP.BasicProperties properties,
String channelName,
BiConsumer<Message, Message> messageCheck) throws IOException, InterruptedException {

performTest(properties, shouldIgnore, channelName, TEST_ROUTING_KEY, messageCheck);
}

private void performTest(@Nullable AMQP.BasicProperties properties,
boolean shouldIgnore,
String channelName,
String routingKey,
BiConsumer<Message, Message> messageCheck) throws IOException, InterruptedException {

Channel channel = connection.createChannel();
String exchange = createExchange(channel, channelName);
String queue = createQueue(channel, exchange);
String queue = createQueue(channel, exchange, routingKey);

CountDownLatch messageReceived = new CountDownLatch(1);

channel.basicConsume(queue, new DefaultConsumer(channel) {
// using an anonymous class to ensure class matching is properly applied

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
assertThat(properties).isNotNull();
Map<String, Object> headers = properties.getHeaders();

Expand All @@ -264,7 +285,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp

Transaction rootTransaction = startTestRootTransaction("Rabbit-Test Root Transaction");

channel.basicPublish(exchange, ROUTING_KEY, properties, MSG);
channel.basicPublish(exchange, routingKey, properties, MSG);

endRootTransaction(rootTransaction);

Expand All @@ -285,17 +306,17 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
return;
}


// 2 transactions, 1 span expected
getReporter().awaitTransactionCount(2);
getReporter().awaitSpanCount(1);

Transaction childTransaction = getNonRootTransaction(rootTransaction, getReporter().getTransactions());

checkTransaction(childTransaction, exchange);
String transactionNameSuffix = !routingKey.equals(TEST_ROUTING_KEY) ? routingKey: exchange;
checkTransaction(childTransaction, exchange, transactionNameSuffix, "RabbitMQ");

Span span = getReporter().getSpans().get(0);
checkSendSpan(span, exchange);
checkSendSpan(span, exchange, transactionNameSuffix);

// span should be child of the first transaction
checkParentChild(rootTransaction, span);
Expand All @@ -306,12 +327,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
Message spanMessage = span.getContext().getMessage();
Message transactionMessage = childTransaction.getContext().getMessage();


// test-specific assertions on captured message
messageCheck.accept(transactionMessage, spanMessage);

}


@Test
void testPollingWithinTransactionNoMessage() throws IOException {
Channel channel = connection.createChannel();
Expand Down Expand Up @@ -395,7 +415,7 @@ void testPollingIgnoreExchangeName() throws IOException {
private String declareAndBindQueue(String queue, String exchange, Channel channel) {
try {
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, exchange, ROUTING_KEY);
channel.queueBind(queue, exchange, TEST_ROUTING_KEY);
return queue;
} catch (IOException e) {
throw new IllegalStateException(e);
Expand All @@ -413,7 +433,7 @@ private void pollingTest(boolean withinTransaction, boolean withResult, Supplier
}

if (withResult) {
channel.basicPublish(exchange, ROUTING_KEY, emptyProperties(), MSG);
channel.basicPublish(exchange, TEST_ROUTING_KEY, emptyProperties(), MSG);
}
channel.basicGet(queue, true);

Expand Down Expand Up @@ -573,9 +593,9 @@ static Transaction getNonRootTransaction(Transaction rootTransaction, List<Trans
return childTransaction;
}

private String createQueue(Channel channel, String exchange) throws IOException {
private String createQueue(Channel channel, String exchange, String routingKey) throws IOException {
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, ROUTING_KEY);
channel.queueBind(queueName, exchange, routingKey);
return queueName;
}

Expand Down Expand Up @@ -618,9 +638,13 @@ private static void checkTransaction(Transaction transaction, String exchange) {
}

static void checkTransaction(Transaction transaction, String exchange, String frameworkName) {
checkTransaction(transaction, exchange, exchange, frameworkName);
}

static void checkTransaction(Transaction transaction, String exchange, String transactionNameSuffix, String frameworkName) {
assertThat(transaction.getType()).isEqualTo("messaging");
assertThat(transaction.getNameAsString())
.isEqualTo("RabbitMQ RECEIVE from %s", exchange.isEmpty() ? "<default>" : exchange);
.isEqualTo("RabbitMQ RECEIVE from %s", transactionNameSuffix.isEmpty() ? "<default>" : transactionNameSuffix);
assertThat(transaction.getFrameworkName()).isEqualTo(frameworkName);

assertThat(transaction.getOutcome()).isEqualTo(Outcome.SUCCESS);
Expand Down Expand Up @@ -682,14 +706,23 @@ private static HashMap<String, String> getHeadersMap(Message message) {
}

private static void checkSendSpan(Span span, String exchange) {
checkSendSpan(span, exchange, connection.getAddress().getHostAddress(), connection.getPort());
checkSendSpan(span, exchange, exchange, connection.getAddress().getHostAddress(), connection.getPort());
}

private static void checkSendSpan(Span span, String exchange, String spanNameSuffix) {
checkSendSpan(span, exchange, spanNameSuffix, connection.getAddress().getHostAddress(), connection.getPort());
}

static void checkSendSpan(Span span, String exchange, String host, int port) {
checkSendSpan(span, exchange, exchange, host, port);
}

static void checkSendSpan(Span span, String exchange, String spanNameSuffix, String host, int port) {
String exchangeName = exchange.isEmpty() ? "<default>" : exchange;
String spanName = spanNameSuffix.isEmpty() ? "<default>" : spanNameSuffix;
checkSpanCommon(span,
"send",
String.format("RabbitMQ SEND to %s", exchangeName),
String.format("RabbitMQ SEND to %s", spanName),
exchangeName,
true
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
private final ConfigurationOption<RabbitMQNamingMode> rabbitMQNamingMode = ConfigurationOption.enumOption(RabbitMQNamingMode.class)
.key("rabbitmq_naming_mode")
.configurationCategory(MESSAGING_CATEGORY)
.description("Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.\n" +
"Note that `QUEUE` only works when using RabbitMQ via spring-amqp."
.description("Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.\n" +
"Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client."
)
.dynamic(true)
.tags("added[1.46.0]")
Expand Down Expand Up @@ -187,5 +187,9 @@ public enum RabbitMQNamingMode {
* Use queue in transaction names
*/
QUEUE,
/**
* Use routing key in transaction names
*/
ROUTING_KEY
}
}
12 changes: 6 additions & 6 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2513,12 +2513,12 @@ Starting from version 1.43.0, the classes that are part of the 'application_pack
[[config-rabbitmq-naming-mode]]
==== `rabbitmq_naming_mode` (added[1.46.0])

Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.
Note that `QUEUE` only works when using RabbitMQ via spring-amqp.
Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.
Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client.

<<configuration-dynamic, image:./images/dynamic-config.svg[] >>

Valid options: `EXCHANGE`, `QUEUE`
Valid options: `EXCHANGE`, `QUEUE`, `ROUTING_KEY`

[options="header"]
|============
Expand Down Expand Up @@ -4703,10 +4703,10 @@ Example: `5ms`.
#
# jms_listener_packages=
# Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.
# Note that `QUEUE` only works when using RabbitMQ via spring-amqp.
# Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.
# Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client.
#
# Valid options: EXCHANGE, QUEUE
# Valid options: EXCHANGE, QUEUE, ROUTING_KEY
# This setting can be changed at runtime
# Type: RabbitMQNamingMode
# Default value: EXCHANGE
Expand Down

0 comments on commit 68e0ca2

Please sign in to comment.