Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon Smith <[email protected]>
  • Loading branch information
GordonSmith committed Nov 9, 2023
2 parents 5e8d2d8 + feb19f5 commit a5793a2
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 9 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/Jirabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
python -m site
python -m pip install --upgrade pip setuptools wheel
python -m pip install --upgrade jira
python -m pip --version
python -m pip freeze | grep jira
- name: "Run"
env:
JIRABOT_USERNAME : ${{ secrets.JIRABOT_USERNAME }}
Expand Down Expand Up @@ -60,9 +62,9 @@ jobs:
try:
jira.transition_issue(issue, transition)
result += 'Workflow Transition: ' + transition + '\n'
except:
except Exception as error:
transitions = jira.transitions(issue)
result += 'Error: Transition: "' + transition + '" failed. Valid transitions=[' + (', '.join(transitions)) + ']\n'
result += 'Error: Transition: "' + transition + '" failed with: "' + str(error) + '" Valid transitions=' + str(transitions) + '\n'
if issue.fields.customfield_10010 is None:
issue.update(fields={'customfield_10010': pull_url})
Expand Down
220 changes: 220 additions & 0 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,24 @@ private static Options getReadOptions()
return options;
}

/**
 * Builds the command line options understood by the "read_test" operation.
 *
 * Required: read_test (the file to read) and url (the ESP to connect to).
 * Optional: user, pass, num_threads and file_parts, where file_parts accepts
 * a comma separated list of values.
 *
 * @return the option set used to parse read_test arguments
 */
private static Options getReadTestOptions()
{
    // file_parts may carry several values, split on ','
    final Option filePartsOption = Option.builder("file_parts")
                                         .argName("_file_parts")
                                         .hasArgs()
                                         .valueSeparator(',')
                                         .desc("Specifies the file parts that should be read. Defaults to all file parts.")
                                         .build();

    final Options readTestOptions = new Options();

    // Mandatory arguments
    readTestOptions.addRequiredOption("read_test", "Read test", true, "Specifies the file that should be read.");
    readTestOptions.addRequiredOption("url", "Source Cluster URL", true, "Specifies the URL of the ESP to connect to.");

    // Optional arguments
    readTestOptions.addOption("user", true, "Specifies the username used to connect. Defaults to null.");
    readTestOptions.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
    readTestOptions.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
    readTestOptions.addOption(filePartsOption);

    return readTestOptions;
}

private static Options getCopyOptions()
{
Options options = new Options();
Expand Down Expand Up @@ -463,6 +481,7 @@ private static Options getTopLevelOptions()
{
Options options = new Options();
options.addOption("read", "Reads the specified file(s) and writes a copy of the files to the local directory.");
options.addOption("read_test", "Reads the specified file and/or particular file parts without writing it locally.");
options.addOption("copy", "Copies the specified remote source file to the specified remote destination cluster / file.");
options.addOption("write", "Writes the specified local source file to the specified remote destination cluster / file.");

Expand Down Expand Up @@ -660,6 +679,44 @@ public void run()
}
}

/**
 * Creates one task per file part that reads every record of that part and
 * discards it, recording read statistics on the provided context.
 *
 * Each task increments context.recordsRead per record, adds the reader's
 * stream position to context.bytesRead once the part has been fully read, and
 * reports any failure through context.addError.
 *
 * @param fileParts the file parts to read; one task is created per entry
 * @param recordDef record definition used to construct the reader and its record builder
 * @param context   task context that receives the counters and any errors
 * @return the created tasks, in the same order as fileParts
 * @throws Exception if constructing a reader for a file part fails
 */
private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception
{
    Runnable[] tasks = new Runnable[fileParts.length];
    for (int i = 0; i < tasks.length; i++)
    {
        final int taskIndex = i;
        final DataPartition filePart = fileParts[taskIndex];
        final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));

        tasks[taskIndex] = new Runnable()
        {
            HpccRemoteFileReader<HPCCRecord> fileReader = filePartReader;

            public void run()
            {
                // Tracks whether close() has been attempted so the finally block
                // only closes the reader on the failure path.
                boolean closeAttempted = false;
                try
                {
                    while (fileReader.hasNext())
                    {
                        // The record itself is not needed; this is a read-only test.
                        fileReader.next();
                        context.recordsRead.incrementAndGet();
                    }

                    closeAttempted = true;
                    fileReader.close();
                    context.bytesRead.addAndGet(fileReader.getStreamPosition());
                }
                catch (Exception e)
                {
                    context.addError("Error while reading file part index: '" + filePart.getThisPart() + "' Error message: " + e.getMessage());
                }
                finally
                {
                    if (!closeAttempted)
                    {
                        try
                        {
                            // Release the reader even though the read failed.
                            fileReader.close();
                        }
                        catch (Exception ignored)
                        {
                            // The read failure has already been reported; a secondary
                            // close failure adds no useful information.
                        }
                    }
                }
            }
        };
    }

    return tasks;
}

private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, SplitTable[] splitTables, String[] outFilePaths, FieldDef recordDef, TaskContext context) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
Expand Down Expand Up @@ -1159,6 +1216,165 @@ private static void performRead(String[] args, TaskContext context)
}
}

/**
 * Performs the "read_test" operation: reads the specified file, or only the
 * requested file parts, discarding the records, and then writes the file's
 * record definition as JSON to "<out>/<file name>.meta".
 *
 * Options are parsed with getReadTestOptions(). File part indexes passed via
 * file_parts are 1-based; invalid or out-of-range indexes are skipped with a
 * message. Failures after the operation has started are either printed to
 * stdout or recorded on the context, and the method returns without calling
 * context.endOperation().
 *
 * @param args    the command line arguments for the read_test operation
 * @param context task context used to track the operation, statistics and errors
 */
private static void performReadTest(String[] args, TaskContext context)
{
    Options options = getReadTestOptions();
    CommandLineParser parser = new DefaultParser();

    CommandLine cmd = null;
    try
    {
        cmd = parser.parse(options, args);
    }
    catch (ParseException e)
    {
        System.out.println("Error parsing commandline options:\n" + e.getMessage());
        return;
    }

    String connString = cmd.getOptionValue("url");
    String user = cmd.getOptionValue("user");
    String pass = cmd.getOptionValue("pass");

    String outputPath = cmd.getOptionValue("out",".");

    int numThreads = NUM_DEFAULT_THREADS;
    String numThreadsStr = cmd.getOptionValue("num_threads", "" + numThreads);
    try
    {
        numThreads = Integer.parseInt(numThreadsStr);
    }
    catch (NumberFormatException e)
    {
        System.out.println("Invalid option value for num_threads: "
                + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
    }

    String formatStr = cmd.getOptionValue("format");
    if (formatStr == null)
    {
        formatStr = "THOR";
    }

    FileFormat format = FileFormat.THOR;
    switch (formatStr.toUpperCase())
    {
        case "THOR":
            format = FileFormat.THOR;
            break;
        case "PARQUET":
            format = FileFormat.PARQUET;
            break;
        default:
            // Report what the user actually passed, not the default enum value.
            System.out.println("Error unsupported format specified: " + formatStr);
            return;
    }

    String datasetName = cmd.getOptionValue("read_test");
    context.startOperation("Read Test " + datasetName);

    HPCCFile file = null;
    try
    {
        file = new HPCCFile(datasetName, connString, user, pass);
    }
    catch (Exception e)
    {
        System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage());
        return;
    }

    DataPartition[] fileParts = null;
    FieldDef recordDef = null;
    try
    {
        fileParts = file.getFileParts();
        recordDef = file.getRecordDefinition();
    }
    catch (Exception e)
    {
        System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage());
        return;
    }

    String[] filePartsStrs = cmd.getOptionValues("file_parts");
    if (filePartsStrs != null && filePartsStrs.length > 0)
    {
        ArrayList<DataPartition> filePartList = new ArrayList<DataPartition>();
        for (String filePartStr : filePartsStrs)
        {
            try
            {
                // Indexes on the command line are 1-based.
                int filePartIndex = Integer.parseInt(filePartStr) - 1;
                if (filePartIndex < 0 || filePartIndex >= fileParts.length)
                {
                    System.out.println("Skipping invalid file part index: " + filePartStr
                            + " outside of range: [1," + fileParts.length + "]");
                    continue;
                }

                filePartList.add(fileParts[filePartIndex]);
            }
            catch (NumberFormatException e)
            {
                System.out.println("Skipping invalid file part index: " + filePartStr);
            }
        }

        if (filePartList.isEmpty())
        {
            context.addError("No valid file parts were specified for file: '" + datasetName + "'");
            return;
        }

        // Restrict the read to the requested parts; without this assignment the
        // file_parts option would be parsed but never applied.
        fileParts = filePartList.toArray(new DataPartition[0]);
    }

    Runnable[] tasks = null;
    try
    {
        switch (format)
        {
            case THOR:
                tasks = createReadTestTasks(fileParts, recordDef, context);
                break;
            case PARQUET:
            default:
                throw new Exception("Error unsupported format specified: " + format);
        }
    }
    catch (Exception e)
    {
        context.addError("Error while attempting to create read tasks for file: '" + datasetName + "': " + e.getMessage());
        return;
    }

    try
    {
        executeTasks(tasks, numThreads);
    }
    catch (Exception e)
    {
        context.addError("Error while attempting to execute read tasks for file: '" + datasetName + "': " + e.getMessage());
        return;
    }

    if (context.hasError())
    {
        return;
    }

    try
    {
        String fileName = file.getFileName().replace(":","_");
        String filePath = outputPath + File.separator + fileName + ".meta";

        // Build the content before opening the file so a translation failure
        // does not leave an empty meta file behind.
        String metaStr = RecordDefinitionTranslator.toJsonRecord(file.getRecordDefinition()).toString();

        // try-with-resources closes the stream even if the write fails.
        try (FileOutputStream metaFile = new FileOutputStream(filePath))
        {
            metaFile.write(metaStr.getBytes());
        }
    }
    catch (Exception e)
    {
        context.addError("Error while attempting to write meta-data for file: '" + datasetName + "': " + e.getMessage());
        return;
    }

    context.endOperation();
}

private static void performCopy(String[] args, TaskContext context)
{
Options options = getCopyOptions();
Expand Down Expand Up @@ -1576,6 +1792,10 @@ public static JSONArray run(String[] args)
{
performRead(args, context);
}
else if (cmd.hasOption("read_test"))
{
performReadTest(args, context);
}
else if (cmd.hasOption("copy"))
{
performCopy(args, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ String createIndexOnDataset(String datasetName, FieldDef recordDef) throws Excep
return indexName;
}

@Test
public void testBatchRandomAccess() throws Exception
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ public void thorFileTests()
Assert.assertTrue("FileUtility operation didn't complete successfully", success);
}

{
String readArgs[] = {"-read_test", "benchmark::integer::20kb", "-url", this.connString,
"-user", this.hpccUser, "-pass", this.hpccPass, "-file_parts", "1" };

JSONArray results = FileUtility.run(readArgs);
JSONObject result = results.optJSONObject(0);
Assert.assertNotNull("FileUtility result should not be null.", result);

boolean success = result.optBoolean("successful",false);
Assert.assertTrue("FileUtility operation didn't complete successfully", success);
}

{
String copyArgs[] = {"-copy", "benchmark::integer::20kb benchmark::integer::20kb-copy",
"-url", this.connString, "-dest_url", this.connString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void copyFile() throws Exception
{
Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized());
assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed"));

String lzfile=System.currentTimeMillis() + "_csvtest.csv";
String hpccfilename="temp::" + lzfile;
client.createHPCCFile(lzfile, targetLZ, true);
Expand Down Expand Up @@ -145,27 +146,20 @@ public void copyFile() throws Exception
/**
 * Creates the file named by testfilename on the landing zone targetLZ and
 * asserts that client.createHPCCFile reports success.
 *
 * The test is skipped when the target HPCC is containerized, or when
 * HPCC_30117 does not equal "fixed" (compared case-insensitively).
 */
@Test
public void AcreateHPCCFile() throws Exception, ArrayOfEspExceptionWrapper
{
Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized());
// NOTE(review): the skip message names 'copyFile' although this is AcreateHPCCFile -- looks copy/pasted, confirm.
assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed"));
System.out.println("Creating file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'");
Assert.assertTrue(client.createHPCCFile(testfilename, targetLZ, true));
}

/**
 * Writes a fixed byte string to the file named by testfilename on the landing
 * zone targetLZ and asserts that client.writeHPCCFileData reports success.
 *
 * The test is skipped when HPCC_30117 does not equal "fixed" (compared
 * case-insensitively).
 */
@Test
public void BwriteHPCCFile() throws Exception, ArrayOfEspExceptionWrapper
{
// NOTE(review): the skip message names 'copyFile' although this is BwriteHPCCFile -- looks copy/pasted, confirm.
assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed"));
System.out.println("Writing data to file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'");
byte[] data = "HELLO MY DARLING, HELLO MY DEAR!1234567890ABCDEFGHIJKLMNOPQRSTUVXYZ".getBytes();
// NOTE(review): the trailing arguments (true, 0, 20) are presumably an append flag, an offset and an upload chunk size -- confirm against writeHPCCFileData.
Assert.assertTrue(client.writeHPCCFileData(data, testfilename, targetLZ, true, 0, 20));
}

@Test
public void CreadHPCCFile() throws Exception, ArrayOfEspExceptionWrapper
{
Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized());
assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed"));

System.out.println("reading data from file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'");
byte[] data = "HELLO MY DARLING, HELLO MY DEAR!1234567890ABCDEFGHIJKLMNOPQRSTUVXYZ".getBytes();
String response = client.readFileData(targetLZ, testfilename, data.length, 0);
Expand Down

0 comments on commit a5793a2

Please sign in to comment.