Commit b3d682d
Merge remote-tracking branch 'origin/candidate-9.8.x'
Signed-off-by: Gordon Smith <[email protected]>

# Conflicts:
#	commons-hpcc/pom.xml
#	dfsclient/pom.xml
#	pom.xml
#	wsclient/pom.xml
GordonSmith committed Sep 27, 2024
2 parents 0a0dd8f + 0151e21 commit b3d682d
Showing 4 changed files with 225 additions and 37 deletions.
23 changes: 15 additions & 8 deletions .github/workflows/JirabotMerge.yml
@@ -106,14 +106,14 @@ jobs:
releaseTagPattern = releaseTagPrefix
if major is not None:
releaseTagPattern += str(major) + '\.'
releaseTagPattern += str(major) + '\\.'
else:
releaseTagPattern += '[0-9]+\.'
releaseTagPattern += '[0-9]+\\.'
if minor is not None:
releaseTagPattern += str(minor) + '\.'
releaseTagPattern += str(minor) + '\\.'
else:
releaseTagPattern += '[0-9]+\.'
releaseTagPattern += '[0-9]+\\.'
if point is not None:
releaseTagPattern += str(point) + '(-[0-9]+)?'
@@ -254,14 +254,21 @@ jobs:
print('Error: PROJECT_CONFIG is missing required fields: tagPrefix and/or tagPostfix')
sys.exit(1)
project_name = projectConfig.get('projectName')
if project_name is None:
print('Error: PROJECT_CONFIG is missing required field: projectName')
project_prefixes = projectConfig.get('projectPrefixes')
if not project_prefixes:
print('Error: PROJECT_CONFIG is missing required field: projectPrefixes. Add a "projectPrefixes" JSON array of project prefix strings to the PROJECT_CONFIG.')
sys.exit(1)
if not isinstance(project_prefixes, list):
print('Error: PROJECT_CONFIG field projectPrefixes is not a valid JSON array, aborting.')
sys.exit(1)
project_list_regex = '|'.join(project_prefixes)
result = ''
issuem = re.search("(" + project_name + ")-[0-9]+", title, re.IGNORECASE)
issuem = re.search("(" + project_list_regex + ")-[0-9]+", title, re.IGNORECASE)
if issuem:
project_name = issuem.group(1)
issue_name = issuem.group()
jira = Jira(url=jira_url, username=jirabot_user, password=jirabot_pass, cloud=True)
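
Taken together, the workflow changes above do two things: the '\\.' form spells the regex backslash explicitly (the old '\.' was an unrecognized Python escape that happened to pass through unchanged but warns on newer interpreters), and issue keys are now matched against any of the configured projectPrefixes instead of a single projectName. A minimal sketch of the resulting pattern behaviour, translated to Java for illustration (the release-tag prefix and the project prefixes below are hypothetical):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class JirabotPatternSketch
    {
        public static void main(String[] args)
        {
            // Release tags: dots are matched literally, and an optional "-N" suffix is allowed.
            // "hpcc4j-" stands in for the real releaseTagPrefix.
            String releaseTagPattern = "hpcc4j-" + "9" + "\\." + "8" + "\\." + "[0-9]+(-[0-9]+)?";
            System.out.println("hpcc4j-9.8.2-1".matches(releaseTagPattern)); // true
            System.out.println("hpcc4j-9x8x2".matches(releaseTagPattern));   // false - dots are literal

            // Issue keys: joining the configured prefixes with '|' lets one case-insensitive
            // pattern pull the project name and issue name out of a PR title.
            String projectListRegex = String.join("|", "HPCC4J", "JAPI");    // hypothetical prefixes
            Matcher issuem = Pattern.compile("(" + projectListRegex + ")-[0-9]+", Pattern.CASE_INSENSITIVE)
                                    .matcher("Japi-123 Tidy up socket timeout handling");
            if (issuem.find())
            {
                System.out.println(issuem.group(1)); // project name: Japi
                System.out.println(issuem.group());  // issue name:   Japi-123
            }
        }
    }
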
@@ -159,6 +159,7 @@ public JSONObject end(boolean success)

public int readRetries = HpccRemoteFileReader.DEFAULT_READ_RETRIES;
public int socketOpTimeoutMS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS;
public int initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB;

public void setCurrentOperationSpanAttributes(Attributes attributes)
{
@@ -352,6 +353,26 @@ private static String[] getCredentials(CommandLine cmd)
return new String[] {user, pass};
}

private static void applyGlobalConfig(CommandLine cmd)
{
int concurrentStartups = -1;
String concurrentStartupsStr = cmd.getOptionValue("connection_startup_limit", "" + -1);
try
{
concurrentStartups = Integer.parseInt(concurrentStartupsStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for connection_startup_limit: "
+ concurrentStartupsStr + ", must be an integer.");
}

if (concurrentStartups > 0)
{
RowServiceInputStream.setMaxConcurrentConnectionStartups(concurrentStartups);
}
}
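
The connection_startup_limit value is applied once per process: applyGlobalConfig() forwards it to a static setter on RowServiceInputStream before any file parts are opened, and values of zero or less leave the default behaviour untouched. A minimal sketch of setting the limit directly (the package name is assumed to be the usual dfsclient location):

    import org.hpccsystems.dfs.client.RowServiceInputStream;

    public class StartupLimitSketch
    {
        public static void main(String[] args)
        {
            // Cap how many row-service connections may be starting up concurrently,
            // e.g. to avoid overwhelming an intermediate proxy; applied process-wide.
            RowServiceInputStream.setMaxConcurrentConnectionStartups(4);
        }
    }
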

private static enum FileFormat
{
THOR,
@@ -589,6 +610,23 @@ private static int getSocketOpTimeoutMS(CommandLine cmd)
return socketOpTimeoutS * 1000;
}

private static int getInitialReadSizeKB(CommandLine cmd)
{
int initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB;
String initialReadSizeStr = cmd.getOptionValue("initial_read_size", "" + initialReadSizeKB);
try
{
initialReadSizeKB = Integer.parseInt(initialReadSizeStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for initial_read_size: "
+ initialReadSizeStr + ", must be an integer. Defaulting to: " + RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB + "KB.");
}

return initialReadSizeKB;
}

private static Options getReadOptions()
{
Options options = new Options();
@@ -602,6 +640,8 @@ private static Options getReadOptions()
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");

options.addOption(Option.builder("read")
.argName("files")
@@ -622,12 +662,16 @@ private static Options getReadTestOptions()
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");
options.addOption("initial_read_size", true, "The size of the initial read request in KB sent to the rowservice,"
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");
options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice.");
options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice.");
options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster.");
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
@@ -651,6 +695,8 @@ private static Options getCopyOptions()
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");

options.addOption(Option.builder("copy")
.argName("files")
@@ -673,6 +719,8 @@ private static Options getWriteOptions()
options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");

options.addOption(Option.builder("write")
.argName("files")
@@ -903,6 +951,7 @@ public void run()
HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.initialReadSizeKB = context.initialReadSizeKB;
readContext.readSizeKB = readRequestSize;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;

@@ -944,6 +993,7 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;
readContext.initialReadSizeKB = context.initialReadSizeKB;

final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef));
filePartReader.setMaxReadRetries(context.readRetries);
@@ -1072,6 +1122,7 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;
readContext.initialReadSizeKB = context.initialReadSizeKB;
filePartReaders[j] = new HpccRemoteFileReader<HPCCRecord>(readContext, inFilePart, new HPCCRecordBuilder(recordDef));
filePartReaders[j].setMaxReadRetries(context.readRetries);
}
@@ -1312,6 +1363,8 @@ private static void performRead(String[] args, TaskContext context)
String user = creds[0];
String pass = creds[1];

applyGlobalConfig(cmd);

String outputPath = cmd.getOptionValue("out",".");

int numThreads = NUM_DEFAULT_THREADS;
@@ -1334,6 +1387,7 @@ private static void performRead(String[] args, TaskContext context)

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);
context.initialReadSizeKB = getInitialReadSizeKB(cmd);

FileFormat format = FileFormat.THOR;
switch (formatStr.toUpperCase())
@@ -1514,6 +1568,8 @@ private static void performReadTest(String[] args, TaskContext context)
String user = creds[0];
String pass = creds[1];

applyGlobalConfig(cmd);

String outputPath = cmd.getOptionValue("out",".");

int numThreads = NUM_DEFAULT_THREADS;
@@ -1566,6 +1622,7 @@ private static void performReadTest(String[] args, TaskContext context)

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);
context.initialReadSizeKB = getInitialReadSizeKB(cmd);

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
@@ -1746,6 +1803,8 @@ private static void performCopy(String[] args, TaskContext context)
String user = creds[0];
String pass = creds[1];

applyGlobalConfig(cmd);

String destClusterName = cmd.getOptionValue("dest_cluster");

String srcURL = cmd.getOptionValue("url");
@@ -1794,6 +1853,7 @@ private static void performCopy(String[] args, TaskContext context)

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);
context.initialReadSizeKB = getInitialReadSizeKB(cmd);

for (int i = 0; i < copyPairs.length; i+=2)
{
@@ -1950,6 +2010,8 @@ private static void performWrite(String[] args, TaskContext context)
String user = creds[0];
String pass = creds[1];

applyGlobalConfig(cmd);

String destClusterName = cmd.getOptionValue("dest_cluster");

String srcURL = cmd.getOptionValue("url");
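
With the options above in place, an invocation of the utility can cap concurrent connection startups by adding, for example, -connection_startup_limit 4 next to the existing -socket_timeout_seconds and -read_retries flags, and a read test can additionally pass -initial_read_size 512 to shrink the first read request sent to the rowservice; the numeric values here are purely illustrative.
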
@@ -70,6 +70,7 @@ public static class FileReadContext
public int socketOpTimeoutMS = -1;
public int recordReadLimit = -1;
public boolean createPrefetchThread = true;
public int initialReadSizeKB = -1;
public int readSizeKB = -1;
public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span
public Span parentSpan = null;
@@ -89,6 +90,21 @@ private static FileReadContext constructReadContext(FieldDef originalRD, int con
return context;
}

private static RowServiceInputStream.StreamContext constructStreamContext(FileReadContext readContext)
{
RowServiceInputStream.StreamContext context = new RowServiceInputStream.StreamContext();
context.recordDefinition = readContext.originalRD;
context.recordReadLimit = readContext.recordReadLimit;
context.createPrefetchThread = readContext.createPrefetchThread;
context.maxReadSizeKB = readContext.readSizeKB;
context.initialReadSizeKB = readContext.initialReadSizeKB;
context.connectTimeoutMS = readContext.connectTimeout;
context.socketOpTimeoutMS = readContext.socketOpTimeoutMS;
context.createPrefetchThread = readContext.createPrefetchThread;

return context;
}

/**
* Instantiates a new hpcc remote file reader.
*
@@ -284,12 +300,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
throw e;
}

RowServiceInputStream.StreamContext streamContext = constructStreamContext(context);
streamContext.projectedRecordDefinition = projectedRecordDefinition;
streamContext.fileReadSpan = this.readSpan;

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, null);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);

this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -304,9 +323,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
restartInfo.streamPos = resumeInfo.inputStreamPos;
restartInfo.tokenBin = resumeInfo.tokenBin;

this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, restartInfo);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
@@ -383,9 +400,11 @@ private boolean retryRead()
{
this.readSpan = createReadSpan(context, dataPartition);

this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(),
context.connectTimeout, context.recordReadLimit, context.createPrefetchThread,
context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan);
RowServiceInputStream.StreamContext streamContext = constructStreamContext(context);
streamContext.projectedRecordDefinition = this.recordBuilder.getRecordDefinition();
streamContext.fileReadSpan = this.readSpan;

this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, restartInfo);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
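
For callers, the net effect is one extra knob on FileReadContext that flows through constructStreamContext() into the RowServiceInputStream. A minimal sketch of configuring it, using only fields and constructors that appear in the diffs above (package locations for the imports are assumed to be the usual hpcc4j ones, and the numeric values are placeholders):

    import org.hpccsystems.commons.ecl.FieldDef;
    import org.hpccsystems.dfs.client.DataPartition;
    import org.hpccsystems.dfs.client.HPCCRecord;
    import org.hpccsystems.dfs.client.HPCCRecordBuilder;
    import org.hpccsystems.dfs.client.HpccRemoteFileReader;
    import org.hpccsystems.dfs.client.RowServiceInputStream;

    public class InitialReadSizeSketch
    {
        public static HpccRemoteFileReader<HPCCRecord> openReader(DataPartition filePart, FieldDef recordDef) throws Exception
        {
            HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
            readContext.originalRD = recordDef;
            readContext.readSizeKB = 4096;       // steady-state read request size (placeholder value)
            readContext.initialReadSizeKB = 512; // new field: smaller first request while connections start up (placeholder value)
            readContext.socketOpTimeoutMS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS;

            // Same constructor the copy/read tasks above use.
            return new HpccRemoteFileReader<HPCCRecord>(readContext, filePart, new HPCCRecordBuilder(recordDef));
        }
    }
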