Skip to content

Commit

Permalink
HPCC4J-645 FileUtility filtering support and testing improvements (#757)
Browse files Browse the repository at this point in the history
Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu authored Sep 17, 2024
1 parent ed4ce33 commit 735de52
Showing 1 changed file with 132 additions and 0 deletions.
132 changes: 132 additions & 0 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private static class TaskOperation
public String currentOperationDesc = "";
public long operationStartNS = 0;


public List<String> errorMessages = new ArrayList<String>();
public List<String> warnMessages = new ArrayList<String>();

Expand Down Expand Up @@ -156,6 +157,9 @@ public JSONObject end(boolean success)
private Stack<TaskOperation> operations = new Stack<TaskOperation>();
public List<JSONObject> operationResults = new ArrayList<JSONObject>();

public int readRetries = HpccRemoteFileReader.DEFAULT_READ_RETRIES;
public int socketOpTimeoutMS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS;

public void setCurrentOperationSpanAttributes(Attributes attributes)
{
if (!hasCurrentOperation())
Expand Down Expand Up @@ -545,6 +549,46 @@ public void save(OutputStream outStream) throws IOException
}
}

private static int getReadRetries(CommandLine cmd)
{
int readRetries = HpccRemoteFileReader.DEFAULT_READ_RETRIES;
String retriesStr = cmd.getOptionValue("read_retries");
if (retriesStr != null)
{
try
{
readRetries = Integer.parseInt(retriesStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for read_retries: "
+ retriesStr + ", must be an integer. Defaulting to: " + HpccRemoteFileReader.DEFAULT_READ_RETRIES + " retries.");
}
}

return readRetries;
}

private static int getSocketOpTimeoutMS(CommandLine cmd)
{
int socketOpTimeoutS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS / 1000;
String timeoutStr = cmd.getOptionValue("socket_timeout_seconds");
if (timeoutStr != null)
{
try
{
socketOpTimeoutS = Integer.parseInt(timeoutStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for socket_timeout: "
+ timeoutStr + ", must be an integer. Defaulting to: " + socketOpTimeoutS + " seconds.");
}
}

return socketOpTimeoutS * 1000;
}

