From 0ced7edfa0ab8b951c07aabc3fd76ecff15fc614 Mon Sep 17 00:00:00 2001 From: Gordon Smith Date: Fri, 30 Aug 2024 14:02:39 +0100 Subject: [PATCH 1/5] Split off 9.8.18 Signed-off-by: Gordon Smith --- commons-hpcc/pom.xml | 2 +- dfsclient/pom.xml | 2 +- pom.xml | 2 +- wsclient/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index 18400b6ec..982397d2d 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.8.17-0-SNAPSHOT + 9.8.19-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index d48e28e5b..1d6905124 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.8.17-0-SNAPSHOT + 9.8.19-0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 3498a49d9..129e10444 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.8.17-0-SNAPSHOT + 9.8.19-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com diff --git a/wsclient/pom.xml b/wsclient/pom.xml index 2496f1315..90f6a0602 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.8.17-0-SNAPSHOT + 9.8.19-0-SNAPSHOT From 3ebc99dbaa1fe8debdd1eff3227fea7f51c8c78b Mon Sep 17 00:00:00 2001 From: Gordon Smith Date: Fri, 30 Aug 2024 14:03:36 +0100 Subject: [PATCH 2/5] Split off 9.6.44 Signed-off-by: Gordon Smith --- commons-hpcc/pom.xml | 2 +- dfsclient/pom.xml | 2 +- pom.xml | 2 +- wsclient/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index 1fce2e46c..17a17c6dc 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.43-0-SNAPSHOT + 9.6.45-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index b4234b8cd..82e76a329 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.43-0-SNAPSHOT + 9.6.45-0-SNAPSHOT diff --git a/pom.xml b/pom.xml index bd6c92662..ef91762a7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.6.43-0-SNAPSHOT + 9.6.45-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com diff --git a/wsclient/pom.xml b/wsclient/pom.xml index 321e25c5f..32a54035f 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.43-0-SNAPSHOT + 9.6.45-0-SNAPSHOT From be177ec41d7b462f390730f68825efd1dd5ec62c Mon Sep 17 00:00:00 2001 From: James McMullan Date: Fri, 30 Aug 2024 10:00:09 -0400 Subject: [PATCH 3/5] HPCC4J-635 DFSClient: FileUtility add additional testing / debug options (#742) Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/FileUtility.java | 36 +++++++++++++++++-- .../dfs/client/RowServiceInputStream.java | 23 +++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 116c37c21..2d3cc4366 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -74,6 +74,9 @@ public class FileUtility private static final int NUM_DEFAULT_THREADS = 4; static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120; + static private final int DEFAULT_READ_REQUEST_SIZE = 4096; + static private final int DEFAULT_READ_REQUEST_DELAY = 0; + private static boolean otelInitialized = false; private static class TaskContext @@ -548,6 +551,8 @@ private static Options getReadTestOptions() options.addOption("pass", true, "Specifies the password used to connect. Defaults to null."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); options.addOption("access_expiry_seconds", true, "Access token expiration seconds."); + options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice."); + options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice."); options.addOption(Option.builder("file_parts") .argName("_file_parts") @@ -801,7 +806,7 @@ public void run() } } - private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception + private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context, int readRequestSize, int readRequestDelay) throws Exception { Runnable[] tasks = new Runnable[fileParts.length]; for (int i = 0; i < tasks.length; i++) @@ -818,7 +823,9 @@ public void run() HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; + readContext.readSizeKB = readRequestSize; HpccRemoteFileReader fileReader = new HpccRemoteFileReader(readContext, filePart, new HPCCRecordBuilder(recordDef)); + fileReader.getInputStream().setReadRequestDelay(readRequestDelay); while (fileReader.hasNext()) { @@ -1405,6 +1412,30 @@ private static void performReadTest(String[] args, TaskContext context) + numThreadsStr + ", must be an integer. Defaulting to: " + DEFAULT_ACCESS_EXPIRY_SECONDS + "s."); } + int readRequestSize = DEFAULT_READ_REQUEST_SIZE; + String readRequestSizeStr = cmd.getOptionValue("read_request_size", "" + readRequestSize); + try + { + readRequestSize = Integer.parseInt(readRequestSizeStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for read_request_size: " + + readRequestSizeStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_SIZE + "KB."); + } + + int readRequestDelay = DEFAULT_READ_REQUEST_DELAY; + String readRequestDelayStr = cmd.getOptionValue("read_request_delay", "" + readRequestDelay); + try + { + readRequestDelay = Integer.parseInt(readRequestDelayStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for read_request_delay: " + + readRequestDelayStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_DELAY + "ms."); + } + String formatStr = cmd.getOptionValue("format"); if (formatStr == null) { @@ -1477,6 +1508,7 @@ private static void performReadTest(String[] args, TaskContext context) context.addWarn("InvalidParams: Skipping invalid file part index: " + filePartsStrs[i]); } } + fileParts = filePartList.toArray(new DataPartition[0]); } Runnable[] tasks = null; @@ -1485,7 +1517,7 @@ private static void performReadTest(String[] args, TaskContext context) switch (format) { case THOR: - tasks = createReadTestTasks(fileParts, recordDef, context); + tasks = createReadTestTasks(fileParts, recordDef, context, readRequestSize, readRequestDelay); break; case PARQUET: default: diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index d7bbfff79..fcffaa974 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -108,6 +108,7 @@ public class RowServiceInputStream extends InputStream implements IProfilable private long mutexWaitTimeNS = 0; private long waitTimeNS = 0; private long sleepTimeNS = 0; + private int readRequestDelayMS = 0; private long fetchStartTimeNS = 0; private long fetchTimeNS = 0; private long fetchFinishTimeNS = 0; @@ -668,6 +669,15 @@ public int getHandle() return handle; } + /** + * The delay in milliseconds between read requests. Primarily used for testing. + * @param sleepTimeMS + */ + public void setReadRequestDelay(int sleepTimeMS) + { + this.readRequestDelayMS = sleepTimeMS; + } + /** * Simulate a handle failure and use the file token instead. The handle is set to an invalid value so the THOR node * will indicate that the handle is unknown and request a otken. @@ -1150,6 +1160,18 @@ private void finishFetch() if (inFetchingMode == false) { + if (readRequestDelayMS > 0) + { + try + { + Thread.sleep(readRequestDelayMS); + } + catch (InterruptedException e) + { + // We don't care about waking early + } + } + if (readSpan != null) { Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), @@ -1158,7 +1180,6 @@ private void finishFetch() readSpan.addEvent("RowServiceInputStream.readRequest", attributes); } - // Create the read ahead request if (this.simulateFail) this.handle = -1; String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest(); From 61ece0f61f1320543d46b768853d82ba0cecbb4f Mon Sep 17 00:00:00 2001 From: James McMullan Date: Wed, 4 Sep 2024 13:44:31 -0400 Subject: [PATCH 4/5] HPCC4J-636 DFSClient: Improve Opentelemetry tracing (#743) Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../dfs/client/HPCCRemoteFileWriter.java | 37 +- .../dfs/client/HpccRemoteFileReader.java | 71 ++-- .../dfs/client/RowServiceInputStream.java | 362 +++++++++++++----- .../dfs/client/RowServiceOutputStream.java | 93 +++-- 4 files changed, 403 insertions(+), 160 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java index f1f4c5322..1cd3f09de 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java @@ -21,6 +21,7 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.semconv.ServerAttributes; import org.apache.logging.log4j.Logger; @@ -142,8 +143,9 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces this.recordAccessor = recordAccessor; - this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart(); + this.writeSpanName = "HPCCRemoteFileWriter/Write_" + dp.getFileName() + "_" + dp.getThisPart(); this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName); + this.writeSpan.setStatus(StatusCode.OK); String primaryIP = dp.getCopyIP(0); String secondaryIP = ""; @@ -154,7 +156,7 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, AttributeKey.stringKey("server.1.address"), secondaryIP, - ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort())); + AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort())); writeSpan.setAllAttributes(attributes); this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(), @@ -181,8 +183,20 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces */ public void writeRecord(T record) throws Exception { - this.binaryRecordWriter.writeRecord(record); - this.recordsWritten++; + try + { + this.binaryRecordWriter.writeRecord(record); + this.recordsWritten++; + } + catch (Exception e) + { + log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage()); + this.writeSpan.recordException(e); + this.writeSpan.setStatus(StatusCode.ERROR); + this.writeSpan.end(); + + throw e; + } } /** @@ -197,7 +211,20 @@ public void writeRecords(Iterator it) throws Exception { while (it.hasNext()) { - this.binaryRecordWriter.writeRecord(it.next()); + try + { + this.binaryRecordWriter.writeRecord(it.next()); + this.recordsWritten++; + } + catch (Exception e) + { + log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage()); + this.writeSpan.recordException(e); + this.writeSpan.setStatus(StatusCode.ERROR); + this.writeSpan.end(); + + throw e; + } this.recordsWritten++; } } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 4e0c640c4..4192349bb 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -21,6 +21,7 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.semconv.ServerAttributes; import org.apache.logging.log4j.Logger; @@ -73,6 +74,7 @@ public static class FileReadContext public int recordReadLimit = -1; public boolean createPrefetchThread = true; public int readSizeKB = -1; + public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span public Span parentSpan = null; }; @@ -266,21 +268,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde this.dataPartition = dp; this.recordBuilder = recBuilder; - String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); - this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); - - String primaryIP = dp.getCopyIP(0); - String secondaryIP = ""; - if (dp.getCopyCount() > 1) - { - secondaryIP = dp.getCopyIP(1); - } - - Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, - AttributeKey.stringKey("server.1.address"), secondaryIP, - ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()), - AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000)); - this.readSpan.setAllAttributes(attributes); + this.readSpan = createReadSpan(ctx, dp); if (context.originalRD == null) { @@ -304,6 +292,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null, false, context.socketOpTimeoutMS, this.readSpan); + this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -321,6 +310,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); + this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) @@ -328,6 +318,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader."); this.readSpan.recordException(e); this.readSpan.end(); + throw e; } this.inputStream.skip(bytesToSkip); @@ -344,6 +335,35 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde openTimeMs = System.currentTimeMillis(); } + private static Span createReadSpan(FileReadContext context, DataPartition dp) + { + String readSpanName = "HPCCRemoteFileReader/Read_" + dp.getFileName() + "_" + dp.getThisPart(); + Span readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); + readSpan.setStatus(StatusCode.OK); + + String primaryIP = dp.getCopyIP(0); + String secondaryIP = ""; + if (dp.getCopyCount() > 1) + { + secondaryIP = dp.getCopyIP(1); + } + + long readSize = context.readSizeKB; + if (readSize < 0) + { + readSize = RowServiceInputStream.DEFAULT_MAX_READ_SIZE_KB; + } + readSize *= 1000; + + Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, + AttributeKey.stringKey("server.1.address"), secondaryIP, + AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort()), + AttributeKey.longKey("read.size"), Long.valueOf(readSize)); + readSpan.setAllAttributes(attributes); + + return readSpan; + } + private boolean retryRead() { if (retryCount < maxReadRetries) @@ -364,20 +384,12 @@ private boolean retryRead() try { - String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); - if (context.parentSpan != null) - { - this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); - } - else - { - this.readSpan = Utils.createSpan(readSpanName); - } + this.readSpan = createReadSpan(context, dataPartition); this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(), context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); - + this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) { @@ -391,6 +403,7 @@ private boolean retryRead() catch (Exception e) { this.readSpan.recordException(e); + this.readSpan.setStatus(StatusCode.ERROR); this.readSpan.end(); log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e); return false; @@ -529,6 +542,10 @@ public boolean hasNext() } catch (HpccFileException e) { + this.readSpan.recordException(e); + this.readSpan.setStatus(StatusCode.ERROR); + this.readSpan.end(); + if (!retryRead()) { canReadNext = false; @@ -564,6 +581,10 @@ public T next() } catch (HpccFileException e) { + this.readSpan.recordException(e); + this.readSpan.setStatus(StatusCode.ERROR); + this.readSpan.end(); + if (!retryRead()) { log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index d7bbfff79..62fc8e593 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.io.InputStream; +import java.sql.Timestamp; + import javax.net.SocketFactory; import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; @@ -55,6 +57,60 @@ */ public class RowServiceInputStream extends InputStream implements IProfilable { + private static class ReadRequestEvent + { + public long requestTime = 0; + public long requestStreamPos = 0; + public long responseTime = 0; + public int bytesRead = 0; + public int requestSize = 0; + }; + + public static class RestartInformation + { + public long streamPos = 0; + public byte[] tokenBin = null; + } + + private static class RowServiceResponse + { + int len = 0; + int errorCode = 0; + int handle = -1; + String errorMessage = null; + } + + public static final int DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE = 25; + public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout + public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations + + // Note: The platform may respond with more data than this if records are larger than this limit. + public static final int DEFAULT_MAX_READ_SIZE_KB = 4096; + private static final int SHORT_SLEEP_MS = 1; + private static final int LONG_WAIT_THRESHOLD_US = 100; + private static final int MAX_HOT_LOOP_NS = 10000; + + // This is used to prevent the prefetch thread from hot looping when + // the network connection is slow. The read on the socket will block until + // at least 512 bytes are available + private static final int MIN_SOCKET_READ_SIZE = 512; + + public static final String BYTES_READ_METRIC = "bytesRead"; + public static final String FIRST_BYTE_TIME_METRIC = "prefetchFirstByteTime"; + public static final String WAIT_TIME_METRIC = "parseWaitTime"; + public static final String MUTEX_WAIT_TIME_METRIC = "mutexWaitTime"; + public static final String SLEEP_TIME_METRIC = "prefetchSleepTime"; + + public static final String FETCH_START_TIME_METRIC = "fetchRequestStartTime"; + public static final String FETCH_TIME_METRIC = "fetchRequestReadTime"; + public static final String FETCH_FINISH_TIME_METRIC = "fetchRequestFinishTime"; + public static final String CLOSE_TIME_METRIC = "connectionCloseTime"; + + public static final String LONG_WAITS_METRIC = "numLongWaits"; + public static final String FETCHES_METRIC = "numFetches"; + public static final String PARTIAL_BLOCK_READS_METRIC = "numPartialBlockReads"; + public static final String BLOCK_READS_METRIC = "numBlockReads"; + private AtomicBoolean active = new AtomicBoolean(false); private AtomicBoolean closed = new AtomicBoolean(false); private boolean simulateFail = false; @@ -72,9 +128,17 @@ public class RowServiceInputStream extends InputStream implements IProfilable private java.io.DataOutputStream dos = null; private String rowServiceVersion = ""; - private Span readSpan = null; + private Span fileReadSpan = null; private String traceContextHeader = null; + private Span readRequestSpan = null; + private int readRequestCount = 0; + private int readRequestStart = 0; + private int readRequestBatchSize = DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE; + + private List readRequestEvents = new ArrayList(); + private ReadRequestEvent currentReadRequestEvent = null; + private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct private List prioritizedCopyIndexes = new ArrayList(); @@ -118,61 +182,18 @@ public class RowServiceInputStream extends InputStream implements IProfilable private long numBlockReads = 0; private Socket sock; - public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout - public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations private int connectTimeout = DEFAULT_CONNECT_TIMEOUT_MILIS; private int socketOpTimeoutMs = DEFAULT_SOCKET_OP_TIMEOUT_MS; private static final Charset HPCCCharSet = Charset.forName("ISO-8859-1"); private static final Logger log = LogManager.getLogger(RowServiceInputStream.class); - // Note: The platform may respond with more data than this if records are larger than this limit. - private static final int DEFAULT_MAX_READ_SIZE_KB = 4096; - private static final int SHORT_SLEEP_MS = 1; - private static final int LONG_WAIT_THRESHOLD_US = 100; - private static final int MAX_HOT_LOOP_NS = 10000; - - // This is used to prevent the prefetch thread from hot looping when - // the network connection is slow. The read on the socket will block until - // at least 512 bytes are available - private static final int MIN_SOCKET_READ_SIZE = 512; - private int maxReadSizeKB = DEFAULT_MAX_READ_SIZE_KB; // Buffer compact threshold should always be smaller than buffer prefetch threshold private int bufferPrefetchThresholdKB = DEFAULT_MAX_READ_SIZE_KB/2; private int bufferCompactThresholdKB = DEFAULT_MAX_READ_SIZE_KB/4; - public static final String BYTES_READ_METRIC = "bytesRead"; - public static final String FIRST_BYTE_TIME_METRIC = "prefetchFirstByteTime"; - public static final String WAIT_TIME_METRIC = "parseWaitTime"; - public static final String MUTEX_WAIT_TIME_METRIC = "mutexWaitTime"; - public static final String SLEEP_TIME_METRIC = "prefetchSleepTime"; - - public static final String FETCH_START_TIME_METRIC = "fetchRequestStartTime"; - public static final String FETCH_TIME_METRIC = "fetchRequestReadTime"; - public static final String FETCH_FINISH_TIME_METRIC = "fetchRequestFinishTime"; - public static final String CLOSE_TIME_METRIC = "connectionCloseTime"; - - public static final String LONG_WAITS_METRIC = "numLongWaits"; - public static final String FETCHES_METRIC = "numFetches"; - public static final String PARTIAL_BLOCK_READS_METRIC = "numPartialBlockReads"; - public static final String BLOCK_READS_METRIC = "numBlockReads"; - - public static class RestartInformation - { - public long streamPos = 0; - public byte[] tokenBin = null; - } - - private static class RowServiceResponse - { - int len = 0; - int errorCode = 0; - int handle = -1; - String errorMessage = null; - } - /** * Instantiates a new row service input stream. * @@ -393,8 +414,8 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co if (rdSpan != null && rdSpan.getSpanContext().isValid()) { - this.readSpan = rdSpan; - this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(readSpan); + this.fileReadSpan = rdSpan; + this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(fileReadSpan); } int copycount = dataPart.getCopyCount(); @@ -557,16 +578,20 @@ RestartInformation getRestartInformationForStreamPos(long streamPos) return restartInfo; } - private void setPrefetchException(HpccFileException e) { this.prefetchException = e; - if (readSpan != null) + if (readRequestSpan != null) + { + readRequestSpan.recordException(e); + readRequestSpan.setStatus(StatusCode.ERROR); + } + else if (fileReadSpan != null) { Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, e.getMessage()); - readSpan.recordException(e, attributes); + fileReadSpan.recordException(e, attributes); } } @@ -668,6 +693,21 @@ public int getHandle() return handle; } + /** + * Sets the read request span batch size. + * + * @param batchSize the read request span batch size + */ + public void setReadRequestSpanBatchSize(int batchSize) + { + if (batchSize < 1) + { + batchSize = DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE; + } + + this.readRequestBatchSize = batchSize; + } + /** * Simulate a handle failure and use the file token instead. The handle is set to an invalid value so the THOR node * will indicate that the handle is unknown and request a otken. @@ -731,11 +771,11 @@ public void startBlockingFetchRequest(List fetchOffsets) throws Exception if (inFetchingMode == false) { Exception wrappedException = new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode."); - if (readSpan != null) + if (fileReadSpan != null) { Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); - readSpan.recordException(wrappedException, attributes); + fileReadSpan.recordException(wrappedException, attributes); } throw wrappedException; } @@ -810,6 +850,75 @@ public void startBlockingFetchRequest(List fetchOffsets) throws Exception } } + private void startNewReadRequestSpan() + { + if (fileReadSpan != null) + { + // finishReadRequestSpan will clear the readRequestSpan when it is finished + boolean shouldStartNewSpan = readRequestSpan == null; + if (shouldStartNewSpan) + { + readRequestSpan = Utils.createChildSpan(fileReadSpan, "ReadRequest[" + readRequestCount + + "," + (readRequestCount + readRequestBatchSize) + "]"); + readRequestSpan.setAttribute("server.index", getFilePartCopy()); + readRequestSpan.setStatus(StatusCode.OK); + readRequestStart = readRequestCount; + } + readRequestCount++; + + currentReadRequestEvent = new ReadRequestEvent(); + currentReadRequestEvent.requestTime = System.currentTimeMillis(); + currentReadRequestEvent.requestStreamPos = streamPos; + currentReadRequestEvent.requestSize = maxReadSizeKB*1000; + } + } + + private void finishReadRequestSpan() + { + if (readRequestSpan != null) + { + if (currentReadRequestEvent != null) + { + currentReadRequestEvent.responseTime = System.currentTimeMillis(); + currentReadRequestEvent.bytesRead = totalDataInCurrentRequest; + readRequestEvents.add(currentReadRequestEvent); + + currentReadRequestEvent = null; + } + + int batchIndex = readRequestCount % readRequestBatchSize; + if (batchIndex == 0 || isClosed()) + { + List requestTimes = new ArrayList(); + List responseTimes = new ArrayList(); + List requestSizes = new ArrayList(); + List bytesRead = new ArrayList(); + List requestStreamPos = new ArrayList(); + + for (ReadRequestEvent event : readRequestEvents) + { + requestTimes.add( (new Timestamp(event.requestTime)).toString() ); + responseTimes.add( (new Timestamp(event.responseTime)).toString() ); + requestSizes.add((long)event.requestSize); + bytesRead.add((long)event.bytesRead); + requestStreamPos.add(event.requestStreamPos); + } + readRequestEvents.clear(); + + readRequestSpan.setAttribute(AttributeKey.stringArrayKey("requestTimes"), requestTimes); + readRequestSpan.setAttribute(AttributeKey.stringArrayKey("responseTimes"), responseTimes); + readRequestSpan.setAttribute(AttributeKey.longArrayKey("requestSizes"), requestSizes); + readRequestSpan.setAttribute(AttributeKey.longArrayKey("bytesRead"), bytesRead); + readRequestSpan.setAttribute(AttributeKey.longArrayKey("requestStreamPos"), requestStreamPos); + readRequestSpan.updateName( "ReadRequest[" + readRequestStart + "," + readRequestCount + "]"); + + readRequestSpan.end(); + readRequestSpan = null; + } + } + } + + // Run from prefetch thread only private int startFetch() { @@ -831,13 +940,7 @@ private int startFetch() numFetches++; if (!this.active.get()) { - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - AttributeKey.longKey("read.offset"), streamPos, - AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); - readSpan.addEvent("RowServiceInputStream.readRequest", attributes); - } + startNewReadRequestSpan(); try { @@ -1044,11 +1147,11 @@ private void readDataInFetch() if (bytesToRead < 0) { IOException wrappedException = new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); - if (readSpan != null) + if (fileReadSpan != null) { Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); - readSpan.recordException(wrappedException, attributes); + fileReadSpan.recordException(wrappedException, attributes); } throw wrappedException; } @@ -1137,12 +1240,8 @@ private void finishFetch() catch(Exception ie){} } - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest)); - readSpan.addEvent("RowServiceInputStream.readResponse", attributes); - } + finishReadRequestSpan(); + //------------------------------------------------------------------------------ // Send read ahead request @@ -1150,14 +1249,7 @@ private void finishFetch() if (inFetchingMode == false) { - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - AttributeKey.longKey("read.offset"), streamPos, - AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); - readSpan.addEvent("RowServiceInputStream.readRequest", attributes); - } - + startNewReadRequestSpan(); // Create the read ahead request if (this.simulateFail) this.handle = -1; @@ -1370,6 +1462,7 @@ public void close() throws IOException catch(Exception e){} } + finishReadRequestSpan(); this.sendCloseFileRequest(); @@ -1675,12 +1768,15 @@ private void makeActive() throws HpccFileException this.handle = 0; String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; - if (readSpan != null) + Span connectSpan = null; + if (fileReadSpan != null) { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); - readSpan.addEvent("RowServiceInputStream.connect", attributes); + + connectSpan = Utils.createChildSpan(fileReadSpan, "Connect"); + connectSpan.setAttribute("server.index", getFilePartCopy()); } + boolean needsRetry = false; do { @@ -1727,11 +1823,29 @@ private void makeActive() throws HpccFileException } catch (java.net.UnknownHostException e) { - throw new HpccFileException(prefix + "Bad file part IP address or host name: " + e.getMessage(),e); + HpccFileException wrappedException = new HpccFileException(prefix + "Bad file part IP address or host name: " + e.getMessage(),e); + + if (connectSpan != null) + { + connectSpan.recordException(wrappedException); + connectSpan.setStatus(StatusCode.ERROR); + connectSpan.end(); + } + + throw wrappedException; } catch (java.io.IOException e) { - throw new HpccFileException(prefix + " error making part active:" + e.getMessage(),e); + HpccFileException wrappedException = new HpccFileException(prefix + " error making part active:" + e.getMessage(),e); + + if (connectSpan != null) + { + connectSpan.recordException(wrappedException); + connectSpan.setStatus(StatusCode.ERROR); + connectSpan.end(); + } + + throw wrappedException; } try @@ -1741,17 +1855,35 @@ private void makeActive() throws HpccFileException } catch (java.io.IOException e) { - throw new HpccFileException(prefix + " Failed to make streams for datapart:" + e.getMessage(), e); + HpccFileException wrappedException = new HpccFileException(prefix + " Failed to make streams for datapart:" + e.getMessage(), e); + + if (connectSpan != null) + { + connectSpan.recordException(wrappedException); + connectSpan.setStatus(StatusCode.ERROR); + connectSpan.end(); + } + + throw wrappedException; + } + + if (connectSpan != null) + { + connectSpan.setStatus(StatusCode.OK); + connectSpan.end(); } //------------------------------------------------------------------------------ // Check protocol version //------------------------------------------------------------------------------ - if (readSpan != null) + + Span versionSpan = null; + if (fileReadSpan != null) { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); - readSpan.addEvent("RowServiceInputStream.versionRequest", attributes); + versionSpan = Utils.createChildSpan(fileReadSpan, "VersionRequest"); + versionSpan.setAttribute( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); + versionSpan.setStatus(StatusCode.OK); } try @@ -1765,7 +1897,15 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(prefix+ " Failed on initial remote read transfer: " + e.getMessage(),e); + HpccFileException wrappedException = new HpccFileException(prefix+ " Failed on initial remote read transfer: " + e.getMessage(),e); + if (versionSpan != null) + { + versionSpan.setStatus(StatusCode.ERROR); + versionSpan.recordException(wrappedException); + versionSpan.end(); + } + + throw wrappedException; } RowServiceResponse response = readResponse(); @@ -1784,23 +1924,31 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(prefix + "Error while attempting to read version response:" + e.getMessage(), e); + HpccFileException wrappedException = new HpccFileException(prefix + "Error while attempting to read version response:" + e.getMessage(), e); + if (versionSpan != null) + { + versionSpan.setStatus(StatusCode.ERROR); + versionSpan.recordException(wrappedException); + versionSpan.end(); + } + + throw wrappedException; } rowServiceVersion = new String(versionBytes, HPCCCharSet); + } - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - ServiceAttributes.SERVICE_VERSION, rowServiceVersion); - readSpan.addEvent("RowServiceInputStream.versionResponse", attributes); - } + if (versionSpan != null) + { + versionSpan.setAttribute(ServiceAttributes.SERVICE_VERSION, rowServiceVersion); + versionSpan.end(); } //------------------------------------------------------------------------------ // Send initial read request //------------------------------------------------------------------------------ + startNewReadRequestSpan(); try { String readTrans = null; @@ -1821,7 +1969,16 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(prefix + " Failed on initial remote read read trans:" + e.getMessage(), e); + HpccFileException wrappedException = new HpccFileException(prefix + " Failed on initial remote read read trans:" + e.getMessage(), e); + + if (readRequestSpan != null) + { + readRequestSpan.recordException(wrappedException); + readRequestSpan.setStatus(StatusCode.ERROR); + readRequestSpan.end(); + } + + throw wrappedException; } if (CompileTimeConstants.PROFILE_CODE) @@ -2273,6 +2430,14 @@ private void sendCloseFileRequest() throws IOException return; } + Span closeSpan = null; + if (fileReadSpan != null) + { + closeSpan = Utils.createChildSpan(fileReadSpan, "CloseRequest"); + closeSpan.setAttribute("server.index", getFilePartCopy()); + closeSpan.setStatus(StatusCode.OK); + } + String closeFileRequest = makeCloseHandleRequest(); int jsonRequestLen = closeFileRequest.length(); @@ -2296,14 +2461,19 @@ private void sendCloseFileRequest() throws IOException catch (HpccFileException e) { IOException wrappedException = new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); - if (readSpan != null) + if (closeSpan != null) { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); - readSpan.recordException(wrappedException, attributes); + closeSpan.recordException(wrappedException); + closeSpan.setStatus(StatusCode.ERROR); + closeSpan.end(); } throw wrappedException; } + + if (closeSpan != null) + { + closeSpan.end(); + } } private RowServiceResponse readResponse() throws HpccFileException diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index e21808f8d..036cd825b 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -31,7 +31,9 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.ServiceAttributes; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -68,7 +70,7 @@ public class RowServiceOutputStream extends OutputStream private long handle = -1; private ByteBuffer scratchBuffer = ByteBuffer.allocate(SCRATCH_BUFFER_LEN); - private Span writeSpan = null; + private Span fileWriteSpan = null; private String traceContextHeader = null; private static class RowServiceResponse @@ -221,13 +223,13 @@ private static class RowServiceResponse * the socket connect timeout in ms (default is 5000) * @param socketOpTimeoutMS * the socket operation(read/write) timeout in ms (default is 15000) - * @param writeSpan + * @param fileWriteSpan * the opentelemetry span to use for tracing * @throws Exception * the exception */ RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath, - CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS, Span writeSpan) throws Exception + CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS, Span fileWriteSpan) throws Exception { this.rowServiceIP = ip; this.rowServicePort = port; @@ -248,15 +250,18 @@ private static class RowServiceResponse connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MILIS; } - if (writeSpan != null && writeSpan.getSpanContext().isValid()) + if (fileWriteSpan != null && fileWriteSpan.getSpanContext().isValid()) { - this.writeSpan = writeSpan; - this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(writeSpan); + this.fileWriteSpan = fileWriteSpan; + this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(fileWriteSpan); } - if (this.writeSpan != null) + Span connectSpan = null; + if (this.fileWriteSpan != null) { - writeSpan.addEvent("RowServiceOutputStream.connect", getServerAttributes()); + connectSpan = Utils.createChildSpan(fileWriteSpan, "Connect"); + connectSpan.setStatus(StatusCode.OK); + connectSpan.setAllAttributes(getServerAttributes()); } try @@ -302,21 +307,31 @@ private static class RowServiceResponse log.error(errorMessage); Exception wrappedException = new Exception(errorMessage, e); - if (writeSpan != null) + if (connectSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + connectSpan.recordException(wrappedException, getServerAttributes()); + connectSpan.setStatus(StatusCode.ERROR); + connectSpan.end(); } throw wrappedException; } + if (connectSpan != null) + { + connectSpan.end(); + } + //------------------------------------------------------------------------------ // Check protocol version //------------------------------------------------------------------------------ - if (writeSpan != null) + Span versionSpan = null; + if (fileWriteSpan != null) { - writeSpan.addEvent("RowServiceOutputStream.versionRequest", getServerAttributes()); + versionSpan = Utils.createChildSpan(fileWriteSpan, "VersionRequest"); + versionSpan.setStatus(StatusCode.OK); + versionSpan.setAllAttributes(getServerAttributes()); } try @@ -331,9 +346,11 @@ private static class RowServiceResponse catch (IOException e) { HpccFileException wrappedException = new HpccFileException("Failed on initial remote read read trans", e); - if (writeSpan != null) + if (versionSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + versionSpan.recordException(wrappedException, getServerAttributes()); + versionSpan.setStatus(StatusCode.ERROR); + versionSpan.end(); } throw wrappedException; @@ -356,9 +373,11 @@ private static class RowServiceResponse catch (IOException e) { HpccFileException wrappedException = new HpccFileException("Error while attempting to read version response.", e); - if (writeSpan != null) + if (versionSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + versionSpan.recordException(wrappedException, getServerAttributes()); + versionSpan.setStatus(StatusCode.ERROR); + versionSpan.end(); } throw wrappedException; @@ -367,6 +386,12 @@ private static class RowServiceResponse rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1); } + if (versionSpan != null) + { + versionSpan.setAttribute(ServiceAttributes.SERVICE_VERSION, rowServiceVersion); + versionSpan.end(); + } + // Go ahead and make the initial write request. This won't write any data to file // but it will cause the file to be opened on the remote server and keeps our access // token from expiring before we can start writing @@ -435,9 +460,9 @@ private void makeInitialWriteRequest() throws Exception if (response.errorCode != RFCCodes.RFCStreamNoError) { IOException wrappedException = new IOException(response.errorMessage); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -481,9 +506,9 @@ private void sendCloseFileRequest() throws IOException catch (IOException e) { IOException wrappedException = new IOException("Failed on close file with error: ", e); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -497,9 +522,9 @@ private void sendCloseFileRequest() throws IOException catch (HpccFileException e) { IOException wrappedException = new IOException("Failed to close file. Unable to read response with error: ", e); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -508,9 +533,9 @@ private void sendCloseFileRequest() throws IOException if (response.errorCode != RFCCodes.RFCStreamNoError) { IOException wrappedException = new IOException(response.errorMessage); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -577,9 +602,9 @@ private RowServiceResponse readResponse() throws HpccFileException if (response.len < 4) { HpccFileException wrappedException = new HpccFileException("Early data termination, no handle"); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -591,9 +616,9 @@ private RowServiceResponse readResponse() throws HpccFileException catch (IOException e) { HpccFileException wrappedException = new HpccFileException("Error while attempting to read row service response: ", e); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -618,9 +643,9 @@ public void close() throws IOException else if (bytesWritten == 0 && compressionAlgo != CompressionAlgorithm.NONE) { IOException wrappedException = new IOException("Fatal error while closing file. Writing compressed files with 0 length is not supported with the remote HPCC cluster."); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -694,9 +719,9 @@ public void write(byte[] b, int off, int len) throws IOException catch (HpccFileException e) { IOException wrappedException = new IOException("Failed during write operation. Unable to read response with error: ", e); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -705,9 +730,9 @@ public void write(byte[] b, int off, int len) throws IOException if (response.errorCode != RFCCodes.RFCStreamNoError) { IOException wrappedException = new IOException(response.errorMessage); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; From 1914fa24a239ee4c479beb75294718cafb2a9f41 Mon Sep 17 00:00:00 2001 From: James McMullan Date: Wed, 4 Sep 2024 13:47:59 -0400 Subject: [PATCH 5/5] HPCC4J-637 DFSClient: FileUtility add credential prompting (#744) Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/FileUtility.java | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 116c37c21..c41ae0946 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.io.BufferedInputStream; +import java.io.Console; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -322,6 +323,28 @@ public JSONArray generateResultsMessage() } }; + private static String[] getCredentials(CommandLine cmd) + { + Console console = System.console(); + + String user = cmd.getOptionValue("user"); + boolean userIsEmpty = user == null || user.isEmpty(); + if (userIsEmpty) + { + user = new String(console.readLine("Enter username: ")); + userIsEmpty = user == null || user.isEmpty(); + } + + String pass = cmd.getOptionValue("pass"); + boolean passIsEmpty = pass == null || pass.isEmpty(); + if (!userIsEmpty && passIsEmpty) + { + pass = new String(console.readPassword("Enter password for " + user + ": ")); + } + + return new String[] {user, pass}; + } + private static enum FileFormat { THOR, @@ -1198,8 +1221,10 @@ private static void performRead(String[] args, TaskContext context) } String connString = cmd.getOptionValue("url"); - String user = cmd.getOptionValue("user"); - String pass = cmd.getOptionValue("pass"); + + String[] creds = getCredentials(cmd); + String user = creds[0]; + String pass = creds[1]; String outputPath = cmd.getOptionValue("out","."); @@ -1376,8 +1401,10 @@ private static void performReadTest(String[] args, TaskContext context) } String connString = cmd.getOptionValue("url"); - String user = cmd.getOptionValue("user"); - String pass = cmd.getOptionValue("pass"); + + String[] creds = getCredentials(cmd); + String user = creds[0]; + String pass = creds[1]; String outputPath = cmd.getOptionValue("out","."); @@ -1560,8 +1587,9 @@ private static void performCopy(String[] args, TaskContext context) + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads."); } - String user = cmd.getOptionValue("user"); - String pass = cmd.getOptionValue("pass"); + String[] creds = getCredentials(cmd); + String user = creds[0]; + String pass = creds[1]; String destClusterName = cmd.getOptionValue("dest_cluster"); @@ -1741,8 +1769,9 @@ private static void performWrite(String[] args, TaskContext context) + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads."); } - String user = cmd.getOptionValue("user"); - String pass = cmd.getOptionValue("pass"); + String[] creds = getCredentials(cmd); + String user = creds[0]; + String pass = creds[1]; String destClusterName = cmd.getOptionValue("dest_cluster");