From 68e0ca2a357c0201cd84508f95b5662c94ba18d2 Mon Sep 17 00:00:00 2001 From: Deniza Topalova Date: Thu, 16 May 2024 17:53:39 +0300 Subject: [PATCH] adding an option to make routing-key part of RabbitMQ transaction names address PR comments - add the same logic to the sender side --- CHANGELOG.asciidoc | 4 ++ .../rabbitmq/ChannelInstrumentation.java | 15 ++++- .../rabbitmq/ConsumerInstrumentation.java | 16 ++++- .../apm/agent/rabbitmq/RabbitMQIT.java | 67 ++++++++++++++----- .../configuration/MessagingConfiguration.java | 8 ++- docs/configuration.asciidoc | 12 ++-- 6 files changed, 93 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f87aaa5350..3b82880c61 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,10 @@ Use subheadings with the "=====" level for adding notes for unreleased changes: ===== Bug fixes * Restore compatibility with Java 7 - {pull}3657[#3657] +[float] +===== Features +* Added option to make routing-key part of RabbitMQ transaction/span names - {pull}3636[#3636] + [[release-notes-1.50.0]] ==== 1.50.0 - 2024/05/28 diff --git a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ChannelInstrumentation.java b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ChannelInstrumentation.java index 5fa4812ac1..35ec95f310 100644 --- a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ChannelInstrumentation.java +++ b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ChannelInstrumentation.java @@ -24,6 +24,7 @@ import co.elastic.apm.agent.sdk.ElasticApmInstrumentation; import co.elastic.apm.agent.tracer.ElasticContext; import co.elastic.apm.agent.tracer.Span; +import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -137,10 +138,9 @@ public static Object[] onBasicPublish(@Advice.This Channel channel, if (exitSpan != null) { exchange = normalizeExchangeName(exchange); - + String transactionNameSuffix = normalizeExchangeName(resolveTransactionNameSuffix(exchange, routingKey)); exitSpan.withAction("send") - .withName("RabbitMQ SEND to ").appendToName(exchange); - + .withName("RabbitMQ SEND to ").appendToName(transactionNameSuffix); } properties = propagateTraceContext(tracer.currentContext(), properties); @@ -174,6 +174,15 @@ private static AMQP.BasicProperties propagateTraceContext(ElasticContext toPr return properties.builder().headers(headersWithContext).build(); } + private static String resolveTransactionNameSuffix(String exchange, String routingKey) { + + if (MessagingConfiguration.RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) { + return routingKey; + } else { + return exchange; + } + } + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) public static void afterBasicPublish(@Advice.Enter @Nullable Object[] enterArray, @Advice.Thrown @Nullable Throwable throwable) { diff --git a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ConsumerInstrumentation.java b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ConsumerInstrumentation.java index 2ac2847c0c..5659022575 100644 --- a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ConsumerInstrumentation.java +++ b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ConsumerInstrumentation.java @@ -20,6 +20,7 @@ import co.elastic.apm.agent.tracer.Transaction; import co.elastic.apm.agent.rabbitmq.header.RabbitMQTextHeaderGetter; +import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode; import co.elastic.apm.agent.tracer.metadata.Message; import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils; import com.rabbitmq.client.AMQP; @@ -106,8 +107,9 @@ public static Object onHandleDelivery(@Advice.Origin Class originClazz, return null; } + String transactionNameSuffix = getExchangeOrRoutingKey(envelope); transaction.withType("messaging") - .withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(exchange)); + .withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(transactionNameSuffix)); transaction.setFrameworkName("RabbitMQ"); @@ -129,5 +131,17 @@ public static void afterHandleDelivery(@Advice.Enter @Nullable final Object tran .end(); } } + + private static String getExchangeOrRoutingKey(Envelope envelope) { + if (null == envelope) { + return null; + } + + if (RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) { + return envelope.getRoutingKey(); + } else { + return envelope.getExchange(); + } + } } } diff --git a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/test/java/co/elastic/apm/agent/rabbitmq/RabbitMQIT.java b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/test/java/co/elastic/apm/agent/rabbitmq/RabbitMQIT.java index b43f719046..1899aa6f42 100644 --- a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/test/java/co/elastic/apm/agent/rabbitmq/RabbitMQIT.java +++ b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/test/java/co/elastic/apm/agent/rabbitmq/RabbitMQIT.java @@ -56,6 +56,7 @@ import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.common.util.WildcardMatcher; import co.elastic.apm.agent.testutils.TestContainersUtils; +import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -98,7 +99,7 @@ public class RabbitMQIT extends AbstractInstrumentationTest { private static final String IMAGE = "rabbitmq:3.7-management-alpine"; private static final RabbitMQContainer container = new RabbitMQContainer(IMAGE); - private static final String ROUTING_KEY = "test.key"; + private static final String TEST_ROUTING_KEY = "test.key"; private static final byte[] MSG = "Testing APM!".getBytes(); @@ -203,7 +204,8 @@ void headersCaptureSanitize() throws IOException, InterruptedException { ), true); } - private void testHeadersCapture(Map headersMap, Map expectedHeaders, boolean expectTracingHeaders) throws IOException, InterruptedException { + private void testHeadersCapture(Map headersMap, Map expectedHeaders, + boolean expectTracingHeaders) throws IOException, InterruptedException { performTest( propertiesMap(headersMap), false, @@ -228,6 +230,15 @@ void ignoreExchangeName() throws IOException, InterruptedException { }); } + @Test + void routingKeyInTransactionName() throws IOException, InterruptedException { + MessagingConfiguration messagingConfiguration = config.getConfig(MessagingConfiguration.class); + doReturn(RabbitMQNamingMode.ROUTING_KEY).when(messagingConfiguration).getRabbitMQNamingMode(); + + performTest(emptyProperties(), false, randString("exchange"), "different-routing-key", + (mt, ms) -> {}); + } + private void performTest(@Nullable AMQP.BasicProperties properties) throws IOException, InterruptedException { performTest(properties, false, randString("exchange"), (mt, ms) -> { }); @@ -238,9 +249,18 @@ private void performTest(@Nullable AMQP.BasicProperties properties, String channelName, BiConsumer messageCheck) throws IOException, InterruptedException { + performTest(properties, shouldIgnore, channelName, TEST_ROUTING_KEY, messageCheck); + } + + private void performTest(@Nullable AMQP.BasicProperties properties, + boolean shouldIgnore, + String channelName, + String routingKey, + BiConsumer messageCheck) throws IOException, InterruptedException { + Channel channel = connection.createChannel(); String exchange = createExchange(channel, channelName); - String queue = createQueue(channel, exchange); + String queue = createQueue(channel, exchange, routingKey); CountDownLatch messageReceived = new CountDownLatch(1); @@ -248,7 +268,8 @@ private void performTest(@Nullable AMQP.BasicProperties properties, // using an anonymous class to ensure class matching is properly applied @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { assertThat(properties).isNotNull(); Map headers = properties.getHeaders(); @@ -264,7 +285,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp Transaction rootTransaction = startTestRootTransaction("Rabbit-Test Root Transaction"); - channel.basicPublish(exchange, ROUTING_KEY, properties, MSG); + channel.basicPublish(exchange, routingKey, properties, MSG); endRootTransaction(rootTransaction); @@ -285,17 +306,17 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp return; } - // 2 transactions, 1 span expected getReporter().awaitTransactionCount(2); getReporter().awaitSpanCount(1); Transaction childTransaction = getNonRootTransaction(rootTransaction, getReporter().getTransactions()); - checkTransaction(childTransaction, exchange); + String transactionNameSuffix = !routingKey.equals(TEST_ROUTING_KEY) ? routingKey: exchange; + checkTransaction(childTransaction, exchange, transactionNameSuffix, "RabbitMQ"); Span span = getReporter().getSpans().get(0); - checkSendSpan(span, exchange); + checkSendSpan(span, exchange, transactionNameSuffix); // span should be child of the first transaction checkParentChild(rootTransaction, span); @@ -306,12 +327,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp Message spanMessage = span.getContext().getMessage(); Message transactionMessage = childTransaction.getContext().getMessage(); - // test-specific assertions on captured message messageCheck.accept(transactionMessage, spanMessage); - } + @Test void testPollingWithinTransactionNoMessage() throws IOException { Channel channel = connection.createChannel(); @@ -395,7 +415,7 @@ void testPollingIgnoreExchangeName() throws IOException { private String declareAndBindQueue(String queue, String exchange, Channel channel) { try { channel.queueDeclare(queue, false, false, false, null); - channel.queueBind(queue, exchange, ROUTING_KEY); + channel.queueBind(queue, exchange, TEST_ROUTING_KEY); return queue; } catch (IOException e) { throw new IllegalStateException(e); @@ -413,7 +433,7 @@ private void pollingTest(boolean withinTransaction, boolean withResult, Supplier } if (withResult) { - channel.basicPublish(exchange, ROUTING_KEY, emptyProperties(), MSG); + channel.basicPublish(exchange, TEST_ROUTING_KEY, emptyProperties(), MSG); } channel.basicGet(queue, true); @@ -573,9 +593,9 @@ static Transaction getNonRootTransaction(Transaction rootTransaction, List" : exchange); + .isEqualTo("RabbitMQ RECEIVE from %s", transactionNameSuffix.isEmpty() ? "" : transactionNameSuffix); assertThat(transaction.getFrameworkName()).isEqualTo(frameworkName); assertThat(transaction.getOutcome()).isEqualTo(Outcome.SUCCESS); @@ -682,14 +706,23 @@ private static HashMap getHeadersMap(Message message) { } private static void checkSendSpan(Span span, String exchange) { - checkSendSpan(span, exchange, connection.getAddress().getHostAddress(), connection.getPort()); + checkSendSpan(span, exchange, exchange, connection.getAddress().getHostAddress(), connection.getPort()); + } + + private static void checkSendSpan(Span span, String exchange, String spanNameSuffix) { + checkSendSpan(span, exchange, spanNameSuffix, connection.getAddress().getHostAddress(), connection.getPort()); } static void checkSendSpan(Span span, String exchange, String host, int port) { + checkSendSpan(span, exchange, exchange, host, port); + } + + static void checkSendSpan(Span span, String exchange, String spanNameSuffix, String host, int port) { String exchangeName = exchange.isEmpty() ? "" : exchange; + String spanName = spanNameSuffix.isEmpty() ? "" : spanNameSuffix; checkSpanCommon(span, "send", - String.format("RabbitMQ SEND to %s", exchangeName), + String.format("RabbitMQ SEND to %s", spanName), exchangeName, true ); diff --git a/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java b/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java index 85420bc1b2..ab8c28322a 100644 --- a/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java +++ b/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java @@ -111,8 +111,8 @@ public class MessagingConfiguration extends ConfigurationOptionProvider { private final ConfigurationOption rabbitMQNamingMode = ConfigurationOption.enumOption(RabbitMQNamingMode.class) .key("rabbitmq_naming_mode") .configurationCategory(MESSAGING_CATEGORY) - .description("Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.\n" + - "Note that `QUEUE` only works when using RabbitMQ via spring-amqp." + .description("Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.\n" + + "Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client." ) .dynamic(true) .tags("added[1.46.0]") @@ -187,5 +187,9 @@ public enum RabbitMQNamingMode { * Use queue in transaction names */ QUEUE, + /** + * Use routing key in transaction names + */ + ROUTING_KEY } } diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 1a31360d3f..3d124b4b2d 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -2513,12 +2513,12 @@ Starting from version 1.43.0, the classes that are part of the 'application_pack [[config-rabbitmq-naming-mode]] ==== `rabbitmq_naming_mode` (added[1.46.0]) -Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`. -Note that `QUEUE` only works when using RabbitMQ via spring-amqp. +Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`. +Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client. <> -Valid options: `EXCHANGE`, `QUEUE` +Valid options: `EXCHANGE`, `QUEUE`, `ROUTING_KEY` [options="header"] |============ @@ -4703,10 +4703,10 @@ Example: `5ms`. # # jms_listener_packages= -# Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`. -# Note that `QUEUE` only works when using RabbitMQ via spring-amqp. +# Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`. +# Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client. # -# Valid options: EXCHANGE, QUEUE +# Valid options: EXCHANGE, QUEUE, ROUTING_KEY # This setting can be changed at runtime # Type: RabbitMQNamingMode # Default value: EXCHANGE