diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 40c766323..116c37c21 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -17,6 +17,7 @@ package org.hpccsystems.dfs.client; import java.util.List; +import java.util.Stack; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.ArrayList; @@ -33,6 +34,13 @@ import java.nio.file.Files; import java.nio.file.Paths; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; + import org.hpccsystems.commons.ecl.FieldDef; import org.json.JSONArray; import org.json.JSONObject; @@ -66,127 +74,240 @@ public class FileUtility private static final int NUM_DEFAULT_THREADS = 4; static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120; + private static boolean otelInitialized = false; + private static class TaskContext { - public AtomicLong recordsRead = new AtomicLong(0); - public AtomicLong recordsWritten = new AtomicLong(0); + private static class TaskOperation + { + public String currentOperationDesc = ""; + public long operationStartNS = 0; - public AtomicLong bytesRead = new AtomicLong(0); - public AtomicLong bytesWritten = new AtomicLong(0); + public List errorMessages = new ArrayList(); + public List warnMessages = new ArrayList(); - private List errorMessages = new ArrayList(); - private List warnMessages = new ArrayList(); + public AtomicLong recordsRead = new AtomicLong(0); + public AtomicLong recordsWritten = new AtomicLong(0); - private String currentOperationDesc = ""; - private long operationStart = 0; - private List operationResults = new ArrayList(); + public AtomicLong bytesRead = new AtomicLong(0); + public AtomicLong bytesWritten = new AtomicLong(0); - public boolean hasError() + public Span operationSpan = null; + + public JSONObject end(boolean success) + { + if (success) + { + operationSpan.setStatus(StatusCode.OK); + } + else + { + operationSpan.setStatus(StatusCode.ERROR); + } + + operationSpan.end(); + + long totalOperationTime = System.nanoTime(); + totalOperationTime -= operationStartNS; + + double timeInSeconds = (double) totalOperationTime / 1_000_000_000.0; + + JSONObject results = new JSONObject(); + + results.put("operation", currentOperationDesc); + results.put("successful", success); + + JSONArray errors = new JSONArray(); + for (String err : errorMessages) + { + errors.put(err); + } + results.put("errors", errors); + + JSONArray warns = new JSONArray(); + for (String warn : warnMessages) + { + warns.put(warn); + } + results.put("warns", warns); + + results.put("bytesWritten", bytesWritten.get()); + results.put("recordsWritten", recordsWritten.get()); + + results.put("bytesRead", bytesRead.get()); + results.put("recordsRead", recordsRead.get()); + + results.put("time", String.format("%.2f s",timeInSeconds)); + + double readBandwidth = (double) bytesRead.get() / (1_000_000.0 * timeInSeconds); + results.put("Read Bandwidth", String.format("%.2f MB/s", readBandwidth)); + + double writeBandwidth = (double) bytesWritten.get() / (1_000_000.0 * timeInSeconds); + results.put("Write Bandwidth", String.format("%.2f MB/s", writeBandwidth)); + + return results; + } + } + + private Stack operations = new Stack(); + public List operationResults = new ArrayList(); + + public void setCurrentOperationSpanAttributes(Attributes attributes) { - boolean err = false; - synchronized(errorMessages) + if (!hasCurrentOperation()) { - err = errorMessages.size() > 0; + return; } + TaskOperation op = getCurrentOperation(); - return err; + synchronized(op.operationSpan) + { + op.operationSpan.setAllAttributes(attributes); + } } - public void addError(String error) + public void addCurrentOperationSpanAttribute(AttributeKey key, Object value) { - synchronized(errorMessages) + if (!hasCurrentOperation()) { - errorMessages.add(error); + return; + } + TaskOperation op = getCurrentOperation(); + + synchronized(op.operationSpan) + { + op.operationSpan.setAttribute(key, value); } } - public void addWarn(String warn) + public void makeCurrentOperationSpanCurrent() { - synchronized(warnMessages) + if (!hasCurrentOperation()) + { + return; + } + TaskOperation op = getCurrentOperation(); + + synchronized(op.operationSpan) { - warnMessages.add(warn); + op.operationSpan.makeCurrent(); } } - public void clear() + public boolean hasError() { - currentOperationDesc = ""; - operationStart = 0; - recordsRead.set(0); - recordsWritten.set(0); + if (!hasCurrentOperation()) + { + return false; + } - bytesRead.set(0); - bytesWritten.set(0); + TaskOperation op = getCurrentOperation(); - errorMessages.clear(); - warnMessages.clear(); - } + boolean err = false; + synchronized(op.errorMessages) + { + err = op.errorMessages.size() > 0; + } - public boolean hasOperation() - { - return !currentOperationDesc.isEmpty(); + return err; } - public void startOperation(String operationName) + public void addError(String error) { - clear(); - currentOperationDesc = operationName; - operationStart = System.nanoTime(); - } + if (!hasCurrentOperation()) + { + return; + } - public void endOperation() - { - endOperation(true); + TaskOperation op = getCurrentOperation(); + + synchronized(op.errorMessages) + { + op.errorMessages.add(error); + } + + synchronized(op.operationSpan) + { + op.operationSpan.recordException(new Exception(error)); + } } - public void endOperation(boolean success) + public void addWarn(String warn) { - if (!hasOperation()) + if (!hasCurrentOperation()) { return; } - long totalOperationTime = System.nanoTime(); - totalOperationTime -= operationStart; + TaskOperation op = getCurrentOperation(); - double timeInSeconds = (double) totalOperationTime / 1_000_000_000.0; - - JSONObject results = new JSONObject(); + synchronized(op.warnMessages) + { + op.warnMessages.add(warn); + } - results.put("operation", currentOperationDesc); - results.put("successful", success); + synchronized(op.operationSpan) + { + op.operationSpan.addEvent(warn); + } + } - JSONArray errors = new JSONArray(); - for (String err : errorMessages) + public boolean hasCurrentOperation() + { + if (operations.isEmpty()) { - errors.put(err); + return false; } - results.put("errors", errors); - JSONArray warns = new JSONArray(); - for (String warn : warnMessages) + return true; + } + + public TaskOperation getCurrentOperation() + { + if (!hasCurrentOperation()) { - warns.put(warn); + return null; } - results.put("warns", warns); - results.put("bytesWritten", bytesWritten.get()); - results.put("recordsWritten", recordsWritten.get()); + return operations.peek(); + } - results.put("bytesRead", bytesRead.get()); - results.put("recordsRead", recordsRead.get()); + private void setCurrentOperation(TaskOperation op) + { + operations.push(op); + } - results.put("time", String.format("%.2f s",timeInSeconds)); + public void startOperation(String operationName) + { + TaskOperation op = new TaskOperation(); + op.currentOperationDesc = operationName; + op.operationStartNS = System.nanoTime(); - double readBandwidth = (double) bytesRead.get() / (1_000_000.0 * timeInSeconds); - results.put("Read Bandwidth", String.format("%.2f MB/s", readBandwidth)); + Span parentSpan = null; + TaskOperation prevOp = getCurrentOperation(); + if (prevOp != null) + { + parentSpan = prevOp.operationSpan; + } - double writeBandwidth = (double) bytesWritten.get() / (1_000_000.0 * timeInSeconds); - results.put("Write Bandwidth", String.format("%.2f MB/s", writeBandwidth)); + op.operationSpan = Utils.createChildSpan(parentSpan, operationName); + setCurrentOperation(op); + } + + public void endOperation() + { + endOperation(true); + } - operationResults.add(results); + public void endOperation(boolean success) + { + if (!hasCurrentOperation()) + { + return; + } - clear(); + operationResults.add(getCurrentOperation().end(success)); + operations.pop(); } public JSONArray generateResultsMessage() @@ -633,7 +754,7 @@ private static String[] filterFilesByFormat(String[] srcFiles, FileFormat format return filteredFiles.toArray(new String[0]); } - private static void executeTasks(Runnable[] tasks, int numThreads) throws Exception + private static void executeTasks(Runnable[] tasks, int numThreads, TaskContext context) throws Exception { int numTasksPerThread = tasks.length / numThreads; int numResidualTasks = tasks.length % numThreads; @@ -659,6 +780,10 @@ private static void executeTasks(Runnable[] tasks, int numThreads) throws Except public void run() { + // Make sure the span is current for the thread, otherwise spans created + // within this thread will not be children of the task span + context.makeCurrentOperationSpanCurrent(); + for (int j = 0; j < numSubTasks; j++) { subTasks[startingSubTask + j].run(); @@ -690,16 +815,19 @@ public void run() { try { - HpccRemoteFileReader fileReader = new HpccRemoteFileReader(filePart, recordDef, new HPCCRecordBuilder(recordDef)); + HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); + readContext.parentSpan = context.getCurrentOperation().operationSpan; + readContext.originalRD = recordDef; + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(readContext, filePart, new HPCCRecordBuilder(recordDef)); while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); - context.recordsRead.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } fileReader.close(); - context.bytesRead.addAndGet(fileReader.getStreamPosition()); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); } catch (Exception e) { @@ -719,7 +847,11 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split for (int i = 0; i < tasks.length; i++) { final int taskIndex = i; - final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(fileParts[taskIndex], recordDef, new HPCCRecordBuilder(recordDef)); + + HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); + readContext.parentSpan = context.getCurrentOperation().operationSpan; + readContext.originalRD = recordDef; + final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef)); final String filePath = outFilePaths[taskIndex]; final FileOutputStream outStream = new FileOutputStream(filePath); @@ -743,13 +875,13 @@ public void run() splitTable.addRecordPosition(fileReader.getStreamPosition()); HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.recordsRead.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } splitTable.finish(fileReader.getStreamPosition()); fileReader.close(); - context.bytesRead.addAndGet(fileReader.getStreamPosition()); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); fileWriter.finalize(); outputStream.close(); @@ -839,12 +971,19 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre for (int j = 0; j < numIncomingParts; j++) { DataPartition inFilePart = inFileParts[incomingFilePartIndex + j]; - filePartReaders[j] = new HpccRemoteFileReader(inFilePart, recordDef, new HPCCRecordBuilder(recordDef)); + HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); + readContext.parentSpan = context.getCurrentOperation().operationSpan; + readContext.originalRD = recordDef; + filePartReaders[j] = new HpccRemoteFileReader(readContext, inFilePart, new HPCCRecordBuilder(recordDef)); } incomingFilePartIndex += numIncomingParts; HPCCRecordAccessor recordAccessor = new HPCCRecordAccessor(recordDef); - final HPCCRemoteFileWriter partFileWriter = new HPCCRemoteFileWriter(outFilePart, recordDef, recordAccessor, CompressionAlgorithm.NONE); + HPCCRemoteFileWriter.FileWriteContext writeContext = new HPCCRemoteFileWriter.FileWriteContext(); + writeContext.parentSpan = context.getCurrentOperation().operationSpan; + writeContext.recordDef = recordDef; + writeContext.fileCompression = CompressionAlgorithm.NONE; + final HPCCRemoteFileWriter partFileWriter = new HPCCRemoteFileWriter(writeContext, outFilePart, recordAccessor); tasks[taskIndex] = new Runnable() { @@ -862,16 +1001,16 @@ public void run() { HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.recordsWritten.incrementAndGet(); - context.recordsRead.incrementAndGet(); + context.getCurrentOperation().recordsWritten.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } fileReader.close(); - context.bytesRead.addAndGet(fileReader.getStreamPosition()); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); } System.out.println("Closing file writer for task: " + taskIndex); fileWriter.close(); - context.bytesWritten.addAndGet(fileWriter.getBytesWritten()); + context.getCurrentOperation().bytesWritten.addAndGet(fileWriter.getBytesWritten()); } catch (Exception e) { @@ -968,7 +1107,12 @@ private static Runnable[] createWriteTasks(String[] srcFiles, SplitTable[] split DataPartition outFilePart = outFileParts[taskIndex]; HPCCRecordAccessor recordAccessor = new HPCCRecordAccessor(recordDef); - HPCCRemoteFileWriter filePartWriter = new HPCCRemoteFileWriter(outFilePart, recordDef, recordAccessor, CompressionAlgorithm.NONE); + + HPCCRemoteFileWriter.FileWriteContext writeContext = new HPCCRemoteFileWriter.FileWriteContext(); + writeContext.parentSpan = context.getCurrentOperation().operationSpan; + writeContext.recordDef = recordDef; + writeContext.fileCompression = CompressionAlgorithm.NONE; + HPCCRemoteFileWriter filePartWriter = new HPCCRemoteFileWriter(writeContext, outFilePart, recordAccessor); tasks[taskIndex] = new Runnable() { @@ -1015,15 +1159,15 @@ public void run() { HPCCRecord record = (HPCCRecord) fileReader.getNext(); fileWriter.writeRecord(record); - context.recordsWritten.incrementAndGet(); - context.recordsRead.incrementAndGet(); + context.getCurrentOperation().recordsWritten.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } - context.bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); inputStreams[j].close(); } fileWriter.close(); - context.bytesWritten.addAndGet(fileWriter.getBytesWritten()); + context.getCurrentOperation().bytesWritten.addAndGet(fileWriter.getBytesWritten()); } catch (Exception e) { @@ -1095,7 +1239,8 @@ private static void performRead(String[] args, TaskContext context) for (int i = 0; i < datasets.length; i++) { String datasetName = datasets[i]; - context.startOperation("Read " + datasetName); + context.startOperation("FileUtility.Read_" + datasetName); + context.setCurrentOperationSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); HPCCFile file = null; try @@ -1104,7 +1249,8 @@ private static void performRead(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); + String error = "Error while attempting to open file: '" + datasetName + "': " + e.getMessage(); + context.addError(error); return; } @@ -1117,7 +1263,8 @@ private static void performRead(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); + String error = "Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage(); + context.addError(error); return; } @@ -1163,7 +1310,7 @@ private static void performRead(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1279,7 +1426,9 @@ private static void performReadTest(String[] args, TaskContext context) } String datasetName = cmd.getOptionValue("read_test"); - context.startOperation("Read Test " + datasetName); + context.startOperation("FileUtility.ReadTest_" + datasetName); + + context.setCurrentOperationSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); HPCCFile file = null; try @@ -1289,7 +1438,7 @@ private static void performReadTest(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); + context.addError("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); return; } @@ -1302,7 +1451,7 @@ private static void performReadTest(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); + context.addError("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); return; } @@ -1317,16 +1466,15 @@ private static void performReadTest(String[] args, TaskContext context) int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1; if (filePartIndex < 0 || filePartIndex >= fileParts.length) { - System.out.println("Skipping invalid file part index: " + filePartsStrs[i] + context.addWarn("InvalidParams: Skipping invalid file part index: " + filePartsStrs[i] + " outside of range: [0," + fileParts.length + "]"); - continue; } filePartList.add(fileParts[filePartIndex]); } catch (NumberFormatException e) { - System.out.println("Skipping invalid file part index: " + filePartsStrs[i]); + context.addWarn("InvalidParams: Skipping invalid file part index: " + filePartsStrs[i]); } } } @@ -1352,7 +1500,7 @@ private static void performReadTest(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1463,7 +1611,9 @@ private static void performCopy(String[] args, TaskContext context) String srcFile = copyPairs[i]; String destFile = copyPairs[i+1]; - context.startOperation("Copy " + srcFile + " -> " + destFile); + context.startOperation("FileUtility.Copy_ " + srcFile + " -> " + destFile); + context.setCurrentOperationSpanAttributes(Attributes.of(AttributeKey.stringKey("server.src.url"), srcURL, + AttributeKey.stringKey("server.dest.url"), destURL)); HPCCFile file = null; try @@ -1484,6 +1634,7 @@ private static void performCopy(String[] args, TaskContext context) catch (HpccFileException e) { context.addError("Error while retrieving file parts for: '" + srcFile + "': " + e.getMessage()); + return; } boolean shouldRedistribute = true; @@ -1517,7 +1668,7 @@ private static void performCopy(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1527,13 +1678,13 @@ private static void performCopy(String[] args, TaskContext context) if (context.hasError()) { - return; + return; } try { - long bytesWritten = context.bytesWritten.get(); - long recordsWritten = context.recordsWritten.get(); + long bytesWritten = context.getCurrentOperation().bytesWritten.get(); + long recordsWritten = context.getCurrentOperation().recordsWritten.get(); dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, recordsWritten, bytesWritten, true); } catch (Exception e) @@ -1641,7 +1792,10 @@ private static void performWrite(String[] args, TaskContext context) String srcFile = writePairs[pairIdx]; String destFile = writePairs[pairIdx+1]; - context.startOperation("Write " + srcFile + " -> " + destFile); + context.startOperation( "FileUtility.Write_" + srcFile + "_to_" + destFile); + + Attributes attributes = Attributes.of(AttributeKey.stringKey("server.url"), destURL); + context.setCurrentOperationSpanAttributes(attributes); SplitTable[] splitTables = null; String[] srcFiles = null; @@ -1685,7 +1839,7 @@ private static void performWrite(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1741,7 +1895,7 @@ private static void performWrite(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1756,8 +1910,8 @@ private static void performWrite(String[] args, TaskContext context) try { - long bytesWritten = context.bytesWritten.get(); - long recordsWritten = context.recordsWritten.get(); + long bytesWritten = context.getCurrentOperation().bytesWritten.get(); + long recordsWritten = context.getCurrentOperation().recordsWritten.get(); dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, recordsWritten, bytesWritten, true); } catch (Exception e) @@ -1777,6 +1931,28 @@ private static void performWrite(String[] args, TaskContext context) */ public static JSONArray run(String[] args) { + if (!otelInitialized) + { + if (Boolean.getBoolean("otel.java.global-autoconfigure.enabled")) + { + System.out.println("OpenTelemetry autoconfiguration enabled with following values."); + System.out.println("If any of these options are not provided, they will defalt to values which could require additional CLASSPATH dependancies."); + System.out.println("If missing dependancies arise, utility will halt!"); + System.out.println(" otel.traces.exporter sys property: " + System.getProperty("otel.traces.exporter")); + System.out.println(" OTEL_TRACES_EXPORTER Env var: " + System.getenv("OTEL_TRACES_EXPORTER")); + System.out.println(" OTEL_TRACES_SAMPLER Env var: " + System.getenv("OTEL_TRACES_SAMPLER")); + System.out.println(" otel.traces.sampler sys property: " + System.getProperty("otel.traces.sampler")); + System.out.println(" otel.logs.exporter: "+ System.getProperty("otel.logs.exporter")); + System.out.println(" OTEL_LOGS_EXPORTER Env var: " + System.getenv("OTEL_LOGS_EXPORTER")); + System.out.println(" otel.metrics.exporter: "+ System.getProperty("otel.metrics.exporter")); + System.out.println(" OTEL_METRICS_EXPORTER Env var: " + System.getenv("OTEL_METRICS_EXPORTER")); + + OpenTelemetry otel = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + } + + otelInitialized = true; + } + Options options = getTopLevelOptions(); CommandLineParser parser = new DefaultParser(); @@ -1815,7 +1991,7 @@ else if (cmd.hasOption("write")) } // If we are still in the middle of an operation there was a failure - if (context.hasOperation()) + if (context.hasCurrentOperation()) { boolean succeded = false; context.endOperation(succeded); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java index 05092272d..f1f4c5322 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java @@ -36,7 +36,6 @@ public class HPCCRemoteFileWriter { private static final Logger log = LogManager.getLogger(HPCCRemoteFileWriter.class); - private FieldDef recordDef = null; private DataPartition dataPartition = null; private RowServiceOutputStream outputStream = null; private BinaryRecordWriter binaryRecordWriter = null; @@ -44,9 +43,31 @@ public class HPCCRemoteFileWriter private long recordsWritten = 0; private long openTimeMs = 0; + private FileWriteContext context = null; + private Span writeSpan = null; private String writeSpanName = null; + public static class FileWriteContext + { + public FieldDef recordDef = null; + public CompressionAlgorithm fileCompression = CompressionAlgorithm.DEFAULT; + public int connectTimeoutMs = -1; + public int socketOpTimeoutMs = -1; + public Span parentSpan = null; + } + + private static FileWriteContext constructReadContext(FieldDef recordDef, CompressionAlgorithm fileCompression, int connectTimeoutMs, int socketOpTimeoutMs) + { + FileWriteContext context = new FileWriteContext(); + context.recordDef = recordDef; + context.fileCompression = fileCompression; + context.connectTimeoutMs = connectTimeoutMs; + context.socketOpTimeoutMs = socketOpTimeoutMs; + + return context; + } + /** * A remote file writer. * @@ -110,13 +131,19 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccessor recordAccessor, CompressionAlgorithm fileCompression, int connectTimeoutMs, int socketOpTimeoutMs) throws Exception { - this.recordDef = recordDef; + this(constructReadContext(recordDef, fileCompression, connectTimeoutMs, socketOpTimeoutMs), dp, recordAccessor); + } + + public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAccessor recordAccessor) + throws Exception + { this.dataPartition = dp; + this.context = ctx; this.recordAccessor = recordAccessor; this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart(); - this.writeSpan = Utils.createSpan(writeSpanName); + this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName); String primaryIP = dp.getCopyIP(0); String secondaryIP = ""; @@ -125,22 +152,22 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso secondaryIP = dp.getCopyIP(1); } - Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP, - AttributeKey.stringKey("server.secondary.address"), secondaryIP, + Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, + AttributeKey.stringKey("server.1.address"), secondaryIP, ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort())); writeSpan.setAllAttributes(attributes); this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(), - dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0), - fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan); + dataPartition.getFileAccessBlob(), context.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0), + context.fileCompression, context.connectTimeoutMs, context.socketOpTimeoutMs, this.writeSpan); this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream); this.binaryRecordWriter.initialize(this.recordAccessor); log.info("HPCCRemoteFileWriter: Opening file part: " + dataPartition.getThisPart() - + " compression: " + fileCompression.name()); + + " compression: " + context.fileCompression.name()); log.trace("Record definition:\n" - + RecordDefinitionTranslator.toJsonRecord(this.recordDef)); + + RecordDefinitionTranslator.toJsonRecord(context.recordDef)); openTimeMs = System.currentTimeMillis(); } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index d8e2a1aef..4e0c640c4 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -38,7 +38,6 @@ public class HpccRemoteFileReader implements Iterator { private static final Logger log = LogManager.getLogger(HpccRemoteFileReader.class); - private FieldDef originalRecordDef = null; private DataPartition dataPartition = null; private RowServiceInputStream inputStream = null; private BinaryRecordReader binaryRecordReader; @@ -46,18 +45,13 @@ public class HpccRemoteFileReader implements Iterator private boolean handlePrefetch = true; private boolean isClosed = false; private boolean canReadNext = true; - private boolean createPrefetchThread = true; private int retryCount = 0; - private int connectTimeout = 0; - private int readSizeKB = 0; - private int limit = -1; private int maxReadRetries = DEFAULT_READ_RETRIES; - private int socketOpTimeoutMs = 0; private long openTimeMs = 0; private long recordsRead = 0; + private FileReadContext context = null; private Span readSpan = null; - private String readSpanName = null; public static final int NO_RECORD_LIMIT = -1; public static final int DEFAULT_READ_SIZE_OPTION = -1; @@ -71,6 +65,31 @@ public static class FileReadResumeInfo public long recordReaderStreamPos = 0; }; + public static class FileReadContext + { + public FieldDef originalRD = null; + public int connectTimeout = -1; + public int socketOpTimeoutMS = -1; + public int recordReadLimit = -1; + public boolean createPrefetchThread = true; + public int readSizeKB = -1; + public Span parentSpan = null; + }; + + private static FileReadContext constructReadContext(FieldDef originalRD, int connectTimeout, int socketOpTimeoutMS, + int recordReadLimit, boolean createPrefetchThread, int readSizeKB) + { + FileReadContext context = new FileReadContext(); + context.originalRD = originalRD; + context.connectTimeout = connectTimeout; + context.socketOpTimeoutMS = socketOpTimeoutMS; + context.recordReadLimit = recordReadLimit; + context.createPrefetchThread = createPrefetchThread; + context.readSizeKB = readSizeKB; + + return context; + } + /** * Instantiates a new hpcc remote file reader. * @@ -204,19 +223,51 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @throws Exception * general exception */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception + { + this(constructReadContext(originalRD, connectTimeout, socketOpTimeoutMs, limit, createPrefetchThread, readSizeKB), dp, recBuilder, resumeInfo); + } + + /** + * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. + * + * @param ctx + * the FileReadContext + * @param dp + * the part of the file, name and location + * @param recBuilder + * the IRecordBuilder used to construct records + * @throws Exception + * general exception + */ + public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilder recBuilder) throws Exception + { + this(ctx, dp, recBuilder, null); + } + + /** + * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. + * + * @param ctx + * the FileReadContext + * @param dp + * the part of the file, name and location + * @param recBuilder + * the IRecordBuilder used to construct records + * @param resumeInfo + * FileReadeResumeInfo data required to restart a read from a particular point in a file, null for reading from start + * @throws Exception + * general exception + */ + public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilder recBuilder, FileReadResumeInfo resumeInfo) throws Exception { - this.handlePrefetch = createPrefetchThread; - this.originalRecordDef = originalRD; + this.context = ctx; + this.handlePrefetch = context.createPrefetchThread; this.dataPartition = dp; this.recordBuilder = recBuilder; - this.readSizeKB = readSizeKB; - this.limit = limit; - this.createPrefetchThread = createPrefetchThread; - this.socketOpTimeoutMs = socketOpTimeoutMs; - this.readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp.getFileName() + "_" + dp.getThisPart(); - this.readSpan = Utils.createSpan(readSpanName); + String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); + this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); String primaryIP = dp.getCopyIP(0); String secondaryIP = ""; @@ -225,19 +276,13 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde secondaryIP = dp.getCopyIP(1); } - Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP, - AttributeKey.stringKey("server.secondary.address"), secondaryIP, + Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, + AttributeKey.stringKey("server.1.address"), secondaryIP, ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()), - AttributeKey.longKey("read.size"), Long.valueOf(readSizeKB*1000)); - readSpan.setAllAttributes(attributes); - - if (connectTimeout < 1) - { - connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS; - } - this.connectTimeout = connectTimeout; + AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000)); + this.readSpan.setAllAttributes(attributes); - if (this.originalRecordDef == null) + if (context.originalRD == null) { Exception e = new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required."); this.readSpan.recordException(e); @@ -256,7 +301,9 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, readSpan); + this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, + context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null, + false, context.socketOpTimeoutMS, this.readSpan); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -271,7 +318,9 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde restartInfo.streamPos = resumeInfo.inputStreamPos; restartInfo.tokenBin = resumeInfo.tokenBin; - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs, this.readSpan); + this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, + context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, + false, context.socketOpTimeoutMS, this.readSpan); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) @@ -289,7 +338,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart() + (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" )); log.trace("Original record definition:\n" - + RecordDefinitionTranslator.toJsonRecord(originalRD) + + RecordDefinitionTranslator.toJsonRecord(context.originalRD) + " projected record definition:\n" + RecordDefinitionTranslator.toJsonRecord(projectedRecordDefinition)); openTimeMs = System.currentTimeMillis(); @@ -315,10 +364,19 @@ private boolean retryRead() try { - this.readSpan = Utils.createSpan(readSpanName); - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, - this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, - this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs, this.readSpan); + String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); + if (context.parentSpan != null) + { + this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); + } + else + { + this.readSpan = Utils.createSpan(readSpanName); + } + + this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(), + context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, + context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index c0381c82f..b1fbb7c66 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -410,8 +410,17 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.handle = 0; this.tokenBin = null; this.simulateFail = false; - this.connectTimeout = connectTimeout; - this.socketOpTimeoutMs = socketOpTimeoutMS; + + if (connectTimeout > 0) + { + this.connectTimeout = connectTimeout; + } + + if (socketOpTimeoutMS > 0) + { + this.socketOpTimeoutMs = socketOpTimeoutMS; + } + this.recordLimit = limit; this.readBufferCapacity.set(this.maxReadSizeKB*1024*2); @@ -555,8 +564,7 @@ private void setPrefetchException(HpccFileException e) if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, e.getMessage()); readSpan.recordException(e, attributes); } @@ -725,8 +733,7 @@ public void startBlockingFetchRequest(List fetchOffsets) throws Exception Exception wrappedException = new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode."); if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); readSpan.recordException(wrappedException, attributes); } @@ -812,15 +819,6 @@ private int startFetch() } String prefix = "RowServiceInputStream.startFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; - if (readSpan != null) - { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), - AttributeKey.longKey("read.offset"), streamPos, - AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); - readSpan.addEvent("RowServiceInputStream.readRequest", attributes); - } - //------------------------------------------------------------------------------ // If we haven't made the connection active, activate it now and send the // first request. @@ -833,6 +831,14 @@ private int startFetch() numFetches++; if (!this.active.get()) { + if (readSpan != null) + { + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), + AttributeKey.longKey("read.offset"), streamPos, + AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); + readSpan.addEvent("RowServiceInputStream.readRequest", attributes); + } + try { makeActive(); @@ -1038,8 +1044,7 @@ private void readDataInFetch() IOException wrappedException = new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); readSpan.recordException(wrappedException, attributes); } @@ -1132,9 +1137,8 @@ private void finishFetch() if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), - AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest)); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), + AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest)); readSpan.addEvent("RowServiceInputStream.readResponse", attributes); } @@ -1146,10 +1150,9 @@ private void finishFetch() { if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), - AttributeKey.longKey("read.offset"), streamPos, - AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), + AttributeKey.longKey("read.offset"), streamPos, + AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); readSpan.addEvent("RowServiceInputStream.readRequest", attributes); } @@ -1333,13 +1336,6 @@ public int available() throws IOException { // this.bufferWriteMutex.release(); IOException wrappedException = new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); - if (readSpan != null) - { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), - ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); - readSpan.recordException(wrappedException, attributes); - } throw wrappedException; } } @@ -1679,8 +1675,7 @@ private void makeActive() throws HpccFileException if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort())); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); readSpan.addEvent("RowServiceInputStream.connect", attributes); } @@ -1753,8 +1748,7 @@ private void makeActive() throws HpccFileException if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort())); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); readSpan.addEvent("RowServiceInputStream.versionRequest", attributes); } @@ -1795,9 +1789,8 @@ private void makeActive() throws HpccFileException if (readSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), - ServiceAttributes.SERVICE_VERSION, rowServiceVersion); + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), + ServiceAttributes.SERVICE_VERSION, rowServiceVersion); readSpan.addEvent("RowServiceInputStream.versionResponse", attributes); } } @@ -2127,7 +2120,7 @@ private void makeFetchObject(StringBuilder sb) private String makeGetVersionRequest() { - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }"; return versionMsg; } @@ -2140,10 +2133,8 @@ private String makeInitialRequest() sb.append("{ \"format\" : \"binary\", \n"); sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); - if (traceContextHeader != null) - { - sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n"); - } + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + sb.append(trace); if (!useOldProtocol) { @@ -2213,10 +2204,8 @@ private String makeHandleRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); - if (traceContextHeader != null) - { - sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n"); - } + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + sb.append(trace); if (!useOldProtocol) { @@ -2241,10 +2230,8 @@ private String makeTokenRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); - if (traceContextHeader != null) - { - sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n"); - } + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + sb.append(trace); if (!useOldProtocol) { @@ -2266,10 +2253,8 @@ private String makeCloseHandleRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); - if (traceContextHeader != null) - { - sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n"); - } + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + sb.append(trace); sb.append(" \"command\" : \"close\""); sb.append("\n}"); @@ -2311,8 +2296,7 @@ private void sendCloseFileRequest() throws IOException IOException wrappedException = new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); if (readSpan != null) { - Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), - ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); readSpan.recordException(wrappedException, attributes); } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index 35de43b12..e21808f8d 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.text.AttributedCharacterIterator.Attribute; import java.io.OutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -235,8 +236,18 @@ private static class RowServiceResponse this.filePath = filePartPath; this.accessToken = accessToken; this.compressionAlgo = fileCompression; + + if (sockOpTimeoutMS < 0) + { + sockOpTimeoutMS = DEFAULT_SOCKET_OP_TIMEOUT_MS; + } this.sockOpTimeoutMs = sockOpTimeoutMS; + if (connectTimeoutMs < 0) + { + connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MILIS; + } + if (writeSpan != null && writeSpan.getSpanContext().isValid()) { this.writeSpan = writeSpan; @@ -245,9 +256,7 @@ private static class RowServiceResponse if (this.writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.addEvent("RowServiceOutputStream.connect", attributes); + writeSpan.addEvent("RowServiceOutputStream.connect", getServerAttributes()); } try @@ -295,9 +304,7 @@ private static class RowServiceResponse Exception wrappedException = new Exception(errorMessage, e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -309,9 +316,7 @@ private static class RowServiceResponse if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.addEvent("RowServiceOutputStream.versionRequest", attributes); + writeSpan.addEvent("RowServiceOutputStream.versionRequest", getServerAttributes()); } try @@ -328,9 +333,7 @@ private static class RowServiceResponse HpccFileException wrappedException = new HpccFileException("Failed on initial remote read read trans", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -355,9 +358,7 @@ private static class RowServiceResponse HpccFileException wrappedException = new HpccFileException("Error while attempting to read version response.", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -372,9 +373,16 @@ private static class RowServiceResponse makeInitialWriteRequest(); } + private Attributes getServerAttributes() + { + return Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + } + private String makeGetVersionRequest() { - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; + final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }"; return versionMsg; } @@ -383,7 +391,7 @@ private void makeInitialWriteRequest() throws Exception { String jsonRecordDef = RecordDefinitionTranslator.toJsonRecord(this.recordDef).toString(); - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; String initialRequest = "\n{\n" + " \"format\" : \"binary\",\n" + trace @@ -429,9 +437,7 @@ private void makeInitialWriteRequest() throws Exception IOException wrappedException = new IOException(response.errorMessage); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -440,7 +446,7 @@ private void makeInitialWriteRequest() throws Exception private String makeCloseHandleRequest() { - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; StringBuilder sb = new StringBuilder(256); sb.delete(0, sb.length()); @@ -477,9 +483,7 @@ private void sendCloseFileRequest() throws IOException IOException wrappedException = new IOException("Failed on close file with error: ", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -495,9 +499,7 @@ private void sendCloseFileRequest() throws IOException IOException wrappedException = new IOException("Failed to close file. Unable to read response with error: ", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -508,9 +510,7 @@ private void sendCloseFileRequest() throws IOException IOException wrappedException = new IOException(response.errorMessage); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -579,9 +579,7 @@ private RowServiceResponse readResponse() throws HpccFileException HpccFileException wrappedException = new HpccFileException("Early data termination, no handle"); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -595,9 +593,7 @@ private RowServiceResponse readResponse() throws HpccFileException HpccFileException wrappedException = new HpccFileException("Error while attempting to read row service response: ", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -624,9 +620,7 @@ else if (bytesWritten == 0 && compressionAlgo != CompressionAlgorithm.NONE) IOException wrappedException = new IOException("Fatal error while closing file. Writing compressed files with 0 length is not supported with the remote HPCC cluster."); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -662,7 +656,7 @@ public void write(byte[] b) throws IOException */ public void write(byte[] b, int off, int len) throws IOException { - final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : ""; + final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; String request = "{ \"format\" : \"binary\", \"handle\" : \"" + this.handle + "\"," + trace @@ -702,9 +696,7 @@ public void write(byte[] b, int off, int len) throws IOException IOException wrappedException = new IOException("Failed during write operation. Unable to read response with error: ", e); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -715,9 +707,7 @@ public void write(byte[] b, int off, int len) throws IOException IOException wrappedException = new IOException(response.errorMessage); if (writeSpan != null) { - Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, - ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); - writeSpan.recordException(wrappedException, attributes); + writeSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java index e33373c44..20f9b8607 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java @@ -13,6 +13,7 @@ package org.hpccsystems.dfs.client; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; @@ -20,24 +21,53 @@ public class Utils { + private static OpenTelemetry globalOpenTelemetry = null; + private static Tracer dfsClientTracer = null; + + public static OpenTelemetry getOpenTelemetry() + { + if (globalOpenTelemetry == null) + { + globalOpenTelemetry = GlobalOpenTelemetry.get(); + } + + return globalOpenTelemetry; + } + public static Tracer getTelemetryTracer() { - return GlobalOpenTelemetry.get().getTracer("DFSClient"); + if (dfsClientTracer == null) + { + dfsClientTracer = getOpenTelemetry().getTracer("org.hpccsystems.dfsclient"); + } + + return dfsClientTracer; } public static Span createSpan(String name) { - return Utils.getTelemetryTracer().spanBuilder(name) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); + return createChildSpan(null, name); } public static Span createChildSpan(Span parentSpan, String name) { - return Utils.getTelemetryTracer().spanBuilder(name) - .setParent(Context.current().with(parentSpan)) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); + Span span = null; + if (parentSpan == null) + { + span = Utils.getTelemetryTracer().spanBuilder(name) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + } + else + { + span = Utils.getTelemetryTracer().spanBuilder(name) + .setParent(Context.current().with(parentSpan)) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + } + + span.makeCurrent(); + return span; } }