Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #11 from michaelhyatt/distributed-http-tracing-2.0
Browse files Browse the repository at this point in the history
Distributed http tracing 2.0
  • Loading branch information
michaelhyatt authored Nov 3, 2020
2 parents c341b65 + 936e185 commit e9dc86a
Show file tree
Hide file tree
Showing 14 changed files with 281 additions and 41 deletions.
69 changes: 57 additions & 12 deletions mule4-agent/pom.xml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>co.elastic.apm</groupId>
<artifactId>mule4-agent</artifactId>
<version>0.2.0</version>
<version>0.3.0</version>
<name>Elastic Mule 4 APM agent</name>
<description>Elastic Mule 4 APM agent</description>
<packaging>jar</packaging>
Expand All @@ -14,6 +15,8 @@

<elastic-apm.version>1.17.0</elastic-apm.version>

<mule.runtime.version>4.3.0</mule.runtime.version>

<mule.maven.plugin.version>3.2.7</mule.maven.plugin.version>
<build.plugins.plugin.version>2.3.2</build.plugins.plugin.version>
</properties>
Expand All @@ -40,61 +43,103 @@
<dependency>
<groupId>org.mule.tests</groupId>
<artifactId>mule-tests-functional</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.tests</groupId>
<artifactId>mule-tests-runner</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.tests</groupId>
<artifactId>mule-tests-unit</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.tests.plugin</groupId>
<artifactId>mule-tests-component-plugin</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<classifier>mule-plugin</classifier>
<scope>test</scope>
</dependency>


<!-- HTTP dependencies for testing -->
<dependency>
<groupId>org.mule.modules</groupId>
<artifactId>mule-scripting-module</artifactId>
<version>1.1.6</version>
<classifier>mule-plugin</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mule.connectors</groupId>
<artifactId>mule-http-connector</artifactId>
<version>1.5.21</version>
<classifier>mule-plugin</classifier>
<scope>provided</scope>
<!-- scope: test -->
</dependency>
<dependency>
<groupId>org.mule.connectors</groupId>
<artifactId>mule-sockets-connector</artifactId>
<version>1.1.5</version>
<classifier>mule-plugin</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.services</groupId>
<artifactId>mule-service-scheduler</artifactId>
<version>1.3.2</version>
<classifier>mule-service</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.services</groupId>
<artifactId>mule-service-http</artifactId>
<version>1.4.3</version>
<classifier>mule-service</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>2.25.0</version>
<scope>test</scope>
</dependency>


