From 8d1e1ac8ee46595a7700d1d07ccbeba10ca9ee6b Mon Sep 17 00:00:00 2001 From: James McMullan Date: Fri, 3 May 2024 09:31:02 -0400 Subject: [PATCH] HPCC4J-577 Added Read Retry to HPCCRemoteFileReader - Added Read Retry to HPCCRemoteFileReader - Minor improvements to FileUtility to improve testing Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/FileUtility.java | 25 ++-- .../dfs/client/HpccRemoteFileReader.java | 108 +++++++++++++++--- 2 files changed, 110 insertions(+), 23 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 2cf0e3b25..40c766323 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -64,6 +64,7 @@ public class FileUtility private static final int DEFAULT_SPLIT_TABLE_SIZE = 128; private static final int NUM_DEFAULT_THREADS = 4; + static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120; private static class TaskContext { @@ -425,6 +426,7 @@ private static Options getReadTestOptions() options.addOption("user", true, "Specifies the username used to connect. Defaults to null."); options.addOption("pass", true, "Specifies the password used to connect. Defaults to null."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); + options.addOption("access_expiry_seconds", true, "Access token expiration seconds."); options.addOption(Option.builder("file_parts") .argName("_file_parts") @@ -633,11 +635,6 @@ private static String[] filterFilesByFormat(String[] srcFiles, FileFormat format private static void executeTasks(Runnable[] tasks, int numThreads) throws Exception { - if (tasks.length > numThreads) - { - numThreads = tasks.length; - } - int numTasksPerThread = tasks.length / numThreads; int numResidualTasks = tasks.length % numThreads; @@ -686,16 +683,15 @@ private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDe { final int taskIndex = i; final DataPartition filePart = fileParts[taskIndex]; - final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(filePart, recordDef, new HPCCRecordBuilder(recordDef)); tasks[taskIndex] = new Runnable() { - HpccRemoteFileReader fileReader = filePartReader; - public void run() { try { + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(filePart, recordDef, new HPCCRecordBuilder(recordDef)); + while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); @@ -1250,6 +1246,18 @@ private static void performReadTest(String[] args, TaskContext context) + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads."); } + int expirySeconds = DEFAULT_ACCESS_EXPIRY_SECONDS; + String expirySecondsStr = cmd.getOptionValue("access_expiry_seconds", "" + expirySeconds); + try + { + expirySeconds = Integer.parseInt(expirySecondsStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for access_expiry_seconds: " + + numThreadsStr + ", must be an integer. Defaulting to: " + DEFAULT_ACCESS_EXPIRY_SECONDS + "s."); + } + String formatStr = cmd.getOptionValue("format"); if (formatStr == null) { @@ -1277,6 +1285,7 @@ private static void performReadTest(String[] args, TaskContext context) try { file = new HPCCFile(datasetName, connString, user, pass); + file.setFileAccessExpirySecs(expirySeconds); } catch (Exception e) { diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index dad69ec3a..1a4d0dc54 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -38,12 +38,20 @@ public class HpccRemoteFileReader implements Iterator private boolean handlePrefetch = true; private boolean isClosed = false; private boolean canReadNext = true; + private boolean createPrefetchThread = true; + private int retryCount = 0; + private int connectTimeout = 0; + private int readSizeKB = 0; + private int limit = -1; + private int maxReadRetries = DEFAULT_READ_RETRIES; + private int socketOpTimeoutMs = 0; private long openTimeMs = 0; private long recordsRead = 0; public static final int NO_RECORD_LIMIT = -1; public static final int DEFAULT_READ_SIZE_OPTION = -1; public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1; + public static final int DEFAULT_READ_RETRIES = 3; public static class FileReadResumeInfo { @@ -189,18 +197,23 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde { this.handlePrefetch = createPrefetchThread; this.originalRecordDef = originalRD; - if (this.originalRecordDef == null) - { - throw new Exception("HpccRemoteFileReader: Original record definition is null."); - } + this.dataPartition = dp; + this.recordBuilder = recBuilder; + this.readSizeKB = readSizeKB; + this.limit = limit; + this.createPrefetchThread = createPrefetchThread; + this.socketOpTimeoutMs = socketOpTimeoutMs; if (connectTimeout < 1) { connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS; } + this.connectTimeout = connectTimeout; - this.dataPartition = dp; - this.recordBuilder = recBuilder; + if (this.originalRecordDef == null) + { + throw new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required."); + } FieldDef projectedRecordDefinition = recBuilder.getRecordDefinition(); if (projectedRecordDefinition == null) @@ -246,6 +259,61 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde openTimeMs = System.currentTimeMillis(); } + private boolean retryRead() + { + if (retryCount < maxReadRetries) + { + log.info("Retrying read for " + this.dataPartition.toString() + " retry count: " + retryCount); + retryCount++; + + FileReadResumeInfo resumeInfo = getFileReadResumeInfo(); + RowServiceInputStream.RestartInformation restartInfo = new RowServiceInputStream.RestartInformation(); + restartInfo.streamPos = resumeInfo.inputStreamPos; + restartInfo.tokenBin = resumeInfo.tokenBin; + + try + { + this.inputStream.close(); + } + catch (Exception e) {} + + try + { + this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, + this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, + this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs); + long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; + if (bytesToSkip < 0) + { + throw new Exception("Unable to restart read stream, unexpected stream position in record reader."); + } + this.inputStream.skip(bytesToSkip); + + this.binaryRecordReader = new BinaryRecordReader(this.inputStream, resumeInfo.recordReaderStreamPos); + this.binaryRecordReader.initialize(this.recordBuilder); + } + catch (Exception e) + { + log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e); + return false; + } + + return true; + } + + return false; + } + + /** + * Sets the maximum number of times to retry a read operation before failing. + * + * @param maxReadRetries maximum number of read retries + */ + public void setMaxReadRetries(int maxReadRetries) + { + this.maxReadRetries = maxReadRetries; + } + /** * Returns the stream position within the file. * @@ -363,11 +431,16 @@ public boolean hasNext() } catch (HpccFileException e) { - canReadNext = false; - log.error("Read failure for " + this.dataPartition.toString()); - java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); - exception.initCause(e); - throw exception; + if (!retryRead()) + { + canReadNext = false; + log.error("Read failure for " + this.dataPartition.toString(), e); + java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); + exception.initCause(e); + throw exception; + } + + return hasNext(); } return canReadNext; @@ -393,10 +466,15 @@ public T next() } catch (HpccFileException e) { - log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage()); - java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); - exception.initCause(e); - throw exception; + if (!retryRead()) + { + log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e); + java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); + exception.initCause(e); + throw exception; + } + + return next(); } recordsRead++;