Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-552 File Utility Read Individual File Parts #658

Merged
merged 1 commit into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,24 @@ private static Options getReadOptions()
return options;
}

/**
 * Builds the command line options accepted by the read_test operation.
 *
 * Required: read_test (file to read) and url (ESP to connect to).
 * Optional: user, pass, num_threads and file_parts; file_parts takes one or
 * more values separated by ','.
 *
 * @return the options used to parse a read_test command line
 */
private static Options getReadTestOptions()
{
    Options options = new Options();
    options.addRequiredOption("read_test", "Read test", true, "Specifies the file that should be read.");
    options.addRequiredOption("url", "Source Cluster URL", true, "Specifies the URL of the ESP to connect to.");
    options.addOption("user", true, "Specifies the username used to connect. Defaults to null.");
    options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
    options.addOption("num_threads", true, "Specifies the number of parallel threads to use to perform operations.");

    // file_parts accepts a comma separated list, e.g. "-file_parts 1,3,5"
    options.addOption(Option.builder("file_parts")
                            .argName("_file_parts")
                            .hasArgs()
                            .valueSeparator(',')
                            .desc("Specifies the file parts that should be read. Defaults to all file parts.")
                            .build());
    return options;
}

private static Options getCopyOptions()
{
Options options = new Options();
Expand Down Expand Up @@ -463,6 +481,7 @@ private static Options getTopLevelOptions()
{
Options options = new Options();
options.addOption("read", "Reads the specified file(s) and writes a copy of the files to the local directory.");
options.addOption("read_test", "Reads the specified file and/or particular file parts without writing it locally.");
options.addOption("copy", "Copies the specified remote source file to the specified remote destination cluster / file.");
options.addOption("write", "Writes the specified local source file to the specified remote destination cluster / file.");

Expand Down Expand Up @@ -660,6 +679,44 @@ public void run()
}
}

private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
for (int i = 0; i < tasks.length; i++)
{
final int taskIndex = i;
rpastrana marked this conversation as resolved.
Show resolved Hide resolved
final DataPartition filePart = fileParts[taskIndex];
final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));

tasks[taskIndex] = new Runnable()
{
HpccRemoteFileReader<HPCCRecord> fileReader = filePartReader;

public void run()
{
try
{
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
context.recordsRead.incrementAndGet();
}

fileReader.close();
context.bytesRead.addAndGet(fileReader.getStreamPosition());
}
catch (Exception e)
{
context.addError("Error while reading file part index: '" + filePart.getThisPart() + " Error message: " + e.getMessage());
return;
}
}
};
}

return tasks;
}

private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, SplitTable[] splitTables, String[] outFilePaths, FieldDef recordDef, TaskContext context) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
Expand Down Expand Up @@ -1159,6 +1216,165 @@ private static void performRead(String[] args, TaskContext context)
}
}

/**
 * Performs the read_test operation: reads the specified file, or only the
 * file parts selected via file_parts, without writing record data locally.
 * The file's record definition is still written as JSON to a ".meta" file
 * in the output directory once the read succeeds.
 *
 * Parse failures and failures to open the file or retrieve its file parts are
 * printed to stdout; later failures are recorded via context.addError.
 *
 * @param args    the command line arguments, parsed with getReadTestOptions()
 * @param context task context used to record the operation, its counts and errors
 */
private static void performReadTest(String[] args, TaskContext context)
{
    Options options = getReadTestOptions();
    CommandLineParser parser = new DefaultParser();

    CommandLine cmd = null;
    try
    {
        cmd = parser.parse(options, args);
    }
    catch (ParseException e)
    {
        System.out.println("Error parsing commandline options:\n" + e.getMessage());
        return;
    }

    String connString = cmd.getOptionValue("url");
    String user = cmd.getOptionValue("user");
    String pass = cmd.getOptionValue("pass");

    // NOTE(review): "out" is not declared by getReadTestOptions(), so this presumably always resolves to "."
    String outputPath = cmd.getOptionValue("out",".");

    int numThreads = NUM_DEFAULT_THREADS;
    String numThreadsStr = cmd.getOptionValue("num_threads", "" + numThreads);
    try
    {
        numThreads = Integer.parseInt(numThreadsStr);
    }
    catch(Exception e)
    {
        System.out.println("Invalid option value for num_threads: "
                + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
    }

    // NOTE(review): "format" is not declared by getReadTestOptions() either, so this presumably defaults to THOR
    String formatStr = cmd.getOptionValue("format");
    if (formatStr == null)
    {
        formatStr = "THOR";
    }

    FileFormat format = FileFormat.THOR;
    switch (formatStr.toUpperCase())
    {
        case "THOR":
            format = FileFormat.THOR;
            break;
        case "PARQUET":
            format = FileFormat.PARQUET;
            break;
        default:
            // Report what the user supplied, not the default enum value
            System.out.println("Error unsupported format specified: " + formatStr);
            return;
    }

    String datasetName = cmd.getOptionValue("read_test");
    context.startOperation("Read Test " + datasetName);

    HPCCFile file = null;
    try
    {
        file = new HPCCFile(datasetName, connString, user, pass);
    }
    catch (Exception e)
    {
        System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage());
        return;
    }

    DataPartition[] fileParts = null;
    FieldDef recordDef = null;
    try
    {
        fileParts = file.getFileParts();
        recordDef = file.getRecordDefinition();
    }
    catch (Exception e)
    {
        System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage());
        return;
    }

    // Restrict the read to the requested file parts; indices on the command line are 1-based
    String[] filePartsStrs = cmd.getOptionValues("file_parts");
    if (filePartsStrs != null && filePartsStrs.length > 0)
    {
        ArrayList<DataPartition> filePartList = new ArrayList<DataPartition>();
        for (int i = 0; i < filePartsStrs.length; i++)
        {
            try
            {
                int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1;
                if (filePartIndex < 0 || filePartIndex >= fileParts.length)
                {
                    System.out.println("Skipping invalid file part index: " + filePartsStrs[i]
                            + " outside of range: [1," + fileParts.length + "]");
                    continue;
                }

                filePartList.add(fileParts[filePartIndex]);
            }
            catch (NumberFormatException e)
            {
                System.out.println("Skipping invalid file part index: " + filePartsStrs[i]);
            }
        }

        if (filePartList.isEmpty())
        {
            context.addError("No valid file parts specified for file: '" + datasetName + "'");
            return;
        }

        // Apply the selection; without this assignment every file part would be read
        fileParts = filePartList.toArray(new DataPartition[0]);
    }

    Runnable[] tasks = null;
    try
    {
        switch (format)
        {
            case THOR:
                tasks = createReadTestTasks(fileParts, recordDef, context);
                break;
            case PARQUET:
            default:
                throw new Exception("Error unsupported format specified: " + format);
        }
    }
    catch (Exception e)
    {
        context.addError("Error while attempting to create read tasks for file: '" + datasetName + "': " + e.getMessage());
        return;
    }

    try
    {
        executeTasks(tasks, numThreads);
    }
    catch (Exception e)
    {
        context.addError("Error while attempting to execute read tasks for file: '" + datasetName + "': " + e.getMessage());
        return;
    }

    if (context.hasError())
    {
        return;
    }

    String fileName = file.getFileName().replace(":","_");
    String filePath = outputPath + File.separator + fileName + ".meta";

    // try-with-resources closes the meta file even if serialization or the write fails
    try (FileOutputStream metaFile = new FileOutputStream(filePath))
    {
        String metaStr = RecordDefinitionTranslator.toJsonRecord(file.getRecordDefinition()).toString();
        metaFile.write(metaStr.getBytes());
    }
    catch (Exception e)
    {
        context.addError("Error while attempting to write meta-data for file: '" + datasetName + "': " + e.getMessage());
        return;
    }

    context.endOperation();
}

private static void performCopy(String[] args, TaskContext context)
{
Options options = getCopyOptions();
Expand Down Expand Up @@ -1576,6 +1792,10 @@ public static JSONArray run(String[] args)
{
performRead(args, context);
}
else if (cmd.hasOption("read_test"))
{
performReadTest(args, context);
}
else if (cmd.hasOption("copy"))
{
performCopy(args, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ public void thorFileTests()
Assert.assertTrue("FileUtility operation didn't complete successfully", success);
}

{
String readArgs[] = {"-read_test", "benchmark::integer::20kb", "-url", this.connString,
"-user", this.hpccUser, "-pass", this.hpccPass, "-file_parts", "1" };

JSONArray results = FileUtility.run(readArgs);
JSONObject result = results.optJSONObject(0);
Assert.assertNotNull("FileUtility result should not be null.", result);

boolean success = result.optBoolean("successful",false);
Assert.assertTrue("FileUtility operation didn't complete successfully", success);
}

{
String copyArgs[] = {"-copy", "benchmark::integer::20kb benchmark::integer::20kb-copy",
"-url", this.connString, "-dest_url", this.connString,
Expand Down
Loading