Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Distributed http tracing 2.0 #11

Merged
merged 3 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions mule4-agent/pom.xml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>co.elastic.apm</groupId>
<artifactId>mule4-agent</artifactId>
<version>0.2.0</version>
<version>0.3.0</version>
<name>Elastic Mule 4 APM agent</name>
<description>Elastic Mule 4 APM agent</description>
<packaging>jar</packaging>
Expand All @@ -14,6 +15,8 @@

<elastic-apm.version>1.17.0</elastic-apm.version>

<mule.runtime.version>4.3.0</mule.runtime.version>

<mule.maven.plugin.version>3.2.7</mule.maven.plugin.version>
<build.plugins.plugin.version>2.3.2</build.plugins.plugin.version>
</properties>
Expand All @@ -40,61 +43,103 @@
<dependency>
<groupId>org.mule.tests</groupId>
<artifactId>mule-tests-functional</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.tests</groupId>
<artifactId>mule-tests-runner</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.tests</groupId>
<artifactId>mule-tests-unit</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.tests.plugin</groupId>
<artifactId>mule-tests-component-plugin</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<classifier>mule-plugin</classifier>
<scope>test</scope>
</dependency>


<!-- HTTP dependencies for testing -->
<dependency>
<groupId>org.mule.modules</groupId>
<artifactId>mule-scripting-module</artifactId>
<version>1.1.6</version>
<classifier>mule-plugin</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mule.connectors</groupId>
<artifactId>mule-http-connector</artifactId>
<version>1.5.21</version>
<classifier>mule-plugin</classifier>
<scope>provided</scope>
<!-- scope: test -->
</dependency>
<dependency>
<groupId>org.mule.connectors</groupId>
<artifactId>mule-sockets-connector</artifactId>
<version>1.1.5</version>
<classifier>mule-plugin</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.services</groupId>
<artifactId>mule-service-scheduler</artifactId>
<version>1.3.2</version>
<classifier>mule-service</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mule.services</groupId>
<artifactId>mule-service-http</artifactId>
<version>1.4.3</version>
<classifier>mule-service</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>2.25.0</version>
<scope>test</scope>
</dependency>


