Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-586 Add Otel Tracing support #714

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
<groups>org.hpccsystems.commons.annotations.BaseTests</groups>
<codehaus.template.version>1.0.0</codehaus.template.version>
<project.benchmarking>false</project.benchmarking>
<opentelemetry.bom.version>1.38.0</opentelemetry.bom.version>
<opentelemetry.semconv.version>1.25.0-alpha</opentelemetry.semconv.version>
</properties>

<scm>
Expand Down Expand Up @@ -98,8 +100,44 @@
</snapshots>
</repository>
</repositories>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>${opentelemetry.bom.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
</dependency>
<dependency>
<!-- Not managed by opentelemetry-bom -->
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.25.0-alpha</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
119 changes: 102 additions & 17 deletions wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,28 @@
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.HttpAttributes;
import io.opentelemetry.semconv.ServerAttributes;

/**
* Defines functionality common to all HPCC Systmes web service clients.
* Defines functionality common to all HPCC Systems web service clients.
*
* Typically implemented by specialized HPCC Web service clients.
*/
public abstract class BaseHPCCWsClient extends DataSingleton
{
public static final String PROJECT_NAME = "WsClient";
private static OpenTelemetry globalOTel = null;
/** Constant <code>log</code> */
protected static final Logger log = LogManager.getLogger(BaseHPCCWsClient.class);
/** Constant <code>DEAFULTECLWATCHPORT="8010"</code> */
Expand Down Expand Up @@ -164,6 +179,53 @@ private String getTargetHPCCBuildVersionString() throws Exception

}

public SpanBuilder getWsClientSpanBuilder(String spanName)
{
SpanBuilder spanBuilder = getWsClientTracer().spanBuilder(spanName)
.setAttribute(ServerAttributes.SERVER_ADDRESS, wsconn.getHost())
.setAttribute(ServerAttributes.SERVER_PORT, Long.getLong(wsconn.getPort()))
.setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, HttpAttributes.HttpRequestMethodValues.GET)
.setSpanKind(SpanKind.CLIENT);

return spanBuilder;
}

static public void injectCurrentSpanTraceParentHeader(Stub clientStub)
{
if (clientStub != null)
{
injectCurrentSpanTraceParentHeader(clientStub._getServiceClient().getOptions());
}
}

static public void injectCurrentSpanTraceParentHeader(Options options)
{
if (options != null)
{
W3CTraceContextPropagator.getInstance().inject(Context.current(), options, Options::setProperty);
}
}

/**
* Performs all Otel initialization
*/
private void initOTel()
{
/*
* If using the OpenTelemetry SDK, you may want to instantiate the OpenTelemetry toprovide configuration, for example of Resource or Sampler. See OpenTelemetrySdk and OpenTelemetrySdk.builder for information on how to construct theSDK's OpenTelemetry implementation.
* WARNING: Due to the inherent complications around initialization order involving this classand its single global instance, we strongly recommend *not* using GlobalOpenTelemetry unless youhave a use-case that absolutely requires it. Please favor using instances of OpenTelemetrywherever possible.
* If you are using the OpenTelemetry javaagent, it is generally best to only callGlobalOpenTelemetry.get() once, and then pass the resulting reference where you need to use it.
*/
globalOTel = GlobalOpenTelemetry.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we handle defaults here? Should we check if defaults have been set and if not default to no-op and / or logging exporter?

}

public Tracer getWsClientTracer()
{
if (globalOTel == null)
initOTel();

return globalOTel.getTracer(PROJECT_NAME);
}
/**
* All instances of HPCCWsXYZClient should utilize this init function
* Attempts to establish the target HPCC build version and its container mode
Expand All @@ -175,36 +237,55 @@ private String getTargetHPCCBuildVersionString() throws Exception
*/
protected boolean initBaseWsClient(Connection connection, boolean fetchVersionAndContainerMode)
{
initOTel();

boolean success = true;
initErrMessage = "";
setActiveConnectionInfo(connection);

if (fetchVersionAndContainerMode)
{
try
Span fetchHPCCVerSpan = getWsClientSpanBuilder("FetchHPCCVersion").setSpanKind(SpanKind.INTERNAL).startSpan();
try (Scope scope = fetchHPCCVerSpan.makeCurrent())
{
targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
try
{
targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
}
catch (Exception e)
{
initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
success = false;
}
}
catch (Exception e)
finally
{
initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
fetchHPCCVerSpan.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
fetchHPCCVerSpan.end();
}

try
Span fetchHPCCContainerMode = getWsClientSpanBuilder("FetchHPCCContainerMode").startSpan();
try (Scope scope = fetchHPCCContainerMode.makeCurrent())
{
targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
try
{
targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
}
catch (Exception e)
{
initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
}
}
catch (Exception e)
finally
{
initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
fetchHPCCContainerMode.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
fetchHPCCContainerMode.end();
}
}
if (!initErrMessage.isEmpty())
Expand Down Expand Up @@ -401,7 +482,10 @@ public String getInitError()
protected Stub verifyStub() throws Exception
{
if (stub != null)
{
injectCurrentSpanTraceParentHeader(stub);
return stub;
}
else
throw new Exception("WS Client Stub not available." + (hasInitError() ? "\n" + initErrMessage : ""));
}
Expand Down Expand Up @@ -687,6 +771,7 @@ protected void handleEspExceptions(ArrayOfEspExceptionWrapper exp, String messag
if (message != null && !message.isEmpty()) exp.setWsClientMessage(message);

log.error(exp.toString());

throw exp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.hpccsystems.ws.client.utils.Utils;
import org.hpccsystems.ws.client.wrappers.ArrayOfECLExceptionWrapper;
import org.hpccsystems.ws.client.wrappers.gen.filespray.ProgressResponseWrapper;
import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupQueryResponseWrapper;
import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WsWorkunitsClientStubWrapper;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;

/**
* Facilitates ECL WorkUnit related actions.
*
Expand Down Expand Up @@ -303,7 +307,6 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
request.setCount(1);

WUQueryResponse response = null;

try
{
response = ((WsWorkunits) stub).wUQuery(request);
Expand All @@ -326,7 +329,8 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
{
ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit();

if (eclWorkunit != null && eclWorkunit.length == 1) wu.update(eclWorkunit[0]);
if (eclWorkunit != null && eclWorkunit.length == 1)
wu.update(eclWorkunit[0]);
}

if (previousState != getStateID(wu))
Expand Down Expand Up @@ -2551,18 +2555,27 @@ public List<QueryResultWrapper> deleteQueries(Set<String> querynames, String clu
*/
public boolean ping() throws Exception
{
verifyStub();

Ping request = new Ping();

try
Span span = getWsClientSpanBuilder("WsWUClient_Ping").startSpan();
try (Scope scope = span.makeCurrent())
{
((WsWorkunitsStub) stub).ping(request);
verifyStub(); // must be called within span scope for proper context propagation

Ping request = new Ping();
try
{
((WsWorkunitsStub) stub).ping(request);
span.setStatus(StatusCode.OK);
}
catch (Exception e)
{
span.recordException(e);
log.error(e.getLocalizedMessage());
return false;
}
}
catch (Exception e)
finally
{
log.error(e.getLocalizedMessage());
return false;
span.end();
}

return true;
Expand Down
Loading
Loading