Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/W-12041210_profiling-tracing #609

Draft
wants to merge 9 commits into
base: support/1.9.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@
<artifactId>mule-sdk-compatibility-api</artifactId>
<version>${muleSdkCompatibilityApiVersion}</version>
</dependency>
<dependency>
<groupId>org.mule.sdk</groupId>
<artifactId>mule-sdk-api</artifactId>
<version>${muleSdkApiVersion}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import static org.mule.extensions.jms.internal.common.JmsCommons.EXAMPLE_CONTENT_TYPE;
import static org.mule.extensions.jms.internal.common.JmsCommons.EXAMPLE_ENCODING;
import static org.mule.extensions.jms.internal.operation.profiling.tracing.JmsConsumeSpanCustomizer.getJmsConsumeSpanCustomizer;

import static org.slf4j.LoggerFactory.getLogger;

import org.mule.extensions.jms.api.config.ConsumerAckMode;
Expand All @@ -20,7 +22,6 @@
import org.mule.extensions.jms.internal.connection.session.JmsSessionManager;
import org.mule.extensions.jms.internal.metadata.JmsOutputResolver;
import org.mule.jms.commons.api.AttributesOutputResolver;
import org.mule.jms.commons.api.message.JmsAttributes;
import org.mule.jms.commons.internal.connection.JmsTransactionalConnection;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.lifecycle.Disposable;
Expand All @@ -35,14 +36,14 @@
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.process.CompletionCallback;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.tx.OperationTransactionalAction;
import org.mule.sdk.compatibility.api.utils.ForwardCompatibilityHelper;

import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;

