Skip to content

Commit

Permalink
elastic#3421 extend spring rabbit transactions to use either queue or…
Browse files Browse the repository at this point in the history
… exchange in the name
  • Loading branch information
Cortana7 committed Nov 9, 2023
1 parent b859bcf commit 339e31b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ protected static boolean captureHeaderKey(String key) {
return !WildcardMatcher.isAnyMatch(coreConfiguration.getSanitizeFieldNames(), key);
}

protected static MessagingConfiguration.RabbitMQNamingMode getRabbitMQNamingMode() {
return messagingConfiguration.getRabbitMQNamingMode();
}

/**
* Captures queue name and optional timestamp
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import co.elastic.apm.agent.tracer.Transaction;
import co.elastic.apm.agent.rabbitmq.header.SpringRabbitMQTextHeaderGetter;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

Expand All @@ -37,12 +38,12 @@ public SpringAmqpTransactionHelper(Tracer tracer) {

@Nullable
public Transaction<?> createTransaction(Message message, String transactionNamePrefix) {
String exchange = null;
String exchangeOrQueue = null;
MessageProperties messageProperties = message.getMessageProperties();
if (messageProperties != null) {
exchange = messageProperties.getReceivedExchange();
exchangeOrQueue = getExchangeOrQueue(messageProperties);
}
if (exchange != null && AbstractBaseInstrumentation.isIgnored(exchange)) {
if (exchangeOrQueue != null && AbstractBaseInstrumentation.isIgnored(exchangeOrQueue)) {
return null;
}

Expand All @@ -58,7 +59,7 @@ public Transaction<?> createTransaction(Message message, String transactionNameP
transaction.withType("messaging")
.withName(transactionNamePrefix)
.appendToName(" RECEIVE from ")
.appendToName(AbstractBaseInstrumentation.normalizeExchangeName(exchange));
.appendToName(AbstractBaseInstrumentation.normalizeExchangeName(exchangeOrQueue));

transaction.setFrameworkName("Spring AMQP");

Expand All @@ -67,12 +68,20 @@ public Transaction<?> createTransaction(Message message, String transactionNameP
String receivedRoutingKey = messageProperties.getReceivedRoutingKey();
transaction.getContext().getMessage().withAge(timestamp).withRoutingKey(receivedRoutingKey);
}
if (exchange != null) {
transaction.getContext().getMessage().withQueue(exchange);
if (exchangeOrQueue != null) {
transaction.getContext().getMessage().withQueue(exchangeOrQueue);
}

// only capture incoming messages headers for now (consistent with other messaging plugins)
AbstractBaseInstrumentation.captureHeaders(messageProperties != null ? messageProperties.getHeaders() : null, transaction.getContext().getMessage());
return transaction.activate();
}

private static String getExchangeOrQueue(MessageProperties messageProperties) {
if (MessagingConfiguration.RabbitMQNamingMode.QUEUE == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
return messageProperties.getConsumerQueue();
} else {
return messageProperties.getReceivedExchange();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package co.elastic.apm.agent.tracer.configuration;

import co.elastic.apm.agent.common.util.WildcardMatcher;
import co.elastic.apm.agent.tracer.configuration.WildcardMatcherValueConverter;
import org.stagemonitor.configuration.ConfigurationOption;
import org.stagemonitor.configuration.ConfigurationOptionProvider;
import org.stagemonitor.configuration.converter.ListValueConverter;
Expand All @@ -33,7 +32,7 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
private static final String MESSAGE_POLLING_TRANSACTION_STRATEGY = "message_polling_transaction_strategy";
private static final String MESSAGE_BATCH_STRATEGY = "message_batch_strategy";

private ConfigurationOption<JmsStrategy> messagePollingTransactionStrategy = ConfigurationOption.enumOption(JmsStrategy.class)
private final ConfigurationOption<JmsStrategy> messagePollingTransactionStrategy = ConfigurationOption.enumOption(JmsStrategy.class)
.key(MESSAGE_POLLING_TRANSACTION_STRATEGY)
.configurationCategory(MESSAGING_CATEGORY)
.tags("internal")
Expand All @@ -45,7 +44,7 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
.dynamic(true)
.buildWithDefault(JmsStrategy.HANDLING);

private ConfigurationOption<BatchStrategy> messageBatchStrategy = ConfigurationOption.enumOption(BatchStrategy.class)
private final ConfigurationOption<BatchStrategy> messageBatchStrategy = ConfigurationOption.enumOption(BatchStrategy.class)
.key(MESSAGE_BATCH_STRATEGY)
.configurationCategory(MESSAGING_CATEGORY)
.tags("internal")
Expand All @@ -57,7 +56,7 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
.dynamic(true)
.buildWithDefault(BatchStrategy.BATCH_HANDLING);

private ConfigurationOption<Boolean> collectQueueAddress = ConfigurationOption.booleanOption()
private final ConfigurationOption<Boolean> collectQueueAddress = ConfigurationOption.booleanOption()
.key("collect_queue_address")
.configurationCategory(MESSAGING_CATEGORY)
.tags("internal")
Expand All @@ -77,7 +76,7 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
"\n" +
WildcardMatcher.DOCUMENTATION)
.dynamic(true)
.buildWithDefault(Collections.<WildcardMatcher>emptyList());
.buildWithDefault(Collections.emptyList());

private final ConfigurationOption<Boolean> endMessagingTransactionOnPoll = ConfigurationOption.booleanOption()
.key("end_messaging_transaction_on_poll")
Expand Down Expand Up @@ -107,7 +106,14 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
"Starting from version 1.43.0, the classes that are part of the 'application_packages' option are also included in the list of classes considered."
)
.dynamic(false)
.buildWithDefault(Collections.<String>emptyList());
.buildWithDefault(Collections.emptyList());

private final ConfigurationOption<RabbitMQNamingMode> rabbitMQNamingMode = ConfigurationOption.enumOption(RabbitMQNamingMode.class)
.key("rabbitmq_naming_mode")
.configurationCategory(MESSAGING_CATEGORY)
.description("Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions.")
.dynamic(true)
.buildWithDefault(RabbitMQNamingMode.EXCHANGE);

public JmsStrategy getMessagePollingTransactionStrategy() {
return messagePollingTransactionStrategy.get();
Expand All @@ -133,6 +139,10 @@ public Collection<String> getJmsListenerPackages() {
return jmsListenerPackages.get();
}

public RabbitMQNamingMode getRabbitMQNamingMode() {
return rabbitMQNamingMode.get();
}

public enum JmsStrategy {
/**
* Create a transaction capturing JMS {@code receive} invocations
Expand Down Expand Up @@ -164,4 +174,15 @@ public enum BatchStrategy {
*/
BATCH_HANDLING
}

public enum RabbitMQNamingMode {
/**
* Use exchange in transaction names
*/
EXCHANGE,
/**
* Use queue in transaction names
*/
QUEUE,
}
}

0 comments on commit 339e31b

Please sign in to comment.