diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java
index c26f75ce4..0dfe94b5e 100644
--- a/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java
+++ b/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java
@@ -40,13 +40,28 @@
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.semconv.HttpAttributes;
+import io.opentelemetry.semconv.ServerAttributes;
+
/**
- * Defines functionality common to all HPCC Systmes web service clients.
+ * Defines functionality common to all HPCC Systems web service clients.
*
* Typically implemented by specialized HPCC Web service clients.
*/
public abstract class BaseHPCCWsClient extends DataSingleton
{
+ public static final String PROJECT_NAME = "WsClient";
+ private static OpenTelemetry globalOTel = null;
/** Constant log
*/
protected static final Logger log = LogManager.getLogger(BaseHPCCWsClient.class);
/** Constant DEAFULTECLWATCHPORT="8010"
*/
@@ -164,6 +179,53 @@ private String getTargetHPCCBuildVersionString() throws Exception
}
+ public SpanBuilder getWsClientSpanBuilder(String spanName)
+ {
+ SpanBuilder spanBuilder = getWsClientTracer().spanBuilder(spanName)
+ .setAttribute(ServerAttributes.SERVER_ADDRESS, wsconn.getHost())
+ .setAttribute(ServerAttributes.SERVER_PORT, Long.getLong(wsconn.getPort()))
+ .setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, HttpAttributes.HttpRequestMethodValues.GET)
+ .setSpanKind(SpanKind.CLIENT);
+
+ return spanBuilder;
+ }
+
+ static public void injectCurrentSpanTraceParentHeader(Stub clientStub)
+ {
+ if (clientStub != null)
+ {
+ injectCurrentSpanTraceParentHeader(clientStub._getServiceClient().getOptions());
+ }
+ }
+
+ static public void injectCurrentSpanTraceParentHeader(Options options)
+ {
+ if (options != null)
+ {
+ W3CTraceContextPropagator.getInstance().inject(Context.current(), options, Options::setProperty);
+ }
+ }
+
+ /**
+ * Performs all Otel initialization
+ */
+ private void initOTel()
+ {
+ /*
+ * If using the OpenTelemetry SDK, you may want to instantiate the OpenTelemetry toprovide configuration, for example of Resource or Sampler. See OpenTelemetrySdk and OpenTelemetrySdk.builder for information on how to construct theSDK's OpenTelemetry implementation.
+ * WARNING: Due to the inherent complications around initialization order involving this classand its single global instance, we strongly recommend *not* using GlobalOpenTelemetry unless youhave a use-case that absolutely requires it. Please favor using instances of OpenTelemetrywherever possible.
+ * If you are using the OpenTelemetry javaagent, it is generally best to only callGlobalOpenTelemetry.get() once, and then pass the resulting reference where you need to use it.
+ */
+ globalOTel = GlobalOpenTelemetry.get();
+ }
+
+ public Tracer getWsClientTracer()
+ {
+ if (globalOTel == null)
+ initOTel();
+
+ return globalOTel.getTracer(PROJECT_NAME);
+ }
/**
* All instances of HPCCWsXYZClient should utilize this init function
* Attempts to establish the target HPCC build version and its container mode
@@ -175,36 +237,55 @@ private String getTargetHPCCBuildVersionString() throws Exception
*/
protected boolean initBaseWsClient(Connection connection, boolean fetchVersionAndContainerMode)
{
+ initOTel();
+
boolean success = true;
initErrMessage = "";
setActiveConnectionInfo(connection);
if (fetchVersionAndContainerMode)
{
- try
+ Span fetchHPCCVerSpan = getWsClientSpanBuilder("FetchHPCCVersion").setSpanKind(SpanKind.INTERNAL).startSpan();
+ try (Scope scope = fetchHPCCVerSpan.makeCurrent())
{
- targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
+ try
+ {
+ targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
+ }
+ catch (Exception e)
+ {
+ initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
+ if (!e.getLocalizedMessage().isEmpty())
+ initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
+ success = false;
+ }
}
- catch (Exception e)
+ finally
{
- initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
- if (!e.getLocalizedMessage().isEmpty())
- initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
-
- success = false;
+ fetchHPCCVerSpan.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
+ fetchHPCCVerSpan.end();
}
- try
+ Span fetchHPCCContainerMode = getWsClientSpanBuilder("FetchHPCCContainerMode").startSpan();
+ try (Scope scope = fetchHPCCContainerMode.makeCurrent())
{
- targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
+ try
+ {
+ targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
+ }
+ catch (Exception e)
+ {
+ initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
+ if (!e.getLocalizedMessage().isEmpty())
+ initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
+
+ success = false;
+ }
}
- catch (Exception e)
+ finally
{
- initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
- if (!e.getLocalizedMessage().isEmpty())
- initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
-
- success = false;
+ fetchHPCCContainerMode.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
+ fetchHPCCContainerMode.end();
}
}
if (!initErrMessage.isEmpty())
@@ -401,7 +482,10 @@ public String getInitError()
protected Stub verifyStub() throws Exception
{
if (stub != null)
+ {
+ injectCurrentSpanTraceParentHeader(stub);
return stub;
+ }
else
throw new Exception("WS Client Stub not available." + (hasInitError() ? "\n" + initErrMessage : ""));
}
@@ -687,6 +771,7 @@ protected void handleEspExceptions(ArrayOfEspExceptionWrapper exp, String messag
if (message != null && !message.isEmpty()) exp.setWsClientMessage(message);
log.error(exp.toString());
+
throw exp;
}
diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsClient.java
index e5a7de7da..0979c8d85 100644
--- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsClient.java
+++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsClient.java
@@ -19,7 +19,6 @@
import org.hpccsystems.ws.client.utils.Utils;
import org.hpccsystems.ws.client.wrappers.ArrayOfECLExceptionWrapper;
import org.hpccsystems.ws.client.wrappers.gen.filespray.ProgressResponseWrapper;
-import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupQueryResponseWrapper;
import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;
diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java
index 829644a84..92a27d690 100644
--- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java
+++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java
@@ -100,6 +100,10 @@
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WsWorkunitsClientStubWrapper;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
+
/**
* Facilitates ECL WorkUnit related actions.
*
@@ -303,7 +307,6 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
request.setCount(1);
WUQueryResponse response = null;
-
try
{
response = ((WsWorkunits) stub).wUQuery(request);
@@ -326,7 +329,8 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
{
ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit();
- if (eclWorkunit != null && eclWorkunit.length == 1) wu.update(eclWorkunit[0]);
+ if (eclWorkunit != null && eclWorkunit.length == 1)
+ wu.update(eclWorkunit[0]);
}
if (previousState != getStateID(wu))
@@ -2551,18 +2555,27 @@ public List deleteQueries(Set querynames, String clu
*/
public boolean ping() throws Exception
{
- verifyStub();
-
- Ping request = new Ping();
-
- try
+ Span span = getWsClientSpanBuilder("WsWUClient_Ping").startSpan();
+ try (Scope scope = span.makeCurrent())
{
- ((WsWorkunitsStub) stub).ping(request);
+ verifyStub(); // must be called within span scope for proper context propagation
+
+ Ping request = new Ping();
+ try
+ {
+ ((WsWorkunitsStub) stub).ping(request);
+ span.setStatus(StatusCode.OK);
+ }
+ catch (Exception e)
+ {
+ span.recordException(e);
+ log.error(e.getLocalizedMessage());
+ return false;
+ }
}
- catch (Exception e)
+ finally
{
- log.error(e.getLocalizedMessage());
- return false;
+ span.end();
}
return true;
diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Connection.java b/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Connection.java
index a28da183d..c5acac2e7 100644
--- a/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Connection.java
+++ b/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Connection.java
@@ -12,9 +12,20 @@
import java.util.Base64;
import java.util.Base64.Decoder;
import java.util.Base64.Encoder;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.hpccsystems.ws.client.BaseHPCCWsClient;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.semconv.HttpAttributes;
+import io.opentelemetry.semconv.ServerAttributes;
/**
* Represents and structures connection information.
@@ -205,6 +216,9 @@ public int hashCode()
private StringBuffer baseUrl;
private StringBuffer uriAndParams;
+ // Note: this pattern is very basic and is only meant to extract hostnames from URLs
+ public final static Pattern URL_HOSTNAME_PATTERN = Pattern.compile("((https?|ftp|file):\\/\\/)?(?([\\da-z\\.\\-_]+)(\\.[a-z\\.]{2,6})?)(:\\d{2,6})?.*");
+
/** Constant CONNECT_TIMEOUT_PARAM="connecttimeoutmillis"
*/
final static public String CONNECT_TIMEOUT_PARAM = "connecttimeoutmillis";
/** Constant READ_TIMEOUT_PARAM="readtimeoutmillis"
*/
@@ -287,7 +301,27 @@ public static boolean isSslProtocol(String protocol)
*/
public Connection(String connectionstring) throws MalformedURLException
{
- URL theurl = new URL(connectionstring);
+ URL theurl = null;
+ try
+ {
+ theurl = new URL(connectionstring);
+ }
+ catch (MalformedURLException e)
+ {
+ Matcher matcher = URL_HOSTNAME_PATTERN.matcher(connectionstring);
+ if (matcher.matches())
+ {
+ String hostName = matcher.group("hostname");
+ if (hostName.contains("_"))
+ {
+ throw new MalformedURLException("Invalid URL: Hostname contains invalid underscores: '" + connectionstring + "': " + e.getMessage());
+ }
+ }
+ else
+ {
+ throw e;
+ }
+ }
setProtocol(theurl.getProtocol());
@@ -1028,39 +1062,61 @@ public String sendHTTPRequest(String uri, String method) throws Exception
URL url = new URL (getBaseUrl() + (uri != null && uri.startsWith("/") ? "" : "/") + uri);
- HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); //throws IOException
-
- Connection.log.info("Sending HTTP " + method + "Request to:" + url.toString());
+ Span sendHTTPReqSpan = GlobalOpenTelemetry.get().getTracer(BaseHPCCWsClient.PROJECT_NAME)
+ .spanBuilder(method.toUpperCase() + " " + url.toExternalForm())
+ .setAttribute(ServerAttributes.SERVER_ADDRESS, getHost())
+ .setAttribute(ServerAttributes.SERVER_PORT, Long.getLong(getPort()))
+ .setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, method)
+ .setSpanKind(SpanKind.CLIENT)
+ .startSpan();
- if (hasCredentials())
- {
- httpURLConnection.setRequestProperty("Authorization", getBasicAuthString());
- }
+ HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); //throws IOException
- httpURLConnection.setRequestMethod(method); //throws ProtocolException
+ Connection.log.info("Sending HTTP " + method + "Request to:" + url.toString());
- int responseCode = httpURLConnection.getResponseCode(); //throws IOException
+ if (hasCredentials())
+ {
+ httpURLConnection.setRequestProperty("Authorization", getBasicAuthString());
+ sendHTTPReqSpan.setAttribute("hasCredentials", true);
+ }
+ else
+ {
+ sendHTTPReqSpan.setAttribute("hasCredentials", false);
+ }
- Connection.log.info("HTTP Response code: " + responseCode);
+ try (Scope scope = sendHTTPReqSpan.makeCurrent())
+ {
+ httpURLConnection.setRequestProperty("traceparent", Utils.getCurrentSpanTraceParentHeader());
+ httpURLConnection.setRequestMethod(method); //throws ProtocolException
- if (responseCode == HttpURLConnection.HTTP_OK) //success
- {
- BufferedReader in = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream())); //throws IOException
- String inputLine;
- StringBuffer response = new StringBuffer();
+ int responseCode = httpURLConnection.getResponseCode(); //throws IOException
+ sendHTTPReqSpan.setAttribute("http.response.status_code", responseCode);
+ Connection.log.info("HTTP Response code: " + responseCode);
- while ((inputLine = in.readLine()) != null) // throws IOException
- {
- response.append(inputLine);
- }
+ if (responseCode == HttpURLConnection.HTTP_OK) //success
+ {
+ BufferedReader in = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream())); //throws IOException
+ String inputLine;
+ StringBuffer response = new StringBuffer();
- in.close(); //throws IOException
+ while ((inputLine = in.readLine()) != null) // throws IOException
+ {
+ response.append(inputLine);
+ }
- return response.toString();
- }
- else
- {
- throw new IOException("HTTP request failed! Code (" + responseCode + ") " + httpURLConnection.getResponseMessage() );
- }
+ in.close(); //throws IOException
+ sendHTTPReqSpan.setStatus(StatusCode.OK);
+ return response.toString();
+ }
+ else
+ {
+ sendHTTPReqSpan.setStatus(StatusCode.ERROR);
+ throw new IOException("HTTP request failed! Code (" + responseCode + ") " + httpURLConnection.getResponseMessage() );
+ }
+ }
+ finally
+ {
+ sendHTTPReqSpan.end();
+ }
}
}
diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java b/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java
index df7a56162..bf71bc6e9 100644
--- a/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java
+++ b/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java
@@ -16,7 +16,9 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
@@ -39,6 +41,11 @@
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapSetter;
+
/**
* Provides multiple functions which support HPCCWsClient actions.
*
@@ -1121,4 +1128,25 @@ public static String trimTrailing(String originalStr)
return originalStr.substring(0,strIndex+1);
}
+
+ /**
+ * Returns traceparent value for Open Telemetry based context propagation
+ * @return traceparent of current span if valid, otherwise invalid traceparent header value
+ */
+ static public String getCurrentSpanTraceParentHeader()
+ {
+ String traceparent = null;
+ Span currentSpan = Span.current();
+ if (currentSpan != null && currentSpan.getSpanContext().isValid())
+ {
+ Map carrier = new HashMap<>();
+ TextMapSetter