Expand All @@ -65,6 +66,9 @@ public final class JmsConsume implements Initialisable, Disposable {

private org.mule.jms.commons.internal.operation.JmsConsume jmsConsume;

@Inject
private java.util.Optional<ForwardCompatibilityHelper> forwardCompatibilityHelper;

/**
* Operation that allows the user to consume a single {@link Message} from a given {@link Destination}.
*
Expand All @@ -77,8 +81,8 @@ public final class JmsConsume implements Initialisable, Disposable {
* @param selector a custom JMS selector for filtering the messages
* @param contentType the {@link Message}'s content content type
* @param encoding the {@link Message}'s content encoding
* @param maximumWait maximum time to wait for a message before timing out
* @param maximumWaitUnit Time unit to be used in the maximumWaitTime configurations
* @param maximumWait maximum time to wait for a message before timing out
* @param maximumWaitUnit Time unit to be used in the maximumWaitTime configurations
* @return a {@link Result} with the {@link Message} content as {@link Result#getOutput} and its properties
* and headers as {@link Result#getAttributes}
* @throws JmsConsumeException if an error occurs
Expand All @@ -97,8 +101,10 @@ public Result<Object, Object> consume(@Config JmsConfig config,
defaultValue = "10000") @Summary("Maximum time to wait for a message to arrive before timeout") Long maximumWait,
@Optional(
defaultValue = "MILLISECONDS") @Example("MILLISECONDS") @Summary("Time unit to be used in the maximumWaitTime configuration") TimeUnit maximumWaitUnit,
OperationTransactionalAction transactionalAction)
OperationTransactionalAction transactionalAction,
CorrelationInfo correlationInfo)
throws JmsExtensionException, ConnectionException {
customizeCurrentSpan(connection, destination, consumerType, correlationInfo);
return (Result) jmsConsume.consume(config, connection, destination, consumerType, ackMode,
selector, contentType, encoding, maximumWait,
maximumWaitUnit, transactionalAction);
Expand All @@ -107,11 +113,20 @@ public Result<Object, Object> consume(@Config JmsConfig config,

@Override
public void initialise() {
jmsConsume = new org.mule.jms.commons.internal.operation.JmsConsume(sessionManager, schedulerService);
jmsConsume =
new org.mule.jms.commons.internal.operation.JmsConsume(sessionManager, schedulerService);
}

@Override
public void dispose() {
jmsConsume.dispose();
}

private void customizeCurrentSpan(JmsTransactionalConnection connection, String destination,
org.mule.jms.commons.api.destination.ConsumerType consumerType,
org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo correlationInfo) {
forwardCompatibilityHelper
.ifPresent(fch -> getJmsConsumeSpanCustomizer().customizeSpan(fch.getDistributedTraceContextManager(correlationInfo),
connection, destination, consumerType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.mule.extensions.jms.internal.operation;

import static org.mule.extensions.jms.internal.common.JmsCommons.QUEUE;
import static org.mule.extensions.jms.internal.operation.profiling.tracing.JmsPublishSpanCustomizer.getJmsPublishSpanCustomizer;

import static org.slf4j.LoggerFactory.getLogger;

import org.mule.extensions.jms.api.config.JmsProducerConfig;
Expand All @@ -33,6 +35,7 @@
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;
import org.mule.runtime.extension.api.runtime.process.CompletionCallback;
import org.mule.runtime.extension.api.tx.OperationTransactionalAction;
import org.mule.sdk.compatibility.api.utils.ForwardCompatibilityHelper;

import javax.inject.Inject;
import javax.jms.Destination;
Expand All @@ -55,6 +58,9 @@ public final class JmsPublish implements Initialisable, Disposable {
@Inject
private SchedulerService schedulerService;

@Inject
private java.util.Optional<ForwardCompatibilityHelper> forwardCompatibilityHelper;

private org.mule.jms.commons.internal.operation.JmsPublish jmsPublish;

/**
Expand Down Expand Up @@ -85,10 +91,18 @@ public void publish(@Config JmsConfig config, @Connection JmsTransactionalConnec
CompletionCallback<Void, Void> completionCallback)

throws JmsExtensionException {
customizeCurrentSpan(connection, destination, destinationType, correlationInfo);
jmsPublish.publish(config, connection, destination, destinationType, messageBuilder, overrides, transactionalAction,
sendCorrelationId, correlationInfo, completionCallback);
}

private void customizeCurrentSpan(JmsTransactionalConnection connection, String destination, DestinationType destinationType,
CorrelationInfo correlationInfo) {
forwardCompatibilityHelper
.ifPresent(fch -> getJmsPublishSpanCustomizer().customizeSpan(fch.getDistributedTraceContextManager(correlationInfo),
connection, destination, destinationType));
}


@Override
public void dispose() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extensions.jms.internal.operation.profiling.tracing;

import static org.mule.extensions.jms.internal.operation.profiling.tracing.SpanCustomizerUtils.safeExecute;
import static org.mule.jms.commons.internal.common.JmsCommons.getDestinationType;

import static org.slf4j.LoggerFactory.getLogger;

import org.mule.jms.commons.api.destination.ConsumerType;
import org.mule.jms.commons.internal.connection.JmsTransactionalConnection;
import org.mule.sdk.api.runtime.source.DistributedTraceContextManager;

import java.util.Locale;

import org.slf4j.Logger;

public class JmsConsumeSpanCustomizer extends JmsSpanCustomizer {

private static final Logger LOGGER = getLogger(JmsConsumeSpanCustomizer.class);

private static final String SPAN_OPERATION_NAME = "receive";
private static final String SPAN_KIND_NAME = "CONSUMER";
public static final String MESSAGING_DESTINATION_KIND = "messaging.destination_kind";

/**
* @return a new instance of a {@link JmsConsumeSpanCustomizer}.
*/
public static JmsConsumeSpanCustomizer getJmsConsumeSpanCustomizer() {
return new JmsConsumeSpanCustomizer();
}

public void customizeSpan(DistributedTraceContextManager distributedTraceContextManager, JmsTransactionalConnection connection,
String destination, ConsumerType consumerType) {
super.customizeSpan(distributedTraceContextManager, connection, destination);
safeExecute(() -> distributedTraceContextManager
.addCurrentSpanAttribute(MESSAGING_DESTINATION_KIND, getDestinationType(consumerType).toLowerCase(Locale.ROOT)),
"Messaging destination kind data could not be added to span", LOGGER);
}

@Override
protected String getSpanOperation() {
return SPAN_OPERATION_NAME;
}

@Override
protected String getSpanKind() {
return SPAN_KIND_NAME;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extensions.jms.internal.operation.profiling.tracing;

import static org.mule.extensions.jms.internal.operation.profiling.tracing.SpanCustomizerUtils.safeExecute;

import static org.slf4j.LoggerFactory.getLogger;

import org.mule.extensions.jms.api.destination.DestinationType;
import org.mule.jms.commons.internal.connection.JmsTransactionalConnection;
import org.mule.sdk.api.runtime.source.DistributedTraceContextManager;

import java.util.Locale;

import org.slf4j.Logger;

public class JmsPublishSpanCustomizer extends JmsSpanCustomizer {

private static final Logger LOGGER = getLogger(JmsPublishSpanCustomizer.class);

private static final String SPAN_OPERATION_NAME = "send";
private static final String SPAN_KIND_NAME = "PRODUCER";
public static final String MESSAGING_DESTINATION_KIND = "messaging.destination_kind";

/**
* @return a new instance of a {@link JmsPublishSpanCustomizer}.
*/
public static JmsPublishSpanCustomizer getJmsPublishSpanCustomizer() {
return new JmsPublishSpanCustomizer();
}

public void customizeSpan(DistributedTraceContextManager distributedTraceContextManager, JmsTransactionalConnection connection,
String destination,
DestinationType destinationType) {
super.customizeSpan(distributedTraceContextManager, connection, destination);
safeExecute(() -> distributedTraceContextManager
.addCurrentSpanAttribute(MESSAGING_DESTINATION_KIND, destinationType.toString().toLowerCase(Locale.ROOT)),
"Messaging destination kind data could not be added to span", LOGGER);
}

@Override
protected String getSpanOperation() {
return SPAN_OPERATION_NAME;
}

@Override
protected String getSpanKind() {
return SPAN_KIND_NAME;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extensions.jms.internal.operation.profiling.tracing;

import static org.mule.extensions.jms.internal.operation.profiling.tracing.SpanCustomizerUtils.safeExecute;
import static org.mule.jms.commons.internal.common.JmsCommons.getDestinationType;

import static org.slf4j.LoggerFactory.getLogger;

import org.mule.jms.commons.internal.connection.JmsTransactionalConnection;
import org.mule.sdk.api.runtime.source.DistributedTraceContextManager;

import java.util.Locale;

import javax.jms.JMSException;

import org.slf4j.Logger;

public abstract class JmsSpanCustomizer {

private static final Logger LOGGER = getLogger(JmsSpanCustomizer.class);
public static final String MESSAGING_SYSTEM = "messaging.system";
public static final String MESSAGING_DESTINATION = "messaging.destination";
public static final String SPAN_KIND = "span.kind.override";

protected void customizeSpan(DistributedTraceContextManager distributedTraceContextManager,
JmsTransactionalConnection connection, String destination) {
safeExecute(() -> distributedTraceContextManager.setCurrentSpanName(destination + " " + getSpanOperation()),
"Span name according to semantic conventions could not be added to span", LOGGER);
safeExecute(() -> distributedTraceContextManager
.addCurrentSpanAttribute(MESSAGING_SYSTEM, getMessagingSystem(connection)),
"Messaging system data could not be added to span", LOGGER);
safeExecute(() -> distributedTraceContextManager.addCurrentSpanAttribute(MESSAGING_DESTINATION, destination),
"Messaging destination data could not be added to span", LOGGER);
safeExecute(() -> distributedTraceContextManager.addCurrentSpanAttribute(SPAN_KIND, getSpanKind()),
"Span kind could not be added to span", LOGGER);
}

protected abstract String getSpanOperation();

protected abstract String getSpanKind();

private String getMessagingSystem(JmsTransactionalConnection connection) {
try {
return connection.get().getMetaData().getJMSProviderName().toLowerCase(Locale.ROOT);
} catch (JMSException e) {
LOGGER.info("Span connection metadata could not be fetched");
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Salesforce, Inc. All rights reserved.
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.extensions.jms.internal.operation.profiling.tracing;

import org.slf4j.Logger;

public class SpanCustomizerUtils {

/**
* Safely executes a piece of logic.
*
* @param toExecute the piece of logic to execute.
* @param loggingMessage the logging message if a throwable
* @param logger logger used for informing tracing errors.
*/
public static void safeExecute(Runnable toExecute, String loggingMessage, Logger logger) {
try {
toExecute.run();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(loggingMessage, e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ interface JmsStory {
String MESSAGE_TYPES = "Durable Subscriber";

String MESSAGE_FILTERING = "Message Filtering";

String TRACING = "Tracing";
}

}
Expand Down
Loading