diff --git a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/AbstractBaseInstrumentation.java b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/AbstractBaseInstrumentation.java index 17e887f362..a73cc84df3 100644 --- a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/AbstractBaseInstrumentation.java +++ b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/AbstractBaseInstrumentation.java @@ -52,6 +52,10 @@ protected static boolean captureHeaderKey(String key) { return !WildcardMatcher.isAnyMatch(coreConfiguration.getSanitizeFieldNames(), key); } + protected static MessagingConfiguration.RabbitMQNamingMode getRabbitMQNamingMode() { + return messagingConfiguration.getRabbitMQNamingMode(); + } + /** * Captures queue name and optional timestamp * diff --git a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-spring/src/main/java/co/elastic/apm/agent/rabbitmq/SpringAmqpTransactionHelper.java b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-spring/src/main/java/co/elastic/apm/agent/rabbitmq/SpringAmqpTransactionHelper.java index 6aa8d6627e..19ae60bd55 100644 --- a/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-spring/src/main/java/co/elastic/apm/agent/rabbitmq/SpringAmqpTransactionHelper.java +++ b/apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-spring/src/main/java/co/elastic/apm/agent/rabbitmq/SpringAmqpTransactionHelper.java @@ -22,6 +22,7 @@ import co.elastic.apm.agent.tracer.Transaction; import co.elastic.apm.agent.rabbitmq.header.SpringRabbitMQTextHeaderGetter; import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils; +import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; @@ -37,12 +38,12 @@ public SpringAmqpTransactionHelper(Tracer tracer) { @Nullable public Transaction createTransaction(Message message, String transactionNamePrefix) { - String exchange = null; + String exchangeOrQueue = null; MessageProperties messageProperties = message.getMessageProperties(); if (messageProperties != null) { - exchange = messageProperties.getReceivedExchange(); + exchangeOrQueue = getExchangeOrQueue(messageProperties); } - if (exchange != null && AbstractBaseInstrumentation.isIgnored(exchange)) { + if (exchangeOrQueue != null && AbstractBaseInstrumentation.isIgnored(exchangeOrQueue)) { return null; } @@ -58,7 +59,7 @@ public Transaction createTransaction(Message message, String transactionNameP transaction.withType("messaging") .withName(transactionNamePrefix) .appendToName(" RECEIVE from ") - .appendToName(AbstractBaseInstrumentation.normalizeExchangeName(exchange)); + .appendToName(AbstractBaseInstrumentation.normalizeExchangeName(exchangeOrQueue)); transaction.setFrameworkName("Spring AMQP"); @@ -67,12 +68,20 @@ public Transaction createTransaction(Message message, String transactionNameP String receivedRoutingKey = messageProperties.getReceivedRoutingKey(); transaction.getContext().getMessage().withAge(timestamp).withRoutingKey(receivedRoutingKey); } - if (exchange != null) { - transaction.getContext().getMessage().withQueue(exchange); + if (exchangeOrQueue != null) { + transaction.getContext().getMessage().withQueue(exchangeOrQueue); } // only capture incoming messages headers for now (consistent with other messaging plugins) AbstractBaseInstrumentation.captureHeaders(messageProperties != null ? messageProperties.getHeaders() : null, transaction.getContext().getMessage()); return transaction.activate(); } + + private static String getExchangeOrQueue(MessageProperties messageProperties) { + if (MessagingConfiguration.RabbitMQNamingMode.QUEUE == AbstractBaseInstrumentation.getRabbitMQNamingMode()) { + return messageProperties.getConsumerQueue(); + } else { + return messageProperties.getReceivedExchange(); + } + } } diff --git a/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java b/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java index 0710c877a7..5bc1171e21 100644 --- a/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java +++ b/apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java @@ -19,7 +19,6 @@ package co.elastic.apm.agent.tracer.configuration; import co.elastic.apm.agent.common.util.WildcardMatcher; -import co.elastic.apm.agent.tracer.configuration.WildcardMatcherValueConverter; import org.stagemonitor.configuration.ConfigurationOption; import org.stagemonitor.configuration.ConfigurationOptionProvider; import org.stagemonitor.configuration.converter.ListValueConverter; @@ -33,7 +32,7 @@ public class MessagingConfiguration extends ConfigurationOptionProvider { private static final String MESSAGE_POLLING_TRANSACTION_STRATEGY = "message_polling_transaction_strategy"; private static final String MESSAGE_BATCH_STRATEGY = "message_batch_strategy"; - private ConfigurationOption messagePollingTransactionStrategy = ConfigurationOption.enumOption(JmsStrategy.class) + private final ConfigurationOption messagePollingTransactionStrategy = ConfigurationOption.enumOption(JmsStrategy.class) .key(MESSAGE_POLLING_TRANSACTION_STRATEGY) .configurationCategory(MESSAGING_CATEGORY) .tags("internal") @@ -45,7 +44,7 @@ public class MessagingConfiguration extends ConfigurationOptionProvider { .dynamic(true) .buildWithDefault(JmsStrategy.HANDLING); - private ConfigurationOption messageBatchStrategy = ConfigurationOption.enumOption(BatchStrategy.class) + private final ConfigurationOption messageBatchStrategy = ConfigurationOption.enumOption(BatchStrategy.class) .key(MESSAGE_BATCH_STRATEGY) .configurationCategory(MESSAGING_CATEGORY) .tags("internal") @@ -57,7 +56,7 @@ public class MessagingConfiguration extends ConfigurationOptionProvider { .dynamic(true) .buildWithDefault(BatchStrategy.BATCH_HANDLING); - private ConfigurationOption collectQueueAddress = ConfigurationOption.booleanOption() + private final ConfigurationOption collectQueueAddress = ConfigurationOption.booleanOption() .key("collect_queue_address") .configurationCategory(MESSAGING_CATEGORY) .tags("internal") @@ -77,7 +76,7 @@ public class MessagingConfiguration extends ConfigurationOptionProvider { "\n" + WildcardMatcher.DOCUMENTATION) .dynamic(true) - .buildWithDefault(Collections.emptyList()); + .buildWithDefault(Collections.emptyList()); private final ConfigurationOption endMessagingTransactionOnPoll = ConfigurationOption.booleanOption() .key("end_messaging_transaction_on_poll") @@ -107,7 +106,14 @@ public class MessagingConfiguration extends ConfigurationOptionProvider { "Starting from version 1.43.0, the classes that are part of the 'application_packages' option are also included in the list of classes considered." ) .dynamic(false) - .buildWithDefault(Collections.emptyList()); + .buildWithDefault(Collections.emptyList()); + + private final ConfigurationOption rabbitMQNamingMode = ConfigurationOption.enumOption(RabbitMQNamingMode.class) + .key("rabbitmq_naming_mode") + .configurationCategory(MESSAGING_CATEGORY) + .description("Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions.") + .dynamic(true) + .buildWithDefault(RabbitMQNamingMode.EXCHANGE); public JmsStrategy getMessagePollingTransactionStrategy() { return messagePollingTransactionStrategy.get(); @@ -133,6 +139,10 @@ public Collection getJmsListenerPackages() { return jmsListenerPackages.get(); } + public RabbitMQNamingMode getRabbitMQNamingMode() { + return rabbitMQNamingMode.get(); + } + public enum JmsStrategy { /** * Create a transaction capturing JMS {@code receive} invocations @@ -164,4 +174,15 @@ public enum BatchStrategy { */ BATCH_HANDLING } + + public enum RabbitMQNamingMode { + /** + * Use exchange in transaction names + */ + EXCHANGE, + /** + * Use queue in transaction names + */ + QUEUE, + } }