diff --git a/pom.xml b/pom.xml index 4762a4faf..643ee26ff 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ org.hpccsystems.commons.annotations.BaseTests 1.0.0 false + 2.4.0-alpha @@ -99,7 +100,48 @@ + + + + io.opentelemetry + opentelemetry-bom + 1.38.0 + pom + import + + + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-exporter-logging + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure-spi + + + io.opentelemetry + opentelemetry-exporter-otlp + + + + io.opentelemetry.semconv + opentelemetry-semconv + 1.25.0-alpha + junit junit diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java index 2a856a5a2..58a5f3a21 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java @@ -4,6 +4,11 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.rmi.RemoteException; +import java.text.Format; +import java.util.HashMap; +import java.util.Map; +import java.util.function.IntConsumer; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.ParserConfigurationException; @@ -27,6 +32,11 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.hpccsystems.ws.client.gen.axis2.wsworkunits.latest.ECLWorkunit; +import org.hpccsystems.ws.client.gen.axis2.wsworkunits.latest.EspSoapFault; +import org.hpccsystems.ws.client.gen.axis2.wsworkunits.latest.WUQuery; +import org.hpccsystems.ws.client.gen.axis2.wsworkunits.latest.WUQueryResponse; +import org.hpccsystems.ws.client.gen.axis2.wsworkunits.latest.WsWorkunits; import org.hpccsystems.ws.client.platform.Version; import org.hpccsystems.ws.client.utils.Connection; import org.hpccsystems.ws.client.utils.DataSingleton; @@ -36,10 +46,36 @@ import org.hpccsystems.ws.client.wrappers.ArrayOfECLExceptionWrapper; import org.hpccsystems.ws.client.wrappers.ArrayOfEspExceptionWrapper; import org.hpccsystems.ws.client.wrappers.EspSoapFaultWrapper; +import org.hpccsystems.ws.client.wrappers.WUState; +import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper; import org.w3c.dom.Document; import org.w3c.dom.Node; import org.w3c.dom.NodeList; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.semconv.HttpAttributes; +import io.opentelemetry.semconv.ServerAttributes; + /** * Defines functionality common to all HPCC Systmes web service clients. * @@ -47,6 +83,8 @@ */ public abstract class BaseHPCCWsClient extends DataSingleton { + public static final String INSTRUMENTED_LIBRARY_NAME = "WsClient"; + private Tracer tracer = null; /** Constant log */ protected static final Logger log = LogManager.getLogger(BaseHPCCWsClient.class); /** Constant DEAFULTECLWATCHPORT="8010" */ @@ -164,6 +202,89 @@ private String getTargetHPCCBuildVersionString() throws Exception } + public SpanBuilder getSpanBuilder(String spanName) + { + Tracer tracer = getTracer(INSTRUMENTED_LIBRARY_NAME); + SpanBuilder spanBuilder = tracer.spanBuilder(spanName) + .setAttribute(ServerAttributes.SERVER_ADDRESS, wsconn.getHost()) + .setAttribute(ServerAttributes.SERVER_PORT, Long.getLong(wsconn.getPort())) + .setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, HttpAttributes.HttpRequestMethodValues.GET) + .setSpanKind(SpanKind.CLIENT); + //.startSpan(); + + return spanBuilder; + } + + public void injectTraceParentHeader(Span currentSpan) + { + injectTraceParentHeader(stub._getServiceClient().getOptions(), currentSpan); + } + + static public void injectTraceParentHeader(Options options, Span currentSpan) + { + if (options != null) + { + if (currentSpan != null && currentSpan.getSpanContext().isValid()) + W3CTraceContextPropagator.getInstance().inject(Context.current(), options, Options::setProperty); + } + } + + static public void injectCurrentSpanTraceParentHeader(Stub clientStub) + { + if (clientStub != null) + { + injectCurrentSpanTraceParentHeader(clientStub._getServiceClient().getOptions()); + } + } + + static public void injectCurrentSpanTraceParentHeader(Options options) + { + if (options != null) + { + injectTraceParentHeader(options, Span.current()); + } + } + + static public String getTraceParentHeader() + { + String traceparent = null; + Span currentSpan = Span.current(); + if (currentSpan != null && currentSpan.getSpanContext().isValid()) + { + Map carrier = new HashMap<>(); + TextMapSetter> setter = Map::put; + W3CTraceContextPropagator.getInstance().inject(Context.current(), carrier, setter); + + traceparent = carrier.getOrDefault("traceparent", "00-" + currentSpan.getSpanContext().getTraceId() + "-" + currentSpan.getSpanContext().getSpanId() + "-00"); + carrier.clear(); + } + + return traceparent; + } + + private void initOtel(String tracerName) + { + if (GlobalOpenTelemetry.get() == null) + { + GlobalOpenTelemetry.set(AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk()); + /*if we want to overwrite auto configuration: + openTelemetry = AutoConfiguredOpenTelemetrySdk.builder() + .addTracerProviderCustomizer( + (sdkTracerProviderBuilder, configProperties) -> + sdkTracerProviderBuilder.addSpanProcessor( + new SpanProcessor() { // implementation omitted for brevity + })).build(); + */ + } + tracer = GlobalOpenTelemetry.getTracer(tracerName); + } + + public Tracer getTracer(String tracerName) + { + if (tracer == null) + initOtel(tracerName); + return tracer; + } /** * All instances of HPCCWsXYZClient should utilize this init function * Attempts to establish the target HPCC build version and its container mode @@ -175,36 +296,54 @@ private String getTargetHPCCBuildVersionString() throws Exception */ protected boolean initBaseWsClient(Connection connection, boolean fetchVersionAndContainerMode) { + initOtel(INSTRUMENTED_LIBRARY_NAME); + boolean success = true; initErrMessage = ""; setActiveConnectionInfo(connection); if (fetchVersionAndContainerMode) { - try + Span fetchHPCCVerSpan = getSpanBuilder("FetchHPCCVersion").startSpan(); + try (Scope scope = fetchHPCCVerSpan.makeCurrent()) { - targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString()); + try + { + targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString()); + } + catch (Exception e) + { + initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values"; + if (!e.getLocalizedMessage().isEmpty()) + initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage(); + success = false; + } } - catch (Exception e) + finally { - initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values"; - if (!e.getLocalizedMessage().isEmpty()) - initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage(); - - success = false; + fetchHPCCVerSpan.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage); + fetchHPCCVerSpan.end(); } - - try + Span fetchHPCCContainerMode = getSpanBuilder("FetchHPCCContainerMode").startSpan(); + try (Scope scope = fetchHPCCContainerMode.makeCurrent()) { - targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn); + try + { + targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn); + } + catch (Exception e) + { + initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values"; + if (!e.getLocalizedMessage().isEmpty()) + initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage(); + + success = false; + } } - catch (Exception e) + finally { - initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values"; - if (!e.getLocalizedMessage().isEmpty()) - initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage(); - - success = false; + fetchHPCCContainerMode.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage); + fetchHPCCContainerMode.end(); } } if (!initErrMessage.isEmpty()) @@ -401,7 +540,12 @@ public String getInitError() protected Stub verifyStub() throws Exception { if (stub != null) + { + Span currentSpan = Span.current(); + if (currentSpan.getSpanContext().isValid()) + injectTraceParentHeader(currentSpan); return stub; + } else throw new Exception("WS Client Stub not available." + (hasInitError() ? "\n" + initErrMessage : "")); } @@ -582,6 +726,10 @@ static public Stub setStubOptions(Stub thestub, Connection connection) throws Ax opt.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE); + //only do this if tracing enabled? + //injectCurrentSpanTraceParentHeader(opt); + //opt.setProperty("traceparent", getTraceParentHeader()); + if (connection.getPreemptiveHTTPAuthenticate()) { //Axis2 now forces connection authenticate, even if target is not secure @@ -676,12 +824,32 @@ protected void handleEspSoapFaults(EspSoapFaultWrapper e, String message) throws * the array of esp exception wrapper */ protected void handleEspExceptions(ArrayOfEspExceptionWrapper exp, String message) throws ArrayOfEspExceptionWrapper + { + handleEspExceptions(exp, message, null); + } + + /** + * Handle esp exceptions. + * + * @param exp + * the exp + * @param message + * the message + * @param span + * optional span. Resulting exception reported on span + * @throws org.hpccsystems.ws.client.wrappers.ArrayOfEspExceptionWrapper + * the array of esp exception wrapper + */ + protected void handleEspExceptions(ArrayOfEspExceptionWrapper exp, String message, Span span) throws ArrayOfEspExceptionWrapper { if (exp == null || exp.getExceptions() == null || exp.getExceptions().size() <= 0) return; if (message != null && !message.isEmpty()) exp.setWsClientMessage(message); log.error(exp.toString()); + if (span != null && span.getSpanContext().isValid()) + span.recordException(exp); + throw exp; } diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java index 829644a84..0b35e215f 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCWsWorkUnitsClient.java @@ -100,6 +100,10 @@ import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper; import org.hpccsystems.ws.client.wrappers.wsworkunits.WsWorkunitsClientStubWrapper; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; + /** * Facilitates ECL WorkUnit related actions. * @@ -295,6 +299,8 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept { verifyStub(); // Throws exception if stub failed + Span span = getSpanBuilder("FastWURefresh").startSpan(); + WUQuery request = new WUQuery(); WUState previousState = getStateID(wu); @@ -303,37 +309,49 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept request.setCount(1); WUQueryResponse response = null; - - try - { - response = ((WsWorkunits) stub).wUQuery(request); - } - catch (RemoteException e) - { - throw new Exception("WsWorkunits.fastWURefresh(...) encountered RemoteException.", e); - } - catch (EspSoapFault e) - { - handleEspSoapFaults(new EspSoapFaultWrapper(e), "Could Not perform fastWURefresh"); - } - - if (response != null) + try (Scope scope = span.makeCurrent()) { - if (response.getExceptions() != null) - handleEspExceptions(new ArrayOfEspExceptionWrapper(response.getExceptions()), "Could Not perform fastWURefresh"); - - if (response.getWorkunits() != null) + try { - ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit(); - - if (eclWorkunit != null && eclWorkunit.length == 1) wu.update(eclWorkunit[0]); + response = ((WsWorkunits) stub).wUQuery(request); + } + catch (RemoteException e) + { + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getLocalizedMessage()); + throw new Exception("WsWorkunits.fastWURefresh(...) encountered RemoteException.", e); + } + catch (EspSoapFault e) + { + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getLocalizedMessage()); + handleEspSoapFaults(new EspSoapFaultWrapper(e), "Could Not perform fastWURefresh"); } - if (previousState != getStateID(wu)) + if (response != null) { - fullWURefresh(wu); + if (response.getExceptions() != null) + handleEspExceptions(new ArrayOfEspExceptionWrapper(response.getExceptions()), "Could Not perform fastWURefresh", span); + + span.setStatus(StatusCode.OK); + if (response.getWorkunits() != null) + { + ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit(); + + if (eclWorkunit != null && eclWorkunit.length == 1) + wu.update(eclWorkunit[0]); + } + + if (previousState != getStateID(wu)) + { + fullWURefresh(wu); + } } } + finally + { + span.end(); + } } /** @@ -2551,18 +2569,27 @@ public List deleteQueries(Set querynames, String clu */ public boolean ping() throws Exception { - verifyStub(); - - Ping request = new Ping(); - - try + Span span = getSpanBuilder("WsWUClient_Ping").startSpan(); + try (Scope scope = span.makeCurrent()) { - ((WsWorkunitsStub) stub).ping(request); + verifyStub(); + + Ping request = new Ping(); + try + { + ((WsWorkunitsStub) stub).ping(request); + span.setStatus(StatusCode.OK); + } + catch (Exception e) + { + span.recordException(e); + log.error(e.getLocalizedMessage()); + return false; + } } - catch (Exception e) + finally { - log.error(e.getLocalizedMessage()); - return false; + span.end(); } return true; diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java index 14b0f23ce..12a74d728 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/BaseRemoteTest.java @@ -17,6 +17,10 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. package org.hpccsystems.ws.client; +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.InetAddress; @@ -30,28 +34,33 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.hpccsystems.ws.client.HPCCWsWorkUnitsClient; -import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper; -import org.hpccsystems.ws.client.HPCCWsClient; import org.hpccsystems.ws.client.HPCCWsTopologyClient.TopologyGroupQueryKind; import org.hpccsystems.ws.client.platform.Platform; import org.hpccsystems.ws.client.utils.Connection; import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupWrapper; import org.junit.Assume; +import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; -import java.net.URL; - -import java.nio.file.Paths; -import java.nio.file.Path; -import java.nio.file.Files; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.exporter.logging.LoggingSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; @Category(org.hpccsystems.commons.annotations.RemoteTests.class) public abstract class BaseRemoteTest { public static Exception initializationException = null; + private final static String INSTRUMENTED_LIB_NAME = "WsClientJUnitSuite"; protected static Platform platform; protected static HPCCWsClient wsclient; @@ -65,6 +74,7 @@ public abstract class BaseRemoteTest protected final static String defaultUserName = "JunitUser"; protected static Connection connection = null; + protected final static boolean enableTracing = Boolean.valueOf(System.getProperty("trace", "true")); protected final static String hpccUser = System.getProperty("hpccuser", defaultUserName); protected final static String hpccPass = System.getProperty("hpccpass", ""); protected final static Integer connTO = System.getProperty("connecttimeoutmillis")==null?null:Integer.valueOf(System.getProperty("connecttimeoutmillis")); @@ -72,6 +82,7 @@ public abstract class BaseRemoteTest protected final static int testThreadCount = Integer.parseInt(System.getProperty("testthreadcount", "10")); public static final String DEFAULTHPCCFILENAME = "benchmark::all_types::200kb"; + protected static Tracer tracer = null; /* * Code to generate superfile with default file as subfile @@ -111,6 +122,34 @@ public static void initCheck() public static void initialize() throws Exception { + if (enableTracing) + { + // set service name on all OTel signals + Resource resource = Resource.getDefault().merge(Resource.create( + Attributes.of(stringKey("service.name"), INSTRUMENTED_LIB_NAME, + stringKey("deployment.environment"),"1.0", + stringKey("deployment.environment"),"test"))); + + // init OTel trace provider https://opentelemetry.io/docs/languages/java/exporters/ + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .setResource(resource) + .setSampler(Sampler.alwaysOn()) + .addSpanProcessor( + //BatchSpanProcessor.builder( + SimpleSpanProcessor.create(LoggingSpanExporter.create()) + //OtlpGrpcSpanExporter.builder().setEndpoint(System.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")).addHeader("Authorization", "Bearer " + System.getenv("ELASTIC_APM_SECRET_TOKEN")).build()).build()).build(); + ).build(); + + // create sdk object and set it as global + OpenTelemetrySdk sdk = OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .buildAndRegisterGlobal(); + + Runtime.getRuntime().addShutdownHook(new Thread(sdk::close)); + + tracer = GlobalOpenTelemetry.getTracer(INSTRUMENTED_LIB_NAME); + } + // This allows testing against locally created self signed certs to work. // In production certs will need to be created valid hostnames javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier( @@ -226,7 +265,7 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) } else { - System.out.println("RemoteTest: 'roxiegroupname': '" + roxieclustername + "'"); + System.out.println("RemoteTest: 'roxiegroupname': '" + roxieclustername + "'"); } } catch (Exception e) @@ -240,7 +279,16 @@ public boolean verify(String hostname,javax.net.ssl.SSLSession sslSession) // Run the generate-datasets.ecl script if present in the project resources try { - executeECLScript("generate-datasets.ecl"); + //Span eclscriptspan = tracer.spanBuilder("generate-datasets.ecl").startSpan(); + + //try (Scope innerScope = eclscriptspan.makeCurrent()) + { + executeECLScript("generate-datasets.ecl"); + } + //finally + //{ + // eclscriptspan.end(); + //} } catch (Exception e) { diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/WSWorkunitsTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/WSWorkunitsTest.java index deb12aca5..17edcfc99 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/WSWorkunitsTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/WSWorkunitsTest.java @@ -38,6 +38,10 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. import org.junit.Test; import org.junit.runners.MethodSorters; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; + @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class WSWorkunitsTest extends BaseRemoteTest { @@ -82,18 +86,30 @@ public void testSharedWsWUgets() throws InterruptedException @Test public void stageA_ping() throws Exception { - try - { - Assert.assertTrue(client.ping()); - } - catch (AxisFault e) + Span pingSpan = tracer.spanBuilder("WsWUTests-PingTest").startSpan(); + + try (Scope innerScope = pingSpan.makeCurrent()) { - e.printStackTrace(); - Assert.fail(); + try + { + Assert.assertTrue(client.ping()); + pingSpan.setStatus(StatusCode.OK); + } + catch (AxisFault e) + { + pingSpan.recordException(e); + e.printStackTrace(); + Assert.fail(); + } + catch (Exception e) + { + pingSpan.recordException(e); + Assert.fail(e.toString()); + } } - catch (Exception e) + finally { - Assert.fail(e.toString()); + pingSpan.end(); } }