From 18b9ff635228054575a10337e7bf786509b5dbba Mon Sep 17 00:00:00 2001 From: James McMullan Date: Fri, 19 Jul 2024 11:34:00 -0400 Subject: [PATCH] Code review changes --- .../hpccsystems/dfs/client/FileUtility.java | 376 +++++++++++++----- .../dfs/client/HPCCRemoteFileWriter.java | 45 ++- .../dfs/client/HpccRemoteFileReader.java | 126 ++++-- .../dfs/client/RowServiceInputStream.java | 38 +- .../dfs/client/RowServiceOutputStream.java | 10 + .../org/hpccsystems/dfs/client/Utils.java | 14 +- 6 files changed, 435 insertions(+), 174 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index ddbdda681..28e4311c5 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -17,6 +17,7 @@ package org.hpccsystems.dfs.client; import java.util.List; +import java.util.Stack; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.ArrayList; @@ -34,6 +35,11 @@ import java.nio.file.Paths; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import org.hpccsystems.commons.ecl.FieldDef; import org.json.JSONArray; @@ -68,27 +74,125 @@ public class FileUtility private static final int NUM_DEFAULT_THREADS = 4; static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120; + private static boolean otelInitialized = false; + private static class TaskContext { - public AtomicLong recordsRead = new AtomicLong(0); - public AtomicLong recordsWritten = new AtomicLong(0); + private static class TaskOperation + { + public String currentOperationDesc = ""; + public long operationStart = 0; + + public List errorMessages = new ArrayList(); + public List warnMessages = new ArrayList(); + + public AtomicLong recordsRead = new AtomicLong(0); + public AtomicLong recordsWritten = new AtomicLong(0); + + public AtomicLong bytesRead = new AtomicLong(0); + public AtomicLong bytesWritten = new AtomicLong(0); + + public Span taskSpan = null; + + public JSONObject end(boolean success) + { + if (success) + { + taskSpan.setStatus(StatusCode.OK); + } + else + { + taskSpan.setStatus(StatusCode.ERROR); + } + + taskSpan.end(); + + long totalOperationTime = System.nanoTime(); + totalOperationTime -= operationStart; - public AtomicLong bytesRead = new AtomicLong(0); - public AtomicLong bytesWritten = new AtomicLong(0); + double timeInSeconds = (double) totalOperationTime / 1_000_000_000.0; - private List errorMessages = new ArrayList(); - private List warnMessages = new ArrayList(); + JSONObject results = new JSONObject(); - private String currentOperationDesc = ""; - private long operationStart = 0; - private List operationResults = new ArrayList(); + results.put("operation", currentOperationDesc); + results.put("successful", success); + + JSONArray errors = new JSONArray(); + for (String err : errorMessages) + { + errors.put(err); + } + results.put("errors", errors); + + JSONArray warns = new JSONArray(); + for (String warn : warnMessages) + { + warns.put(warn); + } + results.put("warns", warns); + + results.put("bytesWritten", bytesWritten.get()); + results.put("recordsWritten", recordsWritten.get()); + + results.put("bytesRead", bytesRead.get()); 
+ results.put("recordsRead", recordsRead.get()); + + results.put("time", String.format("%.2f s",timeInSeconds)); + + double readBandwidth = (double) bytesRead.get() / (1_000_000.0 * timeInSeconds); + results.put("Read Bandwidth", String.format("%.2f MB/s", readBandwidth)); + + double writeBandwidth = (double) bytesWritten.get() / (1_000_000.0 * timeInSeconds); + results.put("Write Bandwidth", String.format("%.2f MB/s", writeBandwidth)); + + return results; + } + } + + private Stack operations = new Stack(); + public List operationResults = new ArrayList(); + + public void setTaskSpanAttributes(Attributes attributes) + { + if (operations.empty()) + { + return; + } + TaskOperation op = operations.peek(); + + synchronized(op.taskSpan) + { + op.taskSpan.setAllAttributes(attributes); + } + } + + public void makeTaskSpanCurrent() + { + if (operations.empty()) + { + return; + } + TaskOperation op = operations.peek(); + + synchronized(op.taskSpan) + { + op.taskSpan.makeCurrent(); + } + } public boolean hasError() { + if (operations.empty()) + { + return false; + } + + TaskOperation op = operations.peek(); + boolean err = false; - synchronized(errorMessages) + synchronized(op.errorMessages) { - err = errorMessages.size() > 0; + err = op.errorMessages.size() > 0; } return err; @@ -96,44 +200,80 @@ public boolean hasError() public void addError(String error) { - synchronized(errorMessages) + if (operations.empty()) { - errorMessages.add(error); + return; + } + + TaskOperation op = operations.peek(); + + synchronized(op.errorMessages) + { + op.errorMessages.add(error); + } + + synchronized(op.taskSpan) + { + op.taskSpan.recordException(new Exception(error)); } } public void addWarn(String warn) { - synchronized(warnMessages) + if (operations.empty()) { - warnMessages.add(warn); + return; } - } - public void clear() - { - currentOperationDesc = ""; - operationStart = 0; - recordsRead.set(0); - recordsWritten.set(0); + TaskOperation op = operations.peek(); - bytesRead.set(0); - bytesWritten.set(0); + synchronized(op.warnMessages) + { + op.warnMessages.add(warn); + } - errorMessages.clear(); - warnMessages.clear(); + synchronized(op.taskSpan) + { + op.taskSpan.addEvent(warn); + } } public boolean hasOperation() { - return !currentOperationDesc.isEmpty(); + if (operations.empty()) + { + return false; + } + + return true; + } + + public TaskOperation getOperation() + { + if (operations.empty()) + { + return null; + } + + return operations.peek(); } public void startOperation(String operationName) { - clear(); - currentOperationDesc = operationName; - operationStart = System.nanoTime(); + TaskOperation op = new TaskOperation(); + op.currentOperationDesc = operationName; + op.operationStart = System.nanoTime(); + + Span parentSpan = null; + TaskOperation prevOp = getOperation(); + if (prevOp != null) + { + parentSpan = prevOp.taskSpan; + } + + op.taskSpan = Utils.createChildSpan(parentSpan, operationName); + + operations.push(op); } public void endOperation() @@ -148,47 +288,10 @@ public void endOperation(boolean success) return; } - long totalOperationTime = System.nanoTime(); - totalOperationTime -= operationStart; - - double timeInSeconds = (double) totalOperationTime / 1_000_000_000.0; - - JSONObject results = new JSONObject(); - - results.put("operation", currentOperationDesc); - results.put("successful", success); - - JSONArray errors = new JSONArray(); - for (String err : errorMessages) - { - errors.put(err); - } - results.put("errors", errors); - - JSONArray warns = new JSONArray(); - for (String 
warn : warnMessages)
-            {
-                warns.put(warn);
-            }
-            results.put("warns", warns);
-
-            results.put("bytesWritten", bytesWritten.get());
-            results.put("recordsWritten", recordsWritten.get());
-
-            results.put("bytesRead", bytesRead.get());
-            results.put("recordsRead", recordsRead.get());
-
-            results.put("time", String.format("%.2f s",timeInSeconds));
-
-            double readBandwidth = (double) bytesRead.get() / (1_000_000.0 * timeInSeconds);
-            results.put("Read Bandwidth", String.format("%.2f MB/s", readBandwidth));
-
-            double writeBandwidth = (double) bytesWritten.get() / (1_000_000.0 * timeInSeconds);
-            results.put("Write Bandwidth", String.format("%.2f MB/s", writeBandwidth));
-
+            TaskOperation op = getOperation();
+            JSONObject results = op.end(success);
             operationResults.add(results);
-
-            clear();
+            operations.pop();
         }
 
         public JSONArray generateResultsMessage()
@@ -635,7 +738,7 @@ private static String[] filterFilesByFormat(String[] srcFiles, FileFormat format
         return filteredFiles.toArray(new String[0]);
     }
 
-    private static void executeTasks(Runnable[] tasks, int numThreads) throws Exception
+    private static void executeTasks(Runnable[] tasks, int numThreads, TaskContext context) throws Exception
     {
         int numTasksPerThread = tasks.length / numThreads;
         int numResidualTasks = tasks.length % numThreads;
@@ -661,6 +764,10 @@ public void run()
                 {
+                    // Make sure the task span is current for the thread, otherwise spans created
+                    // within this thread will not be children of the task span
+                    context.makeTaskSpanCurrent();
+
                     for (int j = 0; j < numSubTasks; j++)
                     {
                         subTasks[startingSubTask + j].run();
@@ -692,16 +799,19 @@ public void run()
             {
                 try
                 {
-                    HpccRemoteFileReader fileReader = new HpccRemoteFileReader(filePart, recordDef, new HPCCRecordBuilder(recordDef));
+                    HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
+                    readContext.parentSpan = context.getOperation().taskSpan;
+                    readContext.originalRD = recordDef;
+                    HpccRemoteFileReader fileReader = new HpccRemoteFileReader(readContext, filePart, new HPCCRecordBuilder(recordDef));
                     while (fileReader.hasNext())
                     {
                         HPCCRecord record = fileReader.next();
-                        context.recordsRead.incrementAndGet();
+                        context.getOperation().recordsRead.incrementAndGet();
                     }
                     fileReader.close();
 
-                    context.bytesRead.addAndGet(fileReader.getStreamPosition());
+                    context.getOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
                 }
                 catch (Exception e)
                 {
@@ -721,7 +831,11 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split
         for (int i = 0; i < tasks.length; i++)
        {
             final int taskIndex = i;
-            final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(fileParts[taskIndex], recordDef, new HPCCRecordBuilder(recordDef));
+
+            HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
+            readContext.parentSpan = context.getOperation().taskSpan;
+            readContext.originalRD = recordDef;
+            final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef));
 
             final String filePath = outFilePaths[taskIndex];
             final FileOutputStream outStream = new FileOutputStream(filePath);
@@ -745,13 +859,13 @@ public void run()
                         splitTable.addRecordPosition(fileReader.getStreamPosition());
                         HPCCRecord record = fileReader.next();
                         fileWriter.writeRecord(record);
-                        context.recordsRead.incrementAndGet();
+                        context.getOperation().recordsRead.incrementAndGet();
                     }
splitTable.finish(fileReader.getStreamPosition()); fileReader.close(); - context.bytesRead.addAndGet(fileReader.getStreamPosition()); + context.getOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); fileWriter.finalize(); outputStream.close(); @@ -841,12 +955,19 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre for (int j = 0; j < numIncomingParts; j++) { DataPartition inFilePart = inFileParts[incomingFilePartIndex + j]; - filePartReaders[j] = new HpccRemoteFileReader(inFilePart, recordDef, new HPCCRecordBuilder(recordDef)); + HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); + readContext.parentSpan = context.getOperation().taskSpan; + readContext.originalRD = recordDef; + filePartReaders[j] = new HpccRemoteFileReader(readContext, inFilePart, new HPCCRecordBuilder(recordDef)); } incomingFilePartIndex += numIncomingParts; HPCCRecordAccessor recordAccessor = new HPCCRecordAccessor(recordDef); - final HPCCRemoteFileWriter partFileWriter = new HPCCRemoteFileWriter(outFilePart, recordDef, recordAccessor, CompressionAlgorithm.NONE); + HPCCRemoteFileWriter.FileWriteContext writeContext = new HPCCRemoteFileWriter.FileWriteContext(); + writeContext.parentSpan = context.getOperation().taskSpan; + writeContext.recordDef = recordDef; + writeContext.fileCompression = CompressionAlgorithm.NONE; + final HPCCRemoteFileWriter partFileWriter = new HPCCRemoteFileWriter(writeContext, outFilePart, recordAccessor); tasks[taskIndex] = new Runnable() { @@ -864,16 +985,16 @@ public void run() { HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.recordsWritten.incrementAndGet(); - context.recordsRead.incrementAndGet(); + context.getOperation().recordsWritten.incrementAndGet(); + context.getOperation().recordsRead.incrementAndGet(); } fileReader.close(); - context.bytesRead.addAndGet(fileReader.getStreamPosition()); + context.getOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); } System.out.println("Closing file writer for task: " + taskIndex); fileWriter.close(); - context.bytesWritten.addAndGet(fileWriter.getBytesWritten()); + context.getOperation().bytesWritten.addAndGet(fileWriter.getBytesWritten()); } catch (Exception e) { @@ -970,7 +1091,12 @@ private static Runnable[] createWriteTasks(String[] srcFiles, SplitTable[] split DataPartition outFilePart = outFileParts[taskIndex]; HPCCRecordAccessor recordAccessor = new HPCCRecordAccessor(recordDef); - HPCCRemoteFileWriter filePartWriter = new HPCCRemoteFileWriter(outFilePart, recordDef, recordAccessor, CompressionAlgorithm.NONE); + + HPCCRemoteFileWriter.FileWriteContext writeContext = new HPCCRemoteFileWriter.FileWriteContext(); + writeContext.parentSpan = context.getOperation().taskSpan; + writeContext.recordDef = recordDef; + writeContext.fileCompression = CompressionAlgorithm.NONE; + HPCCRemoteFileWriter filePartWriter = new HPCCRemoteFileWriter(writeContext, outFilePart, recordAccessor); tasks[taskIndex] = new Runnable() { @@ -1017,15 +1143,15 @@ public void run() { HPCCRecord record = (HPCCRecord) fileReader.getNext(); fileWriter.writeRecord(record); - context.recordsWritten.incrementAndGet(); - context.recordsRead.incrementAndGet(); + context.getOperation().recordsWritten.incrementAndGet(); + context.getOperation().recordsRead.incrementAndGet(); } - context.bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); + context.getOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); 
inputStreams[j].close(); } fileWriter.close(); - context.bytesWritten.addAndGet(fileWriter.getBytesWritten()); + context.getOperation().bytesWritten.addAndGet(fileWriter.getBytesWritten()); } catch (Exception e) { @@ -1097,7 +1223,8 @@ private static void performRead(String[] args, TaskContext context) for (int i = 0; i < datasets.length; i++) { String datasetName = datasets[i]; - context.startOperation("Read " + datasetName); + context.startOperation("FileUtility.Read_" + datasetName); + context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); HPCCFile file = null; try @@ -1106,7 +1233,8 @@ private static void performRead(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); + String error = "Error while attempting to open file: '" + datasetName + "': " + e.getMessage(); + context.addError(error); return; } @@ -1119,7 +1247,8 @@ private static void performRead(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); + String error = "Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage(); + context.addError(error); return; } @@ -1165,7 +1294,7 @@ private static void performRead(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1281,7 +1410,9 @@ private static void performReadTest(String[] args, TaskContext context) } String datasetName = cmd.getOptionValue("read_test"); - context.startOperation("Read Test " + datasetName); + context.startOperation("FileUtility.ReadTest_" + datasetName); + + context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); HPCCFile file = null; try @@ -1291,7 +1422,7 @@ private static void performReadTest(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); + context.addError("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); return; } @@ -1304,7 +1435,7 @@ private static void performReadTest(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); + context.addError("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); return; } @@ -1319,16 +1450,15 @@ private static void performReadTest(String[] args, TaskContext context) int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1; if (filePartIndex < 0 || filePartIndex >= fileParts.length) { - System.out.println("Skipping invalid file part index: " + filePartsStrs[i] + context.addWarn("InvalidParams: Skipping invalid file part index: " + filePartsStrs[i] + " outside of range: [0," + fileParts.length + "]"); - continue; } filePartList.add(fileParts[filePartIndex]); } catch (NumberFormatException e) { - System.out.println("Skipping invalid file part index: " + filePartsStrs[i]); + context.addWarn("InvalidParams: Skipping invalid file part index: " + filePartsStrs[i]); } } } @@ -1354,7 +1484,7 @@ private static void performReadTest(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1465,7 +1595,9 @@ private static void 
performCopy(String[] args, TaskContext context)
             String srcFile = copyPairs[i];
             String destFile = copyPairs[i+1];
 
-            context.startOperation("Copy " + srcFile + " -> " + destFile);
+            context.startOperation("FileUtility.Copy_ " + srcFile + " -> " + destFile);
+            context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.src.url"), srcURL,
+                                                        AttributeKey.stringKey("server.dest.url"), destURL));
 
             HPCCFile file = null;
             try
@@ -1486,6 +1618,7 @@
             catch (HpccFileException e)
             {
                 context.addError("Error while retrieving file parts for: '" + srcFile + "': " + e.getMessage());
+                return;
             }
 
             boolean shouldRedistribute = true;
@@ -1519,7 +1652,7 @@ private static void performCopy(String[] args, TaskContext context)
 
             try
             {
-                executeTasks(tasks, numThreads);
+                executeTasks(tasks, numThreads, context);
             }
             catch (Exception e)
             {
@@ -1529,13 +1662,13 @@ private static void performCopy(String[] args, TaskContext context)
 
             if (context.hasError())
             {
-                return;
+                return;
             }
 
             try
             {
-                long bytesWritten = context.bytesWritten.get();
-                long recordsWritten = context.recordsWritten.get();
+                long bytesWritten = context.getOperation().bytesWritten.get();
+                long recordsWritten = context.getOperation().recordsWritten.get();
                 dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, recordsWritten, bytesWritten, true);
             }
             catch (Exception e)
@@ -1643,7 +1776,10 @@ private static void performWrite(String[] args, TaskContext context)
             String srcFile = writePairs[pairIdx];
             String destFile = writePairs[pairIdx+1];
 
-            context.startOperation("Write " + srcFile + " -> " + destFile);
+            context.startOperation( "FileUtility.Write_" + srcFile + "_to_" + destFile);
+
+            Attributes attributes = Attributes.of(AttributeKey.stringKey("server.url"), destURL);
+            context.setTaskSpanAttributes(attributes);
 
             SplitTable[] splitTables = null;
             String[] srcFiles = null;
@@ -1687,7 +1823,7 @@ private static void performWrite(String[] args, TaskContext context)
 
             try
             {
-                executeTasks(tasks, numThreads);
+                executeTasks(tasks, numThreads, context);
             }
             catch (Exception e)
             {
@@ -1743,7 +1879,7 @@ private static void performWrite(String[] args, TaskContext context)
 
             try
             {
-                executeTasks(tasks, numThreads);
+                executeTasks(tasks, numThreads, context);
             }
             catch (Exception e)
             {
@@ -1758,8 +1894,8 @@ private static void performWrite(String[] args, TaskContext context)
 
             try
             {
-                long bytesWritten = context.bytesWritten.get();
-                long recordsWritten = context.recordsWritten.get();
+                long bytesWritten = context.getOperation().bytesWritten.get();
+                long recordsWritten = context.getOperation().recordsWritten.get();
                 dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, recordsWritten, bytesWritten, true);
             }
             catch (Exception e)
@@ -1779,7 +1915,27 @@ private static void performWrite(String[] args, TaskContext context)
      */
     public static JSONArray run(String[] args)
     {
-        AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
+        if (!otelInitialized)
+        {
+            if (Boolean.getBoolean("otel.java.global-autoconfigure.enabled"))
+            {
+                System.out.println("OpenTelemetry autoconfiguration enabled with following values.");
+                System.out.println("If any of these options are not provided, they will default to values which could require additional CLASSPATH dependencies.");
+                System.out.println("If missing dependencies arise, utility will halt!");
+                System.out.println(" otel.traces.exporter sys property: " + System.getProperty("otel.traces.exporter"));
+                System.out.println(" OTEL_TRACES_EXPORTER Env var: " +
System.getenv("OTEL_TRACES_EXPORTER")); + System.out.println(" OTEL_TRACES_SAMPLER Env var: " + System.getenv("OTEL_TRACES_SAMPLER")); + System.out.println(" otel.traces.sampler sys property: " + System.getProperty("otel.traces.sampler")); + System.out.println(" otel.logs.exporter: "+ System.getProperty("otel.logs.exporter")); + System.out.println(" OTEL_LOGS_EXPORTER Env var: " + System.getenv("OTEL_LOGS_EXPORTER")); + System.out.println(" otel.metrics.exporter: "+ System.getProperty("otel.metrics.exporter")); + System.out.println(" OTEL_METRICS_EXPORTER Env var: " + System.getenv("OTEL_METRICS_EXPORTER")); + + OpenTelemetry otel = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + } + + otelInitialized = true; + } Options options = getTopLevelOptions(); CommandLineParser parser = new DefaultParser(); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java index 05092272d..f1f4c5322 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java @@ -36,7 +36,6 @@ public class HPCCRemoteFileWriter { private static final Logger log = LogManager.getLogger(HPCCRemoteFileWriter.class); - private FieldDef recordDef = null; private DataPartition dataPartition = null; private RowServiceOutputStream outputStream = null; private BinaryRecordWriter binaryRecordWriter = null; @@ -44,9 +43,31 @@ public class HPCCRemoteFileWriter private long recordsWritten = 0; private long openTimeMs = 0; + private FileWriteContext context = null; + private Span writeSpan = null; private String writeSpanName = null; + public static class FileWriteContext + { + public FieldDef recordDef = null; + public CompressionAlgorithm fileCompression = CompressionAlgorithm.DEFAULT; + public int connectTimeoutMs = -1; + public int socketOpTimeoutMs = -1; + public Span parentSpan = null; + } + + private static FileWriteContext constructReadContext(FieldDef recordDef, CompressionAlgorithm fileCompression, int connectTimeoutMs, int socketOpTimeoutMs) + { + FileWriteContext context = new FileWriteContext(); + context.recordDef = recordDef; + context.fileCompression = fileCompression; + context.connectTimeoutMs = connectTimeoutMs; + context.socketOpTimeoutMs = socketOpTimeoutMs; + + return context; + } + /** * A remote file writer. 
* @@ -110,13 +131,19 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccessor recordAccessor, CompressionAlgorithm fileCompression, int connectTimeoutMs, int socketOpTimeoutMs) throws Exception { - this.recordDef = recordDef; + this(constructReadContext(recordDef, fileCompression, connectTimeoutMs, socketOpTimeoutMs), dp, recordAccessor); + } + + public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAccessor recordAccessor) + throws Exception + { this.dataPartition = dp; + this.context = ctx; this.recordAccessor = recordAccessor; this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart(); - this.writeSpan = Utils.createSpan(writeSpanName); + this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName); String primaryIP = dp.getCopyIP(0); String secondaryIP = ""; @@ -125,22 +152,22 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso secondaryIP = dp.getCopyIP(1); } - Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP, - AttributeKey.stringKey("server.secondary.address"), secondaryIP, + Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, + AttributeKey.stringKey("server.1.address"), secondaryIP, ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort())); writeSpan.setAllAttributes(attributes); this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(), - dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0), - fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan); + dataPartition.getFileAccessBlob(), context.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0), + context.fileCompression, context.connectTimeoutMs, context.socketOpTimeoutMs, this.writeSpan); this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream); this.binaryRecordWriter.initialize(this.recordAccessor); log.info("HPCCRemoteFileWriter: Opening file part: " + dataPartition.getThisPart() - + " compression: " + fileCompression.name()); + + " compression: " + context.fileCompression.name()); log.trace("Record definition:\n" - + RecordDefinitionTranslator.toJsonRecord(this.recordDef)); + + RecordDefinitionTranslator.toJsonRecord(context.recordDef)); openTimeMs = System.currentTimeMillis(); } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index d8e2a1aef..4e0c640c4 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -38,7 +38,6 @@ public class HpccRemoteFileReader implements Iterator { private static final Logger log = LogManager.getLogger(HpccRemoteFileReader.class); - private FieldDef originalRecordDef = null; private DataPartition dataPartition = null; private RowServiceInputStream inputStream = null; private BinaryRecordReader binaryRecordReader; @@ -46,18 +45,13 @@ public class HpccRemoteFileReader implements Iterator private boolean handlePrefetch = true; private boolean isClosed = false; private boolean canReadNext = true; - private boolean createPrefetchThread = true; private int retryCount = 0; - 
private int connectTimeout = 0; - private int readSizeKB = 0; - private int limit = -1; private int maxReadRetries = DEFAULT_READ_RETRIES; - private int socketOpTimeoutMs = 0; private long openTimeMs = 0; private long recordsRead = 0; + private FileReadContext context = null; private Span readSpan = null; - private String readSpanName = null; public static final int NO_RECORD_LIMIT = -1; public static final int DEFAULT_READ_SIZE_OPTION = -1; @@ -71,6 +65,31 @@ public static class FileReadResumeInfo public long recordReaderStreamPos = 0; }; + public static class FileReadContext + { + public FieldDef originalRD = null; + public int connectTimeout = -1; + public int socketOpTimeoutMS = -1; + public int recordReadLimit = -1; + public boolean createPrefetchThread = true; + public int readSizeKB = -1; + public Span parentSpan = null; + }; + + private static FileReadContext constructReadContext(FieldDef originalRD, int connectTimeout, int socketOpTimeoutMS, + int recordReadLimit, boolean createPrefetchThread, int readSizeKB) + { + FileReadContext context = new FileReadContext(); + context.originalRD = originalRD; + context.connectTimeout = connectTimeout; + context.socketOpTimeoutMS = socketOpTimeoutMS; + context.recordReadLimit = recordReadLimit; + context.createPrefetchThread = createPrefetchThread; + context.readSizeKB = readSizeKB; + + return context; + } + /** * Instantiates a new hpcc remote file reader. * @@ -204,19 +223,51 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @throws Exception * general exception */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception + { + this(constructReadContext(originalRD, connectTimeout, socketOpTimeoutMs, limit, createPrefetchThread, readSizeKB), dp, recBuilder, resumeInfo); + } + + /** + * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. + * + * @param ctx + * the FileReadContext + * @param dp + * the part of the file, name and location + * @param recBuilder + * the IRecordBuilder used to construct records + * @throws Exception + * general exception + */ + public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilder recBuilder) throws Exception + { + this(ctx, dp, recBuilder, null); + } + + /** + * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. 
+     *
+     * @param ctx
+     *            the FileReadContext
+     * @param dp
+     *            the part of the file, name and location
+     * @param recBuilder
+     *            the IRecordBuilder used to construct records
+     * @param resumeInfo
+     *            FileReadResumeInfo data required to restart a read from a particular point in a file, null for reading from start
+     * @throws Exception
+     *             general exception
+     */
+    public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilder recBuilder, FileReadResumeInfo resumeInfo) throws Exception
     {
-        this.handlePrefetch = createPrefetchThread;
-        this.originalRecordDef = originalRD;
+        this.context = ctx;
+        this.handlePrefetch = context.createPrefetchThread;
         this.dataPartition = dp;
         this.recordBuilder = recBuilder;
-        this.readSizeKB = readSizeKB;
-        this.limit = limit;
-        this.createPrefetchThread = createPrefetchThread;
-        this.socketOpTimeoutMs = socketOpTimeoutMs;
-        this.readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp.getFileName() + "_" + dp.getThisPart();
-        this.readSpan = Utils.createSpan(readSpanName);
+        String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
+        this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);
 
         String primaryIP = dp.getCopyIP(0);
         String secondaryIP = "";
@@ -225,19 +276,13 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
             secondaryIP = dp.getCopyIP(1);
         }
 
-        Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP,
-                                               AttributeKey.stringKey("server.secondary.address"), secondaryIP,
+        Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
+                                               AttributeKey.stringKey("server.1.address"), secondaryIP,
                                                ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
-                                               AttributeKey.longKey("read.size"), Long.valueOf(readSizeKB*1000));
-        readSpan.setAllAttributes(attributes);
-
-        if (connectTimeout < 1)
-        {
-            connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS;
-        }
-        this.connectTimeout = connectTimeout;
+                                               AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000));
+        this.readSpan.setAllAttributes(attributes);
 
-        if (this.originalRecordDef == null)
+        if (context.originalRD == null)
         {
             Exception e = new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
             this.readSpan.recordException(e);
@@ -256,7 +301,9 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
 
         if (resumeInfo == null)
         {
-            this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, readSpan);
+            this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
+                                                         context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null,
+                                                         false, context.socketOpTimeoutMS, this.readSpan);
 
             this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
             this.binaryRecordReader.initialize(this.recordBuilder);
@@ -271,7 +318,9 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
             restartInfo.streamPos = resumeInfo.inputStreamPos;
             restartInfo.tokenBin = resumeInfo.tokenBin;
 
-            this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread,
readSizeKB, restartInfo, false, socketOpTimeoutMs, this.readSpan); + this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, + context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, + false, context.socketOpTimeoutMS, this.readSpan); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) @@ -289,7 +338,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart() + (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" )); log.trace("Original record definition:\n" - + RecordDefinitionTranslator.toJsonRecord(originalRD) + + RecordDefinitionTranslator.toJsonRecord(context.originalRD) + " projected record definition:\n" + RecordDefinitionTranslator.toJsonRecord(projectedRecordDefinition)); openTimeMs = System.currentTimeMillis(); @@ -315,10 +364,19 @@ private boolean retryRead() try { - this.readSpan = Utils.createSpan(readSpanName); - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, - this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, - this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs, this.readSpan); + String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); + if (context.parentSpan != null) + { + this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); + } + else + { + this.readSpan = Utils.createSpan(readSpanName); + } + + this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(), + context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, + context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 90d30794e..b1fbb7c66 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -410,8 +410,17 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.handle = 0; this.tokenBin = null; this.simulateFail = false; - this.connectTimeout = connectTimeout; - this.socketOpTimeoutMs = socketOpTimeoutMS; + + if (connectTimeout > 0) + { + this.connectTimeout = connectTimeout; + } + + if (socketOpTimeoutMS > 0) + { + this.socketOpTimeoutMs = socketOpTimeoutMS; + } + this.recordLimit = limit; this.readBufferCapacity.set(this.maxReadSizeKB*1024*2); @@ -810,14 +819,6 @@ private int startFetch() } String prefix = "RowServiceInputStream.startFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - AttributeKey.longKey("read.offset"), streamPos, - AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); - readSpan.addEvent("RowServiceInputStream.readRequest", attributes); - } - 
//------------------------------------------------------------------------------ // If we haven't made the connection active, activate it now and send the // first request. @@ -830,6 +831,14 @@ private int startFetch() numFetches++; if (!this.active.get()) { + if (readSpan != null) + { + Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), + AttributeKey.longKey("read.offset"), streamPos, + AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); + readSpan.addEvent("RowServiceInputStream.readRequest", attributes); + } + try { makeActive(); @@ -974,9 +983,6 @@ else if (this.handle == 0) { if (dataLen == 0) { - // Read handle - dis.readInt(); - close(); return 0; } @@ -1330,12 +1336,6 @@ public int available() throws IOException { // this.bufferWriteMutex.release(); IOException wrappedException = new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); - readSpan.recordException(wrappedException, attributes); - } throw wrappedException; } } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index 40f15f536..e21808f8d 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -236,8 +236,18 @@ private static class RowServiceResponse this.filePath = filePartPath; this.accessToken = accessToken; this.compressionAlgo = fileCompression; + + if (sockOpTimeoutMS < 0) + { + sockOpTimeoutMS = DEFAULT_SOCKET_OP_TIMEOUT_MS; + } this.sockOpTimeoutMs = sockOpTimeoutMS; + if (connectTimeoutMs < 0) + { + connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MILIS; + } + if (writeSpan != null && writeSpan.getSpanContext().isValid()) { this.writeSpan = writeSpan; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java index 7462c64dc..04d7887ea 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java @@ -46,17 +46,27 @@ public static Tracer getTelemetryTracer() public static Span createSpan(String name) { - return Utils.getTelemetryTracer().spanBuilder(name) + Span span = Utils.getTelemetryTracer().spanBuilder(name) .setSpanKind(SpanKind.CLIENT) .startSpan(); + span.makeCurrent(); + + return span; } public static Span createChildSpan(Span parentSpan, String name) { - return Utils.getTelemetryTracer().spanBuilder(name) + if (parentSpan == null) + { + return createSpan(name); + } + + Span span = Utils.getTelemetryTracer().spanBuilder(name) .setParent(Context.current().with(parentSpan)) .setSpanKind(SpanKind.CLIENT) .startSpan(); + span.makeCurrent(); + return span; } }
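Usage note (illustrative sketch, not part of the applied diff): the fragment below shows how calling code might use the context-based constructors introduced above, with the per-part read and write spans parented under a caller-owned task span. The method name copyOnePart and the variables inPart, outPart, recordDef and taskSpan are placeholder names, the HPCCRecord type parameter assumes the generic reader/writer as used elsewhere in dfsclient, imports are assumed to match those already present in the touched files, and timeouts are left at the context defaults (-1) so the underlying streams fall back to their own defaults.

    // Illustrative only; not taken verbatim from this patch.
    static void copyOnePart(DataPartition inPart, DataPartition outPart, FieldDef recordDef, Span taskSpan) throws Exception
    {
        // Reader side: the record definition is required; a null parentSpan makes
        // Utils.createChildSpan() fall back to creating a new root span.
        HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
        readContext.originalRD = recordDef;
        readContext.parentSpan = taskSpan;

        HpccRemoteFileReader<HPCCRecord> reader =
                new HpccRemoteFileReader<HPCCRecord>(readContext, inPart, new HPCCRecordBuilder(recordDef));

        // Writer side: unset fields keep the FileWriteContext defaults.
        HPCCRemoteFileWriter.FileWriteContext writeContext = new HPCCRemoteFileWriter.FileWriteContext();
        writeContext.recordDef = recordDef;
        writeContext.fileCompression = CompressionAlgorithm.NONE;
        writeContext.parentSpan = taskSpan;

        HPCCRemoteFileWriter<HPCCRecord> writer =
                new HPCCRemoteFileWriter<HPCCRecord>(writeContext, outPart, new HPCCRecordAccessor(recordDef));

        // Stream every record from the source part into the target part.
        while (reader.hasNext())
        {
            writer.writeRecord(reader.next());
        }

        reader.close();
        writer.close();
    }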