<!-- provided Mule APIs -->
<dependency>
<groupId>org.mule.runtime</groupId>
<artifactId>mule-api</artifactId>
<version>1.2.1</version>
<version>1.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mule.runtime</groupId>
<artifactId>mule-core</artifactId>
<version>4.2.1</version>
<version>${mule.runtime.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mule.runtime</groupId>
<artifactId>mule-module-service</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mule.runtime</groupId>
<artifactId>mule-module-extensions-spring-support</artifactId>
<version>4.2.0</version>
<version>${mule.runtime.version}</version>
<scope>provided</scope>
</dependency>


<!-- Elastic APM -->
<dependency>
<groupId>co.elastic.apm</groupId>
Expand All @@ -117,7 +162,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
<version>1.7.30</version>
</dependency>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ public class ApmMessageProcessorNotificationListener
@SuppressWarnings("deprecation")
@Override
public void onNotification(MessageProcessorNotification notification) {
// TODO Auto-generated method stub

logger.debug("===> Received " + notification.getClass().getName() + ":" + notification.getActionName());

// Event listener
// TODO: refactor to remove the deprecation warning.
switch (notification.getAction().getActionId()) {
case MessageProcessorNotification.MESSAGE_PROCESSOR_PRE_INVOKE:
ApmHandler.handleProcessorStartEvent(notification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import co.elastic.apm.api.ElasticApm;
import co.elastic.apm.api.Span;
import co.elastic.apm.api.Transaction;
import co.elastic.apm.mule4.agent.tracing.HttpTracingUtils;
import co.elastic.apm.mule4.agent.transaction.TransactionStore;

/*
Expand Down Expand Up @@ -38,13 +39,23 @@ public static void startSpan(TransactionStore transactionStore, MessageProcessor

Span span = transaction.startSpan(getSpanType(notification), getSubType(notification), getAction(notification));

checkAndPropagateParentTraceId(span, notification);

setSpanDetails(span, notification);

String spanId = getSpanId(notification);

transactionStore.addSpan(transactionId, spanId, span);
}

private static void checkAndPropagateParentTraceId(Span span, MessageProcessorNotification notification) {

// Propagate trace.id through HTTP
if (HttpTracingUtils.isHttpRequester(notification))
HttpTracingUtils.propagateTraceIdHeader(span, notification);

}

public static String getSpanId(MessageProcessorNotification notification) {
return notification.getInfo().getComponent().getLocation().getLocation();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package co.elastic.apm.mule4.agent.tracing;

import org.mule.extension.http.api.HttpRequestAttributes;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.api.notification.PipelineMessageNotification;
import org.mule.runtime.api.util.MultiMap;

import co.elastic.apm.api.Span;

public class HttpTracingUtils {

private static final String HTTP_REQUEST_NAMESPACE = "http://www.mulesoft.org/schema/mule/http";
private static final String HTTP_REQUEST_NAME = "request";
public static final String ELASTIC_APM_TRACEPARENT = "elastic-apm-traceparent";

public static boolean isHttpEvent(PipelineMessageNotification notification) {
return extractAttributes(notification).getDataType().getType() == HttpRequestAttributes.class;
}

public static boolean hasRemoteParent(PipelineMessageNotification notification) {

if (!isHttpEvent(notification))
return false;

return getHttpAttributes(notification).containsKey(ELASTIC_APM_TRACEPARENT);
}

public static String getTracingHeaderValue(String x, PipelineMessageNotification notification) {
return getHttpAttributes(notification).get(ELASTIC_APM_TRACEPARENT);
}

private static MultiMap<String, String> getHttpAttributes(PipelineMessageNotification notification) {

HttpRequestAttributes attributes = (HttpRequestAttributes) extractAttributes(notification).getValue();

return attributes.getHeaders();
}

private static TypedValue<Object> extractAttributes(PipelineMessageNotification notification) {
return notification.getEvent().getMessage().getAttributes();
}

public static boolean isHttpRequester(MessageProcessorNotification notification) {

ComponentIdentifier identifier = notification.getInfo().getComponent().getIdentifier();
String name = identifier.getName();
String namespace = identifier.getNamespaceUri();

return HTTP_REQUEST_NAME.equals(name) && HTTP_REQUEST_NAMESPACE.equals(namespace);
}

public static void propagateTraceIdHeader(Span span, MessageProcessorNotification notification) {
// TODO Auto-generated method stub

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import co.elastic.apm.api.ElasticApm;
import co.elastic.apm.api.Transaction;
import co.elastic.apm.mule4.agent.tracing.HttpTracingUtils;

/*
* Handling of Transaction starts and ends
Expand Down Expand Up @@ -127,14 +128,16 @@ private static Optional<String> getTransactionId(PipelineMessageNotification not
}

private static String getHeaderExtractor(String x, PipelineMessageNotification notification) {
// TODO provide parent trace info header extractor to support distributed
// transactions
return null;
return HttpTracingUtils.getTracingHeaderValue(x, notification);
}

private static boolean hasRemoteParent(PipelineMessageNotification notification) {
// TODO Determine if the notification was published for a request with remote
// parent information.

if (HttpTracingUtils.isHttpEvent(notification) && HttpTracingUtils.hasRemoteParent(notification))
return true;

return false;
}

Expand All @@ -147,7 +150,7 @@ public static void endTransaction(TransactionStore transactionStore, PipelineMes
// rest of the flows invoked through flow-ref are not represented as
// transactions and ignored. Only the corresponding flow-ref step is represented
// as Span.
if (!isEndOfTopFlow(transactionStore, notification))
if (!isEndOfTopFlowOrException(transactionStore, notification))
return;

Transaction transaction = transactionStore.retrieveTransaction(getTransactionId(notification).get())
Expand All @@ -168,7 +171,7 @@ public static void endTransaction(TransactionStore transactionStore, PipelineMes
// return timestamp;
// }

private static boolean isEndOfTopFlow(TransactionStore transactionStore, PipelineMessageNotification notification) {
private static boolean isEndOfTopFlowOrException(TransactionStore transactionStore, PipelineMessageNotification notification) {

Optional<String> transactionId = getTransactionId(notification);

Expand All @@ -182,7 +185,7 @@ private static boolean isEndOfTopFlow(TransactionStore transactionStore, Pipelin

ApmTransaction transaction = (ApmTransaction) optional.get();

if (transaction.getFlowName().equals(getFlowName(notification)))
if (transaction.hasException() || transaction.getFlowName().equals(getFlowName(notification)))
return true;

return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package co.elastic.apm.mule4.agent;

import static org.junit.Assert.*;

import java.util.List;

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.junit.Test;

import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.mule4.agent.config.BaseAbstractApmMuleTestCase;
import co.elastic.apm.mule4.agent.tracing.HttpTracingUtils;

public class DistributedTracingTest extends BaseAbstractApmMuleTestCase {

private static final String TRACE_ID1 = "0af7651916cd43dd8448eb211c80319c";
private static final String TRACE_PARENT1 = "00-" + TRACE_ID1 + "-b9c7c989f97918e1-01";

@Test
public void testTraceIdPropagation() throws Exception {

HttpGet getRequest = new HttpGet("http://localhost:8998/request");
getRequest.addHeader(HttpTracingUtils.ELASTIC_APM_TRACEPARENT, TRACE_PARENT1);

HttpClient httpClient = HttpClientBuilder.create().build();

HttpResponse response = httpClient.execute(getRequest);

assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(TRACE_PARENT1, EntityUtils.toString(response.getEntity()));
List<Transaction> transactions = getTransactions();
assertEquals(1, transactions.size());
assertEquals(1, getSpans().size());
assertEquals(TRACE_ID1, getTransaction().getTraceContext().getTraceId().toString());

}

@Test
public void testNoTraceIdPropagation() throws Exception {

HttpGet getRequest = new HttpGet("http://localhost:8998/request");

HttpClient httpClient = HttpClientBuilder.create().build();

HttpResponse response = httpClient.execute(getRequest);

Thread.sleep(1000);

assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals("", EntityUtils.toString(response.getEntity()));
List<Transaction> transactions = getTransactions();
assertEquals(1, transactions.size());
assertEquals(1, getSpans().size());
assertNotEquals(TRACE_ID1, getTransaction().getTraceContext().getTraceId().toString());
assertNotEquals("", getTransaction().getTraceContext().getTraceId().toString());

}

@Test
public void testTraceIdPropagationThroughHttp() throws Exception {

HttpGet getRequest = new HttpGet("http://localhost:8998");
getRequest.addHeader(HttpTracingUtils.ELASTIC_APM_TRACEPARENT, TRACE_PARENT1);

HttpClient httpClient = HttpClientBuilder.create().build();

HttpResponse response = httpClient.execute(getRequest);


}

@Override
protected String getConfigFile() {
return "DistributedTracingTest.xml";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void testSimpleFlow() throws Exception {
assertEquals(1, getSpans().size());
assertEquals(1, getErrors().size());
assertEquals("Execute", getSpans().get(0).getNameAsString());
assertEquals("java.lang.Exception: This is an error.", getErrors().get(0).getException().getMessage());
assertEquals("java.lang.Exception: This is an error", getErrors().get(0).getException().getMessage());

}

Expand Down
Loading