<!-- provided Mule APIs -->
<dependency>
<groupId>org.mule.runtime</groupId>
<artifactId>mule-api</artifactId>
<version>1.2.1</version>
<version>1.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mule.runtime</groupId>
<artifactId>mule-core</artifactId>
<version>4.2.1</version>
<version>${mule.runtime.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mule.runtime</groupId>
<artifactId>mule-module-service</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mule.runtime</groupId>
<artifactId>mule-module-extensions-spring-support</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>provided</scope>
</dependency>


<!-- Elastic APM -->
<dependency>
<groupId>co.elastic.apm</groupId>
Expand All @@ -117,7 +162,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
<version>1.7.30</version>
</dependency>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ public class ApmMessageProcessorNotificationListener
@SuppressWarnings("deprecation")
@Override
public void onNotification(MessageProcessorNotification notification) {
// TODO Auto-generated method stub

logger.debug("===> Received " + notification.getClass().getName() + ":" + notification.getActionName());

// Event listener
// TODO: refactor to remove the deprecation warning.
switch (notification.getAction().getActionId()) {
case MessageProcessorNotification.MESSAGE_PROCESSOR_PRE_INVOKE:
ApmHandler.handleProcessorStartEvent(notification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import co.elastic.apm.api.ElasticApm;
import co.elastic.apm.api.Span;
import co.elastic.apm.api.Transaction;
import co.elastic.apm.mule4.agent.tracing.HttpTracingUtils;
import co.elastic.apm.mule4.agent.transaction.TransactionStore;

/*
Expand Down Expand Up @@ -38,13 +39,23 @@ public static void startSpan(TransactionStore transactionStore, MessageProcessor

Span span = transaction.startSpan(getSpanType(notification), getSubType(notification), getAction(notification));

checkAndPropagateParentTraceId(span, notification);

setSpanDetails(span, notification);

String spanId = getSpanId(notification);

transactionStore.addSpan(transactionId, spanId, span);
}

private static void checkAndPropagateParentTraceId(Span span, MessageProcessorNotification notification) {

// Propagate trace.id through HTTP
if (HttpTracingUtils.isHttpRequester(notification))
HttpTracingUtils.propagateTraceIdHeader(span, notification);

}

public static String getSpanId(MessageProcessorNotification notification) {
return notification.getInfo().getComponent().getLocation().getLocation();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package co.elastic.apm.mule4.agent.tracing;

import org.mule.extension.http.api.HttpRequestAttributes;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.api.notification.PipelineMessageNotification;
import org.mule.runtime.api.util.MultiMap;

import co.elastic.apm.api.Span;

public class HttpTracingUtils {

private static final String HTTP_REQUEST_NAMESPACE = "http://www.mulesoft.org/schema/mule/http";
private static final String HTTP_REQUEST_NAME = "request";
public static final String ELASTIC_APM_TRACEPARENT = "elastic-apm-traceparent";

public static boolean isHttpEvent(PipelineMessageNotification notification) {
return extractAttributes(notification).getDataType().getType() == HttpRequestAttributes.class;
}

public static boolean hasRemoteParent(PipelineMessageNotification notification) {

if (!isHttpEvent(notification))
return false;

return getHttpAttributes(notification).containsKey(ELASTIC_APM_TRACEPARENT);
}

public static String getTracingHeaderValue(String x, PipelineMessageNotification notification) {
return getHttpAttributes(notification).get(ELASTIC_APM_TRACEPARENT);
}

private static MultiMap<String, String> getHttpAttributes(PipelineMessageNotification notification) {

HttpRequestAttributes attributes = (HttpRequestAttributes) extractAttributes(notification).getValue();

return attributes.getHeaders();
}

private static TypedValue<Object> extractAttributes(PipelineMessageNotification notification) {
return notification.getEvent().getMessage().getAttributes();
}

public static boolean isHttpRequester(MessageProcessorNotification notification) {

ComponentIdentifier identifier = notification.getInfo().getComponent().getIdentifier();
String name = identifier.getName();
String namespace = identifier.getNamespaceUri();

return HTTP_REQUEST_NAME.equals(name) && HTTP_REQUEST_NAMESPACE.equals(namespace);
}

public static void propagateTraceIdHeader(Span span, MessageProcessorNotification notification) {
// TODO Auto-generated method stub

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import co.elastic.apm.api.ElasticApm;
import co.elastic.apm.api.Transaction;
import co.elastic.apm.mule4.agent.tracing.HttpTracingUtils;

/*
* Handling of Transaction starts and ends
Expand Down Expand Up @@ -127,14 +128,16 @@ private static Optional<String> getTransactionId(PipelineMessageNotification not
}

private static String getHeaderExtractor(String x, PipelineMessageNotification notification) {
// TODO provide parent trace info header extractor to support distributed
// transactions
return null;
return HttpTracingUtils.getTracingHeaderValue(x, notification);
}

private static boolean hasRemoteParent(PipelineMessageNotification notification) {
// TODO Determine if the notification was published for a request with remote
// parent information.

if (HttpTracingUtils.isHttpEvent(notification) && HttpTracingUtils.hasRemoteParent(notification))
return true;

return false;
}

Expand All @@ -147,7 +150,7 @@ public static void endTransaction(TransactionStore transactionStore, PipelineMes
// rest of the flows invoked through flow-ref are not represented as
// transactions and ignored. Only the corresponding flow-ref step is represented
// as Span.
if (!isEndOfTopFlow(transactionStore, notification))
if (!isEndOfTopFlowOrException(transactionStore, notification))
return;

Transaction transaction = transactionStore.retrieveTransaction(getTransactionId(notification).get())
Expand All @@ -168,7 +171,7 @@ public static void endTransaction(TransactionStore transactionStore, PipelineMes
// return timestamp;
// }

private static boolean isEndOfTopFlow(TransactionStore transactionStore, PipelineMessageNotification notification) {
private static boolean isEndOfTopFlowOrException(TransactionStore transactionStore, PipelineMessageNotification notification) {

Optional<String> transactionId = getTransactionId(notification);

Expand All @@ -182,7 +185,7 @@ private static boolean isEndOfTopFlow(TransactionStore transactionStore, Pipelin

ApmTransaction transaction = (ApmTransaction) optional.get();

if (transaction.getFlowName().equals(getFlowName(notification)))
if (transaction.hasException() || transaction.getFlowName().equals(getFlowName(notification)))
return true;

return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package co.elastic.apm.mule4.agent;

import static org.junit.Assert.*;

import java.util.List;

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.junit.Test;

import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.mule4.agent.config.BaseAbstractApmMuleTestCase;
import co.elastic.apm.mule4.agent.tracing.HttpTracingUtils;

public class DistributedTracingTest extends BaseAbstractApmMuleTestCase {

private static final String TRACE_ID1 = "0af7651916cd43dd8448eb211c80319c";
private static final String TRACE_PARENT1 = "00-" + TRACE_ID1 + "-b9c7c989f97918e1-01";

@Test
public void testTraceIdPropagation() throws Exception {

HttpGet getRequest = new HttpGet("http://localhost:8998/request");
getRequest.addHeader(HttpTracingUtils.ELASTIC_APM_TRACEPARENT, TRACE_PARENT1);

HttpClient httpClient = HttpClientBuilder.create().build();

HttpResponse response = httpClient.execute(getRequest);

assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(TRACE_PARENT1, EntityUtils.toString(response.getEntity()));
List<Transaction> transactions = getTransactions();
assertEquals(1, transactions.size());
assertEquals(1, getSpans().size());
assertEquals(TRACE_ID1, getTransaction().getTraceContext().getTraceId().toString());

}

@Test
public void testNoTraceIdPropagation() throws Exception {

HttpGet getRequest = new HttpGet("http://localhost:8998/request");

HttpClient httpClient = HttpClientBuilder.create().build();

HttpResponse response = httpClient.execute(getRequest);

Thread.sleep(1000);

assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals("", EntityUtils.toString(response.getEntity()));
List<Transaction> transactions = getTransactions();
assertEquals(1, transactions.size());
assertEquals(1, getSpans().size());
assertNotEquals(TRACE_ID1, getTransaction().getTraceContext().getTraceId().toString());
assertNotEquals("", getTransaction().getTraceContext().getTraceId().toString());

}

@Test
public void testTraceIdPropagationThroughHttp() throws Exception {

HttpGet getRequest = new HttpGet("http://localhost:8998");
getRequest.addHeader(HttpTracingUtils.ELASTIC_APM_TRACEPARENT, TRACE_PARENT1);

HttpClient httpClient = HttpClientBuilder.create().build();

HttpResponse response = httpClient.execute(getRequest);


}

@Override
protected String getConfigFile() {
return "DistributedTracingTest.xml";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void testSimpleFlow() throws Exception {
assertEquals(1, getSpans().size());
assertEquals(1, getErrors().size());
assertEquals("Execute", getSpans().get(0).getNameAsString());
assertEquals("java.lang.Exception: This is an error.", getErrors().get(0).getException().getMessage());
assertEquals("java.lang.Exception: This is an error", getErrors().get(0).getException().getMessage());

}

Expand Down
Loading

0 comments on commit e9dc86a

Please sign in to comment.