private static Options getReadOptions()
{
Options options = new Options();
Expand All @@ -554,6 +598,10 @@ private static Options getReadOptions()
options.addOption("format", true, "Specifies the output format to be used when writing files to disk. Defaults to Thor files.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("out", true, "Specifies the directory that the files should be written to.");
options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster.");
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");

options.addOption(Option.builder("read")
.argName("files")
Expand All @@ -576,6 +624,10 @@ private static Options getReadTestOptions()
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");
options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice.");
options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice.");
options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster.");
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
Expand All @@ -595,6 +647,10 @@ private static Options getCopyOptions()
options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to.");
options.addOption("dest_url", "Destination Cluster URL", true, "Specifies the URL of the ESP to write to.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster.");
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");

options.addOption(Option.builder("copy")
.argName("files")
Expand All @@ -616,6 +672,7 @@ private static Options getWriteOptions()
options.addOption("dest_url", "Destination Cluster URL", true, "Specifies the URL of the ESP to write to.");
options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");

options.addOption(Option.builder("write")
.argName("files")
Expand Down Expand Up @@ -847,8 +904,11 @@ public void run()
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.readSizeKB = readRequestSize;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;

HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(readContext, filePart, new HPCCRecordBuilder(recordDef));
fileReader.getInputStream().setReadRequestDelay(readRequestDelay);
fileReader.setMaxReadRetries(context.readRetries);

while (fileReader.hasNext())
{
Expand Down Expand Up @@ -881,7 +941,10 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split
HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;

final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef));
filePartReader.setMaxReadRetries(context.readRetries);

final String filePath = outFilePaths[taskIndex];
final FileOutputStream outStream = new FileOutputStream(filePath);
Expand Down Expand Up @@ -1004,7 +1067,9 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre
HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;
filePartReaders[j] = new HpccRemoteFileReader<HPCCRecord>(readContext, inFilePart, new HPCCRecordBuilder(recordDef));
filePartReaders[j].setMaxReadRetries(context.readRetries);
}
incomingFilePartIndex += numIncomingParts;

Expand Down Expand Up @@ -1142,6 +1207,7 @@ private static Runnable[] createWriteTasks(String[] srcFiles, SplitTable[] split
writeContext.parentSpan = context.getCurrentOperation().operationSpan;
writeContext.recordDef = recordDef;
writeContext.fileCompression = CompressionAlgorithm.NONE;
writeContext.socketOpTimeoutMs = context.socketOpTimeoutMS;
HPCCRemoteFileWriter<HPCCRecord> filePartWriter = new HPCCRemoteFileWriter<HPCCRecord>(writeContext, outFilePart, recordAccessor);

tasks[taskIndex] = new Runnable()
Expand Down Expand Up @@ -1253,6 +1319,9 @@ private static void performRead(String[] args, TaskContext context)
formatStr = "THOR";
}

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);

FileFormat format = FileFormat.THOR;
switch (formatStr.toUpperCase())
{
Expand All @@ -1267,6 +1336,9 @@ private static void performRead(String[] args, TaskContext context)
return;
}

String filter = cmd.getOptionValue("filter");
boolean ignoreTLK = cmd.hasOption("ignore_tlk");

String[] datasets = cmd.getOptionValues("read");
for (int i = 0; i < datasets.length; i++)
{
Expand All @@ -1286,6 +1358,22 @@ private static void performRead(String[] args, TaskContext context)
return;
}

file.setUseTLK(!ignoreTLK);

if (filter != null)
{
try
{
file.setFilter(filter);
}
catch (Exception e)
{
String error = "Error while attempting to set filter for: '" + datasetName + "': " + e.getMessage();
context.addError(error);
return;
}
}

DataPartition[] fileParts = null;
FieldDef recordDef = null;
try
Expand Down Expand Up @@ -1463,6 +1551,9 @@ private static void performReadTest(String[] args, TaskContext context)
+ readRequestDelayStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_DELAY + "ms.");
}

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
{
Expand All @@ -1483,6 +1574,9 @@ private static void performReadTest(String[] args, TaskContext context)
return;
}

String filter = cmd.getOptionValue("filter");
boolean ignoreTLK = cmd.hasOption("ignore_tlk");

String datasetName = cmd.getOptionValue("read_test");
context.startOperation("FileUtility.ReadTest_" + datasetName);

Expand All @@ -1500,6 +1594,22 @@ private static void performReadTest(String[] args, TaskContext context)
return;
}

file.setUseTLK(!ignoreTLK);

if (filter != null)
{
try
{
file.setFilter(filter);
}
catch (Exception e)
{
String error = "Error while attempting to set filter for: '" + datasetName + "': " + e.getMessage();
context.addError(error);
return;
}
}

DataPartition[] fileParts = null;
FieldDef recordDef = null;
try
Expand Down Expand Up @@ -1666,6 +1776,12 @@ private static void performCopy(String[] args, TaskContext context)
return;
}

String filter = cmd.getOptionValue("filter");
boolean ignoreTLK = cmd.hasOption("ignore_tlk");

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);

for (int i = 0; i < copyPairs.length; i+=2)
{
String srcFile = copyPairs[i];
Expand All @@ -1686,6 +1802,22 @@ private static void performCopy(String[] args, TaskContext context)
return;
}

file.setUseTLK(!ignoreTLK);

if (filter != null)
{
try
{
file.setFilter(filter);
}
catch (Exception e)
{
String error = "Error while attempting to set filter for: '" + srcFile + "': " + e.getMessage();
context.addError(error);
return;
}
}

DataPartition[] srcFileParts = null;
try
{
Expand Down

0 comments on commit 735de52

Please sign in to comment.