Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.6.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>

# Conflicts:
#	commons-hpcc/pom.xml
#	dfsclient/pom.xml
#	pom.xml
#	wsclient/pom.xml
  • Loading branch information
ghalliday committed Jun 7, 2024
2 parents a16004e + ce6160c commit 2e63d2c
Show file tree
Hide file tree
Showing 9 changed files with 541 additions and 247 deletions.
119 changes: 102 additions & 17 deletions wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,28 @@
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.HttpAttributes;
import io.opentelemetry.semconv.ServerAttributes;

/**
* Defines functionality common to all HPCC Systmes web service clients.
* Defines functionality common to all HPCC Systems web service clients.
*
* Typically implemented by specialized HPCC Web service clients.
*/
public abstract class BaseHPCCWsClient extends DataSingleton
{
public static final String PROJECT_NAME = "WsClient";
private static OpenTelemetry globalOTel = null;
/** Constant <code>log</code> */
protected static final Logger log = LogManager.getLogger(BaseHPCCWsClient.class);
/** Constant <code>DEAFULTECLWATCHPORT="8010"</code> */
Expand Down Expand Up @@ -164,6 +179,53 @@ private String getTargetHPCCBuildVersionString() throws Exception

}

public SpanBuilder getWsClientSpanBuilder(String spanName)
{
SpanBuilder spanBuilder = getWsClientTracer().spanBuilder(spanName)
.setAttribute(ServerAttributes.SERVER_ADDRESS, wsconn.getHost())
.setAttribute(ServerAttributes.SERVER_PORT, Long.getLong(wsconn.getPort()))
.setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, HttpAttributes.HttpRequestMethodValues.GET)
.setSpanKind(SpanKind.CLIENT);

return spanBuilder;
}

static public void injectCurrentSpanTraceParentHeader(Stub clientStub)
{
if (clientStub != null)
{
injectCurrentSpanTraceParentHeader(clientStub._getServiceClient().getOptions());
}
}

static public void injectCurrentSpanTraceParentHeader(Options options)
{
if (options != null)
{
W3CTraceContextPropagator.getInstance().inject(Context.current(), options, Options::setProperty);
}
}

/**
* Performs all Otel initialization
*/
private void initOTel()
{
/*
* If using the OpenTelemetry SDK, you may want to instantiate the OpenTelemetry toprovide configuration, for example of Resource or Sampler. See OpenTelemetrySdk and OpenTelemetrySdk.builder for information on how to construct theSDK's OpenTelemetry implementation.
* WARNING: Due to the inherent complications around initialization order involving this classand its single global instance, we strongly recommend *not* using GlobalOpenTelemetry unless youhave a use-case that absolutely requires it. Please favor using instances of OpenTelemetrywherever possible.
* If you are using the OpenTelemetry javaagent, it is generally best to only callGlobalOpenTelemetry.get() once, and then pass the resulting reference where you need to use it.
*/
globalOTel = GlobalOpenTelemetry.get();
}

public Tracer getWsClientTracer()
{
if (globalOTel == null)
initOTel();

return globalOTel.getTracer(PROJECT_NAME);
}
/**
* All instances of HPCCWsXYZClient should utilize this init function
* Attempts to establish the target HPCC build version and its container mode
Expand All @@ -175,36 +237,55 @@ private String getTargetHPCCBuildVersionString() throws Exception
*/
protected boolean initBaseWsClient(Connection connection, boolean fetchVersionAndContainerMode)
{
initOTel();

boolean success = true;
initErrMessage = "";
setActiveConnectionInfo(connection);

if (fetchVersionAndContainerMode)
{
try
Span fetchHPCCVerSpan = getWsClientSpanBuilder("FetchHPCCVersion").setSpanKind(SpanKind.INTERNAL).startSpan();
try (Scope scope = fetchHPCCVerSpan.makeCurrent())
{
targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
try
{
targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
}
catch (Exception e)
{
initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
success = false;
}
}
catch (Exception e)
finally
{
initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
fetchHPCCVerSpan.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
fetchHPCCVerSpan.end();
}

try
Span fetchHPCCContainerMode = getWsClientSpanBuilder("FetchHPCCContainerMode").startSpan();
try (Scope scope = fetchHPCCContainerMode.makeCurrent())
{
targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
try
{
targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
}
catch (Exception e)
{
initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
}
}
catch (Exception e)
finally
{
initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
fetchHPCCContainerMode.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
fetchHPCCContainerMode.end();
}
}
if (!initErrMessage.isEmpty())
Expand Down Expand Up @@ -401,7 +482,10 @@ public String getInitError()
protected Stub verifyStub() throws Exception
{
if (stub != null)
{
injectCurrentSpanTraceParentHeader(stub);
return stub;
}
else
throw new Exception("WS Client Stub not available." + (hasInitError() ? "\n" + initErrMessage : ""));
}
Expand Down Expand Up @@ -687,6 +771,7 @@ protected void handleEspExceptions(ArrayOfEspExceptionWrapper exp, String messag
if (message != null && !message.isEmpty()) exp.setWsClientMessage(message);

log.error(exp.toString());

throw exp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.hpccsystems.ws.client.utils.Utils;
import org.hpccsystems.ws.client.wrappers.ArrayOfECLExceptionWrapper;
import org.hpccsystems.ws.client.wrappers.gen.filespray.ProgressResponseWrapper;
import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupQueryResponseWrapper;
import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WsWorkunitsClientStubWrapper;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;

/**
* Facilitates ECL WorkUnit related actions.
*
Expand Down Expand Up @@ -303,7 +307,6 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
request.setCount(1);

WUQueryResponse response = null;

try
{
response = ((WsWorkunits) stub).wUQuery(request);
Expand All @@ -326,7 +329,8 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
{
ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit();

if (eclWorkunit != null && eclWorkunit.length == 1) wu.update(eclWorkunit[0]);
if (eclWorkunit != null && eclWorkunit.length == 1)
wu.update(eclWorkunit[0]);
}

if (previousState != getStateID(wu))
Expand Down Expand Up @@ -2551,18 +2555,27 @@ public List<QueryResultWrapper> deleteQueries(Set<String> querynames, String clu
*/
public boolean ping() throws Exception
{
verifyStub();

Ping request = new Ping();

try
Span span = getWsClientSpanBuilder("WsWUClient_Ping").startSpan();
try (Scope scope = span.makeCurrent())
{
((WsWorkunitsStub) stub).ping(request);
verifyStub(); // must be called within span scope for proper context propagation

Ping request = new Ping();
try
{
((WsWorkunitsStub) stub).ping(request);
span.setStatus(StatusCode.OK);
}
catch (Exception e)
{
span.recordException(e);
log.error(e.getLocalizedMessage());
return false;
}
}
catch (Exception e)
finally
{
log.error(e.getLocalizedMessage());
return false;
span.end();
}

return true;
Expand Down
110 changes: 83 additions & 27 deletions wsclient/src/main/java/org/hpccsystems/ws/client/utils/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,20 @@
import java.util.Base64;
import java.util.Base64.Decoder;
import java.util.Base64.Encoder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hpccsystems.ws.client.BaseHPCCWsClient;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.HttpAttributes;
import io.opentelemetry.semconv.ServerAttributes;

/**
* Represents and structures connection information.
Expand Down Expand Up @@ -205,6 +216,9 @@ public int hashCode()
private StringBuffer baseUrl;
private StringBuffer uriAndParams;

// Note: this pattern is very basic and is only meant to extract hostnames from URLs
public final static Pattern URL_HOSTNAME_PATTERN = Pattern.compile("((https?|ftp|file):\\/\\/)?(?<hostname>([\\da-z\\.\\-_]+)(\\.[a-z\\.]{2,6})?)(:\\d{2,6})?.*");

/** Constant <code>CONNECT_TIMEOUT_PARAM="connecttimeoutmillis"</code> */
final static public String CONNECT_TIMEOUT_PARAM = "connecttimeoutmillis";
/** Constant <code>READ_TIMEOUT_PARAM="readtimeoutmillis"</code> */
Expand Down Expand Up @@ -287,7 +301,27 @@ public static boolean isSslProtocol(String protocol)
*/
public Connection(String connectionstring) throws MalformedURLException
{
URL theurl = new URL(connectionstring);
URL theurl = null;
try
{
theurl = new URL(connectionstring);
}
catch (MalformedURLException e)
{
Matcher matcher = URL_HOSTNAME_PATTERN.matcher(connectionstring);
if (matcher.matches())
{
String hostName = matcher.group("hostname");
if (hostName.contains("_"))
{
throw new MalformedURLException("Invalid URL: Hostname contains invalid underscores: '" + connectionstring + "': " + e.getMessage());
}
}
else
{
throw e;
}
}

setProtocol(theurl.getProtocol());

Expand Down Expand Up @@ -1028,39 +1062,61 @@ public String sendHTTPRequest(String uri, String method) throws Exception

URL url = new URL (getBaseUrl() + (uri != null && uri.startsWith("/") ? "" : "/") + uri);

HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); //throws IOException

Connection.log.info("Sending HTTP " + method + "Request to:" + url.toString());
Span sendHTTPReqSpan = GlobalOpenTelemetry.get().getTracer(BaseHPCCWsClient.PROJECT_NAME)
.spanBuilder(method.toUpperCase() + " " + url.toExternalForm())
.setAttribute(ServerAttributes.SERVER_ADDRESS, getHost())
.setAttribute(ServerAttributes.SERVER_PORT, Long.getLong(getPort()))
.setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, method)
.setSpanKind(SpanKind.CLIENT)
.startSpan();

if (hasCredentials())
{
httpURLConnection.setRequestProperty("Authorization", getBasicAuthString());
}
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); //throws IOException

httpURLConnection.setRequestMethod(method); //throws ProtocolException
Connection.log.info("Sending HTTP " + method + "Request to:" + url.toString());

int responseCode = httpURLConnection.getResponseCode(); //throws IOException
if (hasCredentials())
{
httpURLConnection.setRequestProperty("Authorization", getBasicAuthString());
sendHTTPReqSpan.setAttribute("hasCredentials", true);
}
else
{
sendHTTPReqSpan.setAttribute("hasCredentials", false);
}

Connection.log.info("HTTP Response code: " + responseCode);
try (Scope scope = sendHTTPReqSpan.makeCurrent())
{
httpURLConnection.setRequestProperty("traceparent", Utils.getCurrentSpanTraceParentHeader());
httpURLConnection.setRequestMethod(method); //throws ProtocolException

if (responseCode == HttpURLConnection.HTTP_OK) //success
{
BufferedReader in = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream())); //throws IOException
String inputLine;
StringBuffer response = new StringBuffer();
int responseCode = httpURLConnection.getResponseCode(); //throws IOException
sendHTTPReqSpan.setAttribute("http.response.status_code", responseCode);
Connection.log.info("HTTP Response code: " + responseCode);

while ((inputLine = in.readLine()) != null) // throws IOException
{
response.append(inputLine);
}
if (responseCode == HttpURLConnection.HTTP_OK) //success
{
BufferedReader in = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream())); //throws IOException
String inputLine;
StringBuffer response = new StringBuffer();

in.close(); //throws IOException
while ((inputLine = in.readLine()) != null) // throws IOException
{
response.append(inputLine);
}

return response.toString();
}
else
{
throw new IOException("HTTP request failed! Code (" + responseCode + ") " + httpURLConnection.getResponseMessage() );
}
in.close(); //throws IOException
sendHTTPReqSpan.setStatus(StatusCode.OK);
return response.toString();
}
else
{
sendHTTPReqSpan.setStatus(StatusCode.ERROR);
throw new IOException("HTTP request failed! Code (" + responseCode + ") " + httpURLConnection.getResponseMessage() );
}
}
finally
{
sendHTTPReqSpan.end();
}
}
}
Loading

0 comments on commit 2e63d2c

Please sign in to comment.