From d4d621c4075d2e289bf645e613fab6cbccb2df3f Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Mon, 23 Sep 2024 16:26:13 +0100 Subject: [PATCH 1/6] Split off 9.6.50 Signed-off-by: Gavin Halliday --- commons-hpcc/pom.xml | 2 +- dfsclient/pom.xml | 2 +- pom.xml | 2 +- wsclient/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index 92323ddee..498a655ec 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.49-0-SNAPSHOT + 9.6.51-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index 6621be0db..8b5acea9c 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.49-0-SNAPSHOT + 9.6.51-0-SNAPSHOT diff --git a/pom.xml b/pom.xml index bdf211b33..af2b1b42c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.6.49-0-SNAPSHOT + 9.6.51-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com diff --git a/wsclient/pom.xml b/wsclient/pom.xml index 55b91f4bd..0dc3821e8 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.6.49-0-SNAPSHOT + 9.6.51-0-SNAPSHOT From d69f7e3cf04875613fac463987873599b656f1ef Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Mon, 23 Sep 2024 16:28:20 +0100 Subject: [PATCH 2/6] Split off 9.8.24 Signed-off-by: Gavin Halliday --- commons-hpcc/pom.xml | 2 +- dfsclient/pom.xml | 2 +- pom.xml | 2 +- wsclient/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index b02edeb06..46d084e28 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.8.23-0-SNAPSHOT + 9.8.25-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index f5b6e5370..ceb265e4b 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.8.23-0-SNAPSHOT + 9.8.25-0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 2f089ee9e..6ee8a5337 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.8.23-0-SNAPSHOT + 9.8.25-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com diff --git a/wsclient/pom.xml b/wsclient/pom.xml index 92ce55470..f1b8ca32e 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.8.23-0-SNAPSHOT + 9.8.25-0-SNAPSHOT From 418be89f7dc64d53e279e4daa76b430e90fe9027 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Mon, 23 Sep 2024 16:29:46 +0100 Subject: [PATCH 3/6] Split off 9.4.98 Signed-off-by: Gavin Halliday --- commons-hpcc/pom.xml | 2 +- dfsclient/pom.xml | 2 +- pom.xml | 2 +- wsclient/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index 04b062447..054d6f3b5 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.4.97-0-SNAPSHOT + 9.4.99-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index ca7b0009e..8c507f3e3 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.4.97-0-SNAPSHOT + 9.4.99-0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 1ca671799..5bcf41a97 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.4.97-0-SNAPSHOT + 9.4.99-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com diff --git a/wsclient/pom.xml b/wsclient/pom.xml index 37fa5c79a..56ea31386 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.4.97-0-SNAPSHOT + 9.4.99-0-SNAPSHOT From 9d1e729f59f9587ff6c3e6b1ca5f9a0f26b16f44 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Mon, 23 Sep 2024 16:31:23 +0100 Subject: [PATCH 4/6] Split off 9.2.124 Signed-off-by: Gavin Halliday --- commons-hpcc/pom.xml | 2 +- dfsclient/pom.xml | 2 +- pom.xml | 2 +- wsclient/pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index ea8d08d2a..cbfd74ed0 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.2.123-0-SNAPSHOT + 9.2.125-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index 07248ca9a..2b3f8d375 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.2.123-0-SNAPSHOT + 9.2.125-0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 7100d21e4..f5ba24b2c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.2.123-0-SNAPSHOT + 9.2.125-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com diff --git a/wsclient/pom.xml b/wsclient/pom.xml index cd6a77539..9ac4595c0 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.2.123-0-SNAPSHOT + 9.2.125-0-SNAPSHOT From aca68bd08551dbe37a0481430f0492efd089477f Mon Sep 17 00:00:00 2001 From: James McMullan Date: Wed, 25 Sep 2024 09:47:16 -0400 Subject: [PATCH 5/6] HPCC4J-651 JirabotMerge: Allow multiple projects to be configured (#762) Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .github/workflows/JirabotMerge.yml | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/.github/workflows/JirabotMerge.yml b/.github/workflows/JirabotMerge.yml index 1aab53288..201e0f621 100644 --- a/.github/workflows/JirabotMerge.yml +++ b/.github/workflows/JirabotMerge.yml @@ -106,14 +106,14 @@ jobs: releaseTagPattern = releaseTagPrefix if major is not None: - releaseTagPattern += str(major) + '\.' + releaseTagPattern += str(major) + '\\.' else: - releaseTagPattern += '[0-9]+\.' + releaseTagPattern += '[0-9]+\\.' if minor is not None: - releaseTagPattern += str(minor) + '\.' + releaseTagPattern += str(minor) + '\\.' else: - releaseTagPattern += '[0-9]+\.' + releaseTagPattern += '[0-9]+\\.' if point is not None: releaseTagPattern += str(point) + '(-[0-9]+)?' @@ -254,14 +254,21 @@ jobs: print('Error: PROJECT_CONFIG is missing required fields: tagPrefix and/or tagPostfix') sys.exit(1) - project_name = projectConfig.get('projectName') - if project_name is None: - print('Error: PROJECT_CONFIG is missing required field: projectName') + project_prefixes = projectConfig.get('projectPrefixes') + if not project_prefixes: + print('Error: PROJECT_CONFIG is missing required field: projectPrefixes. Add a "projectPrefixes" JSON array of project prefix strings to the PROJECT_CONFIG.') sys.exit(1) + if not isinstance(project_prefixes, list): + print('Error: PROJECT_CONFIG field projectPrefixes is not a valid JSON array, aborting.') + sys.exit(1) + + project_list_regex = '|'.join(project_prefixes) + result = '' - issuem = re.search("(" + project_name + ")-[0-9]+", title, re.IGNORECASE) + issuem = re.search("(" + project_list_regex + ")-[0-9]+", title, re.IGNORECASE) if issuem: + project_name = issuem.group(1) issue_name = issuem.group() jira = Jira(url=jira_url, username=jirabot_user, password=jirabot_pass, cloud=True) From 91efcca16c1cf55e9cddea6241c3dce7589c8cba Mon Sep 17 00:00:00 2001 From: James McMullan Date: Wed, 25 Sep 2024 11:21:10 -0400 Subject: [PATCH 6/6] HPCC4J-639 Add concurrent connection startup limit (#760) Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/FileUtility.java | 62 ++++++++ .../dfs/client/HpccRemoteFileReader.java | 37 +++-- .../dfs/client/RowServiceInputStream.java | 140 +++++++++++++++--- 3 files changed, 210 insertions(+), 29 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index e50c59636..1939f8cfb 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -159,6 +159,7 @@ public JSONObject end(boolean success) public int readRetries = HpccRemoteFileReader.DEFAULT_READ_RETRIES; public int socketOpTimeoutMS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS; + public int initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB; public void setCurrentOperationSpanAttributes(Attributes attributes) { @@ -352,6 +353,26 @@ private static String[] getCredentials(CommandLine cmd) return new String[] {user, pass}; } + private static void applyGlobalConfig(CommandLine cmd) + { + int concurrentStartups = -1; + String concurrentStartupsStr = cmd.getOptionValue("connection_startup_limit", "" + -1); + try + { + concurrentStartups = Integer.parseInt(concurrentStartupsStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for connection_startup_limit: " + + concurrentStartupsStr + ", must be an integer."); + } + + if (concurrentStartups > 0) + { + RowServiceInputStream.setMaxConcurrentConnectionStartups(concurrentStartups); + } + } + private static enum FileFormat { THOR, @@ -589,6 +610,23 @@ private static int getSocketOpTimeoutMS(CommandLine cmd) return socketOpTimeoutS * 1000; } + private static int getInitialReadSizeKB(CommandLine cmd) + { + int initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB; + String initialReadSizeStr = cmd.getOptionValue("initial_read_size", "" + initialReadSizeKB); + try + { + initialReadSizeKB = Integer.parseInt(initialReadSizeStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for initial_read_size: " + + initialReadSizeStr + ", must be an integer. Defaulting to: " + RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB + "KB."); + } + + return initialReadSizeKB; + } + private static Options getReadOptions() { Options options = new Options(); @@ -602,6 +640,8 @@ private static Options getReadOptions() options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently." + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption(Option.builder("read") .argName("files") @@ -622,12 +662,16 @@ private static Options getReadTestOptions() options.addOption("pass", true, "Specifies the password used to connect. Defaults to null."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); options.addOption("access_expiry_seconds", true, "Access token expiration seconds."); + options.addOption("initial_read_size", true, "The size of the initial read request in KB sent to the rowservice," + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice."); options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice."); options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster."); options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently." + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption(Option.builder("file_parts") .argName("_file_parts") @@ -651,6 +695,8 @@ private static Options getCopyOptions() options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently." + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption(Option.builder("copy") .argName("files") @@ -673,6 +719,8 @@ private static Options getWriteOptions() options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently." + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption(Option.builder("write") .argName("files") @@ -903,6 +951,7 @@ public void run() HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; + readContext.initialReadSizeKB = context.initialReadSizeKB; readContext.readSizeKB = readRequestSize; readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; @@ -944,6 +993,7 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; + readContext.initialReadSizeKB = context.initialReadSizeKB; final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef)); filePartReader.setMaxReadRetries(context.readRetries); @@ -1072,6 +1122,7 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; + readContext.initialReadSizeKB = context.initialReadSizeKB; filePartReaders[j] = new HpccRemoteFileReader(readContext, inFilePart, new HPCCRecordBuilder(recordDef)); filePartReaders[j].setMaxReadRetries(context.readRetries); } @@ -1312,6 +1363,8 @@ private static void performRead(String[] args, TaskContext context) String user = creds[0]; String pass = creds[1]; + applyGlobalConfig(cmd); + String outputPath = cmd.getOptionValue("out","."); int numThreads = NUM_DEFAULT_THREADS; @@ -1334,6 +1387,7 @@ private static void performRead(String[] args, TaskContext context) context.readRetries = getReadRetries(cmd); context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + context.initialReadSizeKB = getInitialReadSizeKB(cmd); FileFormat format = FileFormat.THOR; switch (formatStr.toUpperCase()) @@ -1514,6 +1568,8 @@ private static void performReadTest(String[] args, TaskContext context) String user = creds[0]; String pass = creds[1]; + applyGlobalConfig(cmd); + String outputPath = cmd.getOptionValue("out","."); int numThreads = NUM_DEFAULT_THREADS; @@ -1566,6 +1622,7 @@ private static void performReadTest(String[] args, TaskContext context) context.readRetries = getReadRetries(cmd); context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + context.initialReadSizeKB = getInitialReadSizeKB(cmd); String formatStr = cmd.getOptionValue("format"); if (formatStr == null) @@ -1746,6 +1803,8 @@ private static void performCopy(String[] args, TaskContext context) String user = creds[0]; String pass = creds[1]; + applyGlobalConfig(cmd); + String destClusterName = cmd.getOptionValue("dest_cluster"); String srcURL = cmd.getOptionValue("url"); @@ -1794,6 +1853,7 @@ private static void performCopy(String[] args, TaskContext context) context.readRetries = getReadRetries(cmd); context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + context.initialReadSizeKB = getInitialReadSizeKB(cmd); for (int i = 0; i < copyPairs.length; i+=2) { @@ -1950,6 +2010,8 @@ private static void performWrite(String[] args, TaskContext context) String user = creds[0]; String pass = creds[1]; + applyGlobalConfig(cmd); + String destClusterName = cmd.getOptionValue("dest_cluster"); String srcURL = cmd.getOptionValue("url"); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 87d32980e..81ef1faf2 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -70,6 +70,7 @@ public static class FileReadContext public int socketOpTimeoutMS = -1; public int recordReadLimit = -1; public boolean createPrefetchThread = true; + public int initialReadSizeKB = -1; public int readSizeKB = -1; public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span public Span parentSpan = null; @@ -89,6 +90,21 @@ private static FileReadContext constructReadContext(FieldDef originalRD, int con return context; } + private static RowServiceInputStream.StreamContext constructStreamContext(FileReadContext readContext) + { + RowServiceInputStream.StreamContext context = new RowServiceInputStream.StreamContext(); + context.recordDefinition = readContext.originalRD; + context.recordReadLimit = readContext.recordReadLimit; + context.createPrefetchThread = readContext.createPrefetchThread; + context.maxReadSizeKB = readContext.readSizeKB; + context.initialReadSizeKB = readContext.initialReadSizeKB; + context.connectTimeoutMS = readContext.connectTimeout; + context.socketOpTimeoutMS = readContext.socketOpTimeoutMS; + context.createPrefetchThread = readContext.createPrefetchThread; + + return context; + } + /** * Instantiates a new hpcc remote file reader. * @@ -284,12 +300,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde throw e; } + RowServiceInputStream.StreamContext streamContext = constructStreamContext(context); + streamContext.projectedRecordDefinition = projectedRecordDefinition; + streamContext.fileReadSpan = this.readSpan; + if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, - context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null, - false, context.socketOpTimeoutMS, this.readSpan); + this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, null); this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); + this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -304,9 +323,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde restartInfo.streamPos = resumeInfo.inputStreamPos; restartInfo.tokenBin = resumeInfo.tokenBin; - this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, - context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, - false, context.socketOpTimeoutMS, this.readSpan); + this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, restartInfo); this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; @@ -383,9 +400,11 @@ private boolean retryRead() { this.readSpan = createReadSpan(context, dataPartition); - this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(), - context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, - context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); + RowServiceInputStream.StreamContext streamContext = constructStreamContext(context); + streamContext.projectedRecordDefinition = this.recordBuilder.getRecordDefinition(); + streamContext.fileReadSpan = this.readSpan; + + this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, restartInfo); this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 3cb569ac9..3226701d8 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -64,26 +64,60 @@ private static class ReadRequestEvent public int requestSize = 0; }; + private static class RowServiceResponse + { + int len = 0; + int errorCode = 0; + int handle = -1; + String errorMessage = null; + } + public static class RestartInformation { public long streamPos = 0; public byte[] tokenBin = null; } - private static class RowServiceResponse + public static class StreamContext { - int len = 0; - int errorCode = 0; - int handle = -1; - String errorMessage = null; + public FieldDef recordDefinition = null; + public FieldDef projectedRecordDefinition = null; + public int recordReadLimit = -1; + public int maxReadSizeKB = DEFAULT_MAX_READ_SIZE_KB; + public int initialReadSizeKB = DEFAULT_INITIAL_REQUEST_READ_SIZE_KB; + public boolean createPrefetchThread = true; + public boolean isFetching = false; + public int connectTimeoutMS = DEFAULT_CONNECT_TIMEOUT_MILIS; + public int socketOpTimeoutMS = DEFAULT_SOCKET_OP_TIMEOUT_MS; + public Span fileReadSpan = null; + }; + + private static StreamContext constructStreamContext(FieldDef rd, FieldDef pRd, int connectTimeout, int limit, + boolean createPrefetchThread, int maxReadSizeInKB, + boolean isFetching, int socketOpTimeoutMS, Span rdSpan) + { + StreamContext ctx = new StreamContext(); + ctx.recordDefinition = rd; + ctx.projectedRecordDefinition = pRd; + ctx.recordReadLimit = limit; + ctx.maxReadSizeKB = maxReadSizeInKB; + ctx.createPrefetchThread = createPrefetchThread; + ctx.isFetching = isFetching; + ctx.connectTimeoutMS = connectTimeout; + ctx.socketOpTimeoutMS = socketOpTimeoutMS; + ctx.fileReadSpan = rdSpan; + return ctx; } public static final int DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE = 25; public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations + public static final int DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS = 100; // Note: The platform may respond with more data than this if records are larger than this limit. public static final int DEFAULT_MAX_READ_SIZE_KB = 4096; + public static final int DEFAULT_INITIAL_REQUEST_READ_SIZE_KB = 256; + private static final int SHORT_SLEEP_MS = 1; private static final int LONG_WAIT_THRESHOLD_US = 100; private static final int MAX_HOT_LOOP_NS = 10000; @@ -109,8 +143,12 @@ private static class RowServiceResponse public static final String PARTIAL_BLOCK_READS_METRIC = "numPartialBlockReads"; public static final String BLOCK_READS_METRIC = "numBlockReads"; + private static AtomicInteger connectionStartupCount = new AtomicInteger(0); + private static int maxConcurrentStartups = DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS; + private AtomicBoolean active = new AtomicBoolean(false); private AtomicBoolean closed = new AtomicBoolean(false); + private boolean isFirstReadRequest = false; private boolean simulateFail = false; private boolean forceTokenUse = false; private boolean inFetchingMode = false; @@ -188,6 +226,7 @@ private static class RowServiceResponse private static final Logger log = LogManager.getLogger(RowServiceInputStream.class); private int maxReadSizeKB = DEFAULT_MAX_READ_SIZE_KB; + private int initialReadSizeKB = DEFAULT_INITIAL_REQUEST_READ_SIZE_KB; // Buffer compact threshold should always be smaller than buffer prefetch threshold private int bufferPrefetchThresholdKB = DEFAULT_MAX_READ_SIZE_KB/2; @@ -394,13 +433,32 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS, Span rdSpan) throws Exception { - this.recordDefinition = rd; - this.projectedRecordDefinition = pRd; - this.inFetchingMode = isFetching; + this(constructStreamContext(rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, + isFetching, socketOpTimeoutMS, rdSpan), dp, restartInfo); + } + + /** + * A plain socket connect to a THOR node for remote read + * + * @param context Streaming configuration context + * @param dp Data partition to read + * @param restartInfo Restart information, can be null + * @throws Exception general exception + */ + public RowServiceInputStream(StreamContext context, DataPartition dp, RestartInformation restartInfo) throws Exception + { + this.recordDefinition = context.recordDefinition; + this.projectedRecordDefinition = context.projectedRecordDefinition; + this.inFetchingMode = context.isFetching; - if (maxReadSizeInKB > 0) + if (context.maxReadSizeKB > 0) { - this.maxReadSizeKB = maxReadSizeInKB; + this.maxReadSizeKB = context.maxReadSizeKB; + } + + if (context.initialReadSizeKB > 0) + { + this.initialReadSizeKB = context.initialReadSizeKB; } this.bufferPrefetchThresholdKB = this.maxReadSizeKB/2; @@ -411,9 +469,9 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.dataPart = dp; - if (rdSpan != null && rdSpan.getSpanContext().isValid()) + if (context.fileReadSpan != null && context.fileReadSpan.getSpanContext().isValid()) { - this.fileReadSpan = rdSpan; + this.fileReadSpan = context.fileReadSpan; this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(fileReadSpan); } @@ -431,17 +489,17 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.tokenBin = null; this.simulateFail = false; - if (connectTimeout > 0) + if (context.connectTimeoutMS > 0) { - this.connectTimeout = connectTimeout; + this.connectTimeout = context.connectTimeoutMS; } - if (socketOpTimeoutMS > 0) + if (context.socketOpTimeoutMS > 0) { - this.socketOpTimeoutMs = socketOpTimeoutMS; + this.socketOpTimeoutMs = context.socketOpTimeoutMS; } - this.recordLimit = limit; + this.recordLimit = context.recordReadLimit; this.readBufferCapacity.set(this.maxReadSizeKB*1024*2); this.readBuffer = new byte[this.readBufferCapacity.get()]; @@ -513,7 +571,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co } } - if (createPrefetchThread) + if (context.createPrefetchThread) { RowServiceInputStream rowInputStream = this; Runnable prefetchTask = new Runnable() @@ -692,6 +750,15 @@ public int getHandle() return handle; } + /** + * Sets the global max concurrent connection startups. + * @param max the new max concurrent connection startups + */ + public static void setMaxConcurrentConnectionStartups(int max) + { + maxConcurrentStartups = max; + } + /** * The delay in milliseconds between read requests. Primarily used for testing. * @param sleepTimeMS the sleep time in milliseconds @@ -1250,6 +1317,12 @@ private void finishFetch() finishReadRequestSpan(); + // After we have read the first request allow another connection to start + if (isFirstReadRequest) + { + isFirstReadRequest = false; + connectionStartupCount.decrementAndGet(); + } //------------------------------------------------------------------------------ // Send read ahead request @@ -1467,6 +1540,13 @@ public int available() throws IOException @Override public void close() throws IOException { + // If we close before the first read request is finished we need to decrement the connection count + if (isFirstReadRequest) + { + isFirstReadRequest = false; + connectionStartupCount.decrementAndGet(); + } + // Using getAndSet to prevent main thread and background thread from // closing at the same time if (this.closed.getAndSet(true) == false) @@ -1783,6 +1863,22 @@ public List getMetrics() private void makeActive() throws HpccFileException { + // Limit the number of concurrent connection startups + int currentCount = connectionStartupCount.get(); + int newCount = currentCount+1; + isFirstReadRequest = true; + while (newCount > maxConcurrentStartups || !connectionStartupCount.compareAndSet(currentCount, newCount)) + { + try + { + Thread.sleep(1); + } + catch (InterruptedException e) {} // We don't care about waking early + + currentCount = connectionStartupCount.get(); + newCount = currentCount+1; + } + this.active.set(false); this.handle = 0; String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; @@ -1795,7 +1891,6 @@ private void makeActive() throws HpccFileException connectSpan.setAttribute("server.index", getFilePartCopy()); } - boolean needsRetry = false; do { @@ -2014,6 +2109,10 @@ private void makeActive() throws HpccFileException needsRetry = true; if (!setNextFilePartCopy()) { + // This connection has failed, decrement the connection count to allow another connection to start + isFirstReadRequest = false; + connectionStartupCount.decrementAndGet(); + throw new HpccFileException(prefix + " Unsuccessfuly attempted to connect to all file part copies", e); } } @@ -2309,7 +2408,7 @@ private String makeInitialRequest() sb.append(RFCCodes.RFCStreamReadCmd); sb.append("{ \"format\" : \"binary\", \n"); - sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); + sb.append("\"replyLimit\" : " + this.initialReadSizeKB + ",\n"); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; sb.append(trace); @@ -2386,6 +2485,7 @@ private String makeHandleRequest() sb.append(RFCCodes.RFCStreamReadCmd); sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); + sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; sb.append(trace);