From fae105200af947f8e652e3fce7927589b46e84a1 Mon Sep 17 00:00:00 2001 From: James McMullan Date: Wed, 10 Jul 2024 09:24:13 -0400 Subject: [PATCH] HPCC4J-611 Add OpenTelemetry tracing to dfsclient - Updated event attribute reporting - Fixed trace structure in dfsclient protocol - Added tracing to FileUtility Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/FileUtility.java | 4 ++ .../dfs/client/RowServiceInputStream.java | 70 +++++++----------- .../dfs/client/RowServiceOutputStream.java | 72 +++++++------------ .../org/hpccsystems/dfs/client/Utils.java | 21 +++++- 4 files changed, 77 insertions(+), 90 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 40c766323..ddbdda681 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -33,6 +33,8 @@ import java.nio.file.Files; import java.nio.file.Paths; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; + import org.hpccsystems.commons.ecl.FieldDef; import org.json.JSONArray; import org.json.JSONObject; @@ -1777,6 +1779,8 @@ private static void performWrite(String[] args, TaskContext context) */ public static JSONArray run(String[] args) { + AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + Options options = getTopLevelOptions(); CommandLineParser parser = new DefaultParser(); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index c0381c82f..90d30794e 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -555,8 +555,7 @@ private void setPrefetchException(HpccFileException e) if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, e.getMessage()); readSpan.recordException(e, attributes); } @@ -725,8 +724,7 @@ public void startBlockingFetchRequest(List fetchOffsets) throws Exception Exception wrappedException = new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode."); if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); readSpan.recordException(wrappedException, attributes); } @@ -814,8 +812,7 @@ private int startFetch() if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), AttributeKey.longKey("read.offset"), streamPos, AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); readSpan.addEvent("RowServiceInputStream.readRequest", attributes); @@ -977,6 +974,9 @@ else if (this.handle == 0) { if (dataLen == 0) { + // Read handle + dis.readInt(); + close(); return 0; } @@ -1038,8 +1038,7 @@ private void readDataInFetch() IOException wrappedException = new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); readSpan.recordException(wrappedException, attributes); } @@ -1132,9 +1131,8 @@ private void finishFetch() if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), - AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest)); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), + AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest)); readSpan.addEvent("RowServiceInputStream.readResponse", attributes); } @@ -1146,10 +1144,9 @@ private void finishFetch() { if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), - AttributeKey.longKey("read.offset"), streamPos, - AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), + AttributeKey.longKey("read.offset"), streamPos, + AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); readSpan.addEvent("RowServiceInputStream.readRequest", attributes); } @@ -1335,8 +1332,7 @@ public int available() throws IOException IOException wrappedException = new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); readSpan.recordException(wrappedException, attributes); } @@ -1679,8 +1675,7 @@ private void makeActive() throws HpccFileException if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort())); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); readSpan.addEvent("RowServiceInputStream.connect", attributes); } @@ -1753,8 +1748,7 @@ private void makeActive() throws HpccFileException if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort())); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); readSpan.addEvent("RowServiceInputStream.versionRequest", attributes); } @@ -1795,9 +1789,8 @@ private void makeActive() throws HpccFileException if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), - ServiceAttributes.SERVICE_VERSION, rowServiceVersion); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), + ServiceAttributes.SERVICE_VERSION, rowServiceVersion); readSpan.addEvent("RowServiceInputStream.versionResponse", attributes); } } @@ -2127,7 +2120,7 @@ private void makeFetchObject(StringBuilder sb) private String makeGetVersionRequest() { - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }"; return versionMsg; } @@ -2140,10 +2133,8 @@ private String makeInitialRequest() sb.append("{ \"format\" : \"binary\", \n"); sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); - if (traceContextHeader != null) - { - sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n"); - } + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + sb.append(trace); if (!useOldProtocol) { @@ -2213,10 +2204,8 @@ private String makeHandleRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); - if (traceContextHeader != null) - { - sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n"); - } + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + sb.append(trace); if (!useOldProtocol) { @@ -2241,10 +2230,8 @@ private String makeTokenRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); - if (traceContextHeader != null) - { - sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n"); - } + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + sb.append(trace); if (!useOldProtocol) { @@ -2266,10 +2253,8 @@ private String makeCloseHandleRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); - if (traceContextHeader != null) - { - sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n"); - } + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + sb.append(trace); sb.append(" \"command\" : \"close\""); sb.append("\n}"); @@ -2311,8 +2296,7 @@ private void sendCloseFileRequest() throws IOException IOException wrappedException = new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); readSpan.recordException(wrappedException, attributes); } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index 35de43b12..40f15f536 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.text.AttributedCharacterIterator.Attribute; import java.io.OutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -245,9 +246,7 @@ private static class RowServiceResponse if (this.writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.addEvent("RowServiceOutputStream.connect", attributes); + writeSpan.addEvent("RowServiceOutputStream.connect", getServerAttributes()); } try @@ -295,9 +294,7 @@ private static class RowServiceResponse Exception wrappedException = new Exception(errorMessage, e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -309,9 +306,7 @@ private static class RowServiceResponse if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.addEvent("RowServiceOutputStream.versionRequest", attributes); + writeSpan.addEvent("RowServiceOutputStream.versionRequest", getServerAttributes()); } try @@ -328,9 +323,7 @@ private static class RowServiceResponse HpccFileException wrappedException = new HpccFileException("Failed on initial remote read read trans", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -355,9 +348,7 @@ private static class RowServiceResponse HpccFileException wrappedException = new HpccFileException("Error while attempting to read version response.", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -372,9 +363,16 @@ private static class RowServiceResponse makeInitialWriteRequest(); } + private Attributes getServerAttributes() + { + return Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + } + private String makeGetVersionRequest() { - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }"; return versionMsg; } @@ -383,7 +381,7 @@ private void makeInitialWriteRequest() throws Exception { String jsonRecordDef = RecordDefinitionTranslator.toJsonRecord(this.recordDef).toString(); - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; String initialRequest = "\n{\n" + " \"format\" : \"binary\",\n" + trace @@ -429,9 +427,7 @@ private void makeInitialWriteRequest() throws Exception IOException wrappedException = new IOException(response.errorMessage); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -440,7 +436,7 @@ private void makeInitialWriteRequest() throws Exception private String makeCloseHandleRequest() { - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; StringBuilder sb = new StringBuilder(256); sb.delete(0, sb.length()); @@ -477,9 +473,7 @@ private void sendCloseFileRequest() throws IOException IOException wrappedException = new IOException("Failed on close file with error: ", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -495,9 +489,7 @@ private void sendCloseFileRequest() throws IOException IOException wrappedException = new IOException("Failed to close file. Unable to read response with error: ", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -508,9 +500,7 @@ private void sendCloseFileRequest() throws IOException IOException wrappedException = new IOException(response.errorMessage); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -579,9 +569,7 @@ private RowServiceResponse readResponse() throws HpccFileException HpccFileException wrappedException = new HpccFileException("Early data termination, no handle"); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -595,9 +583,7 @@ private RowServiceResponse readResponse() throws HpccFileException HpccFileException wrappedException = new HpccFileException("Error while attempting to read row service response: ", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -624,9 +610,7 @@ else if (bytesWritten == 0 && compressionAlgo != CompressionAlgorithm.NONE) IOException wrappedException = new IOException("Fatal error while closing file. Writing compressed files with 0 length is not supported with the remote HPCC cluster."); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -662,7 +646,7 @@ public void write(byte[] b) throws IOException */ public void write(byte[] b, int off, int len) throws IOException { - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; String request = "{ \"format\" : \"binary\", \"handle\" : \"" + this.handle + "\"," + trace @@ -702,9 +686,7 @@ public void write(byte[] b, int off, int len) throws IOException IOException wrappedException = new IOException("Failed during write operation. Unable to read response with error: ", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -715,9 +697,7 @@ public void write(byte[] b, int off, int len) throws IOException IOException wrappedException = new IOException(response.errorMessage); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java index e33373c44..7462c64dc 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java @@ -13,6 +13,7 @@ package org.hpccsystems.dfs.client; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; @@ -20,9 +21,27 @@ public class Utils { + private static OpenTelemetry globalOpenTelemetry = null; + private static Tracer dfsClientTracer = null; + + public static OpenTelemetry getOpenTelemetry() + { + if (globalOpenTelemetry == null) + { + globalOpenTelemetry = GlobalOpenTelemetry.get(); + } + + return globalOpenTelemetry; + } + public static Tracer getTelemetryTracer() { - return GlobalOpenTelemetry.get().getTracer("DFSClient"); + if (dfsClientTracer == null) + { + dfsClientTracer = getOpenTelemetry().getTracer("org.hpccsystems.dfsclient"); + } + + return dfsClientTracer; } public static Span createSpan(String name)