Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-635 DFSClient: FileUtility add additional testing / debug options #742

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class FileUtility
private static final int NUM_DEFAULT_THREADS = 4;
static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120;

static private final int DEFAULT_READ_REQUEST_SIZE = 4096;
static private final int DEFAULT_READ_REQUEST_DELAY = 0;

private static boolean otelInitialized = false;

private static class TaskContext
Expand Down Expand Up @@ -548,6 +551,8 @@ private static Options getReadTestOptions()
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");
options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice.");
options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
Expand Down Expand Up @@ -801,7 +806,7 @@ public void run()
}
}

private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception
private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context, int readRequestSize, int readRequestDelay) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
for (int i = 0; i < tasks.length; i++)
Expand All @@ -818,7 +823,9 @@ public void run()
HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.readSizeKB = readRequestSize;
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(readContext, filePart, new HPCCRecordBuilder(recordDef));
fileReader.getInputStream().setReadRequestDelay(readRequestDelay);

while (fileReader.hasNext())
{
Expand Down Expand Up @@ -1405,6 +1412,30 @@ private static void performReadTest(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + DEFAULT_ACCESS_EXPIRY_SECONDS + "s.");
}

int readRequestSize = DEFAULT_READ_REQUEST_SIZE;
String readRequestSizeStr = cmd.getOptionValue("read_request_size", "" + readRequestSize);
try
{
readRequestSize = Integer.parseInt(readRequestSizeStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for read_request_size: "
+ readRequestSizeStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_SIZE + "KB.");
}

int readRequestDelay = DEFAULT_READ_REQUEST_DELAY;
String readRequestDelayStr = cmd.getOptionValue("read_request_delay", "" + readRequestDelay);
try
{
readRequestDelay = Integer.parseInt(readRequestDelayStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for read_request_delay: "
+ readRequestDelayStr + ", must be an integer. Defaulting to: " + DEFAULT_READ_REQUEST_DELAY + "ms.");
}

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
{
Expand Down Expand Up @@ -1477,6 +1508,7 @@ private static void performReadTest(String[] args, TaskContext context)
context.addWarn("InvalidParams: Skipping invalid file part index: " + filePartsStrs[i]);
}
}
fileParts = filePartList.toArray(new DataPartition[0]);
}

Runnable[] tasks = null;
Expand All @@ -1485,7 +1517,7 @@ private static void performReadTest(String[] args, TaskContext context)
switch (format)
{
case THOR:
tasks = createReadTestTasks(fileParts, recordDef, context);
tasks = createReadTestTasks(fileParts, recordDef, context, readRequestSize, readRequestDelay);
break;
case PARQUET:
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class RowServiceInputStream extends InputStream implements IProfilable
private long mutexWaitTimeNS = 0;
private long waitTimeNS = 0;
private long sleepTimeNS = 0;
private int readRequestDelayMS = 0;
private long fetchStartTimeNS = 0;
private long fetchTimeNS = 0;
private long fetchFinishTimeNS = 0;
Expand Down Expand Up @@ -668,6 +669,15 @@ public int getHandle()
return handle;
}

/**
* The delay in milliseconds between read requests. Primarily used for testing.
* @param sleepTimeMS
*/
public void setReadRequestDelay(int sleepTimeMS)
{
this.readRequestDelayMS = sleepTimeMS;
}

/**
* Simulate a handle failure and use the file token instead. The handle is set to an invalid value so the THOR node
* will indicate that the handle is unknown and request a otken.
Expand Down Expand Up @@ -1151,6 +1161,18 @@ private void finishFetch()

if (inFetchingMode == false)
{
if (readRequestDelayMS > 0)
{
try
{
Thread.sleep(readRequestDelayMS);
}
catch (InterruptedException e)
{
// We don't care about waking early
}
}

if (readSpan != null)
{
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
Expand All @@ -1159,7 +1181,6 @@ private void finishFetch()
readSpan.addEvent("RowServiceInputStream.readRequest", attributes);
}


// Create the read ahead request
if (this.simulateFail) this.handle = -1;
String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest();
Expand Down
Loading