From 87ff51544edfbe3e52f916cfa80eec102003c9db Mon Sep 17 00:00:00 2001 From: James McMullan Date: Tue, 17 Sep 2024 15:51:51 -0400 Subject: [PATCH] HPCC4J-639 Add concurrent connection startup limit - Added a limit to the number of connections that can be started simultaneously - Reduced the size of the initial read request - Exposed parameters in FileUtility Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/FileUtility.java | 82 ++++++++++- .../dfs/client/HpccRemoteFileReader.java | 37 +++-- .../dfs/client/RowServiceInputStream.java | 139 +++++++++++++++--- 3 files changed, 223 insertions(+), 35 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index e522b1e50..0efa90d20 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -159,6 +159,7 @@ public JSONObject end(boolean success) public int readRetries = HpccRemoteFileReader.DEFAULT_READ_RETRIES; public int socketOpTimeoutMS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS; + public int initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB; public void setCurrentOperationSpanAttributes(Attributes attributes) { @@ -352,6 +353,26 @@ private static String[] getCredentials(CommandLine cmd) return new String[] {user, pass}; } + private static void applyGlobalConfig(CommandLine cmd) + { + int concurrentStartups = -1; + String concurrentStartupsStr = cmd.getOptionValue("connection_startup_limit", "" + -1); + try + { + concurrentStartups = Integer.parseInt(concurrentStartupsStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for connection_startup_limit: " + + concurrentStartupsStr + ", must be an integer."); + } + + if (concurrentStartups > 0) + { + RowServiceInputStream.setMaxConcurrentConnectionStartups(concurrentStartups); + } + } + private static enum FileFormat { THOR, @@ -589,6 +610,23 @@ private static int getSocketOpTimeoutMS(CommandLine cmd) return socketOpTimeoutS * 1000; } + private static int getInitialReadSizeKB(CommandLine cmd) + { + int initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB; + String initialReadSizeStr = cmd.getOptionValue("initial_read_size", "" + initialReadSizeKB); + try + { + initialReadSizeKB = Integer.parseInt(initialReadSizeStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for initial_read_size: " + + initialReadSizeStr + ", must be an integer. Defaulting to: " + RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB + "KB."); + } + + return initialReadSizeKB; + } + private static Options getReadOptions() { Options options = new Options(); @@ -602,6 +640,7 @@ private static Options getReadOptions() options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."); options.addOption(Option.builder("read") .argName("files") @@ -622,12 +661,14 @@ private static Options getReadTestOptions() options.addOption("pass", true, "Specifies the password used to connect. 
Defaults to null."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); options.addOption("access_expiry_seconds", true, "Access token expiration seconds."); + options.addOption("initial_read_size", true, "The size of the initial read request in KB sent to the rowservice."); options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice."); options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice."); options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster."); options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."); options.addOption(Option.builder("file_parts") .argName("_file_parts") @@ -651,6 +692,7 @@ private static Options getCopyOptions() options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."); options.addOption(Option.builder("copy") .argName("files") @@ -673,6 +715,7 @@ private static Options getWriteOptions() options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."); options.addOption(Option.builder("write") .argName("files") @@ -903,6 +946,7 @@ public void run() HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; + readContext.initialReadSizeKB = context.initialReadSizeKB; readContext.readSizeKB = readRequestSize; readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; @@ -910,11 +954,13 @@ public void run() fileReader.getInputStream().setReadRequestDelay(readRequestDelay); fileReader.setMaxReadRetries(context.readRetries); + long recCount = 0; while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recCount++; } + context.getCurrentOperation().recordsRead.addAndGet(recCount); fileReader.close(); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); @@ -942,6 +988,7 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; + readContext.initialReadSizeKB = context.initialReadSizeKB; final HpccRemoteFileReader 
filePartReader = new HpccRemoteFileReader(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef)); filePartReader.setMaxReadRetries(context.readRetries); @@ -963,13 +1010,15 @@ public void run() { try { + long recCount = 0; while (fileReader.hasNext()) { splitTable.addRecordPosition(fileReader.getStreamPosition()); HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recCount++; } + context.getCurrentOperation().recordsRead.addAndGet(recCount); splitTable.finish(fileReader.getStreamPosition()); @@ -1068,6 +1117,7 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; readContext.socketOpTimeoutMS = context.socketOpTimeoutMS; + readContext.initialReadSizeKB = context.initialReadSizeKB; filePartReaders[j] = new HpccRemoteFileReader(readContext, inFilePart, new HPCCRecordBuilder(recordDef)); filePartReaders[j].setMaxReadRetries(context.readRetries); } @@ -1091,14 +1141,18 @@ public void run() { for (int k = 0; k < fileReaders.length; k++) { + long recordsRead = 0; + long recordsWritten = 0; HpccRemoteFileReader fileReader = fileReaders[k]; while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsWritten.incrementAndGet(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recordsRead++; + recordsWritten++; } + context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten); + context.getCurrentOperation().recordsRead.addAndGet(recordsRead); fileReader.close(); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); @@ -1251,14 +1305,19 @@ public void run() splitEnd = endingSplit.splitEnd; } + long recordsRead = 0; + long recordsWritten = 0; while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd) { HPCCRecord record = (HPCCRecord) fileReader.getNext(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsWritten.incrementAndGet(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recordsRead++; + recordsWritten++; } + context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten); + context.getCurrentOperation().recordsRead.addAndGet(recordsRead); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); inputStreams[j].close(); } @@ -1299,6 +1358,8 @@ private static void performRead(String[] args, TaskContext context) String user = creds[0]; String pass = creds[1]; + applyGlobalConfig(cmd); + String outputPath = cmd.getOptionValue("out","."); int numThreads = NUM_DEFAULT_THREADS; @@ -1321,6 +1382,7 @@ private static void performRead(String[] args, TaskContext context) context.readRetries = getReadRetries(cmd); context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + context.initialReadSizeKB = getInitialReadSizeKB(cmd); FileFormat format = FileFormat.THOR; switch (formatStr.toUpperCase()) @@ -1501,6 +1563,8 @@ private static void performReadTest(String[] args, TaskContext context) String user = creds[0]; String pass = creds[1]; + applyGlobalConfig(cmd); + String outputPath = cmd.getOptionValue("out","."); int numThreads = NUM_DEFAULT_THREADS; @@ -1553,6 +1617,7 @@ private static void performReadTest(String[] args, TaskContext context) context.readRetries = getReadRetries(cmd); context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + 
context.initialReadSizeKB = getInitialReadSizeKB(cmd); String formatStr = cmd.getOptionValue("format"); if (formatStr == null) @@ -1733,6 +1798,8 @@ private static void performCopy(String[] args, TaskContext context) String user = creds[0]; String pass = creds[1]; + applyGlobalConfig(cmd); + String destClusterName = cmd.getOptionValue("dest_cluster"); String srcURL = cmd.getOptionValue("url"); @@ -1781,6 +1848,7 @@ private static void performCopy(String[] args, TaskContext context) context.readRetries = getReadRetries(cmd); context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd); + context.initialReadSizeKB = getInitialReadSizeKB(cmd); for (int i = 0; i < copyPairs.length; i+=2) { @@ -1937,6 +2005,8 @@ private static void performWrite(String[] args, TaskContext context) String user = creds[0]; String pass = creds[1]; + applyGlobalConfig(cmd); + String destClusterName = cmd.getOptionValue("dest_cluster"); String srcURL = cmd.getOptionValue("url"); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 4192349bb..664046073 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -73,6 +73,7 @@ public static class FileReadContext public int socketOpTimeoutMS = -1; public int recordReadLimit = -1; public boolean createPrefetchThread = true; + public int initialReadSizeKB = -1; public int readSizeKB = -1; public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span public Span parentSpan = null; @@ -92,6 +93,21 @@ private static FileReadContext constructReadContext(FieldDef originalRD, int con return context; } + private static RowServiceInputStream.StreamContext constructStreamContext(FileReadContext readContext) + { + RowServiceInputStream.StreamContext context = new RowServiceInputStream.StreamContext(); + context.recordDefinition = readContext.originalRD; + context.recordReadLimit = readContext.recordReadLimit; + context.createPrefetchThread = readContext.createPrefetchThread; + context.maxReadSizeKB = readContext.readSizeKB; + context.initialReadSizeKB = readContext.initialReadSizeKB; + context.connectTimeoutMS = readContext.connectTimeout; + context.socketOpTimeoutMS = readContext.socketOpTimeoutMS; + context.createPrefetchThread = readContext.createPrefetchThread; + + return context; + } + /** * Instantiates a new hpcc remote file reader. 
* @@ -287,12 +303,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde throw e; } + RowServiceInputStream.StreamContext streamContext = constructStreamContext(context); + streamContext.projectedRecordDefinition = projectedRecordDefinition; + streamContext.fileReadSpan = this.readSpan; + if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, - context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null, - false, context.socketOpTimeoutMS, this.readSpan); + this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, null); this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); + this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -307,9 +326,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde restartInfo.streamPos = resumeInfo.inputStreamPos; restartInfo.tokenBin = resumeInfo.tokenBin; - this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, - context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, - false, context.socketOpTimeoutMS, this.readSpan); + this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, restartInfo); this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; @@ -386,9 +403,11 @@ private boolean retryRead() { this.readSpan = createReadSpan(context, dataPartition); - this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(), - context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, - context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); + RowServiceInputStream.StreamContext streamContext = constructStreamContext(context); + streamContext.projectedRecordDefinition = this.recordBuilder.getRecordDefinition(); + streamContext.fileReadSpan = this.readSpan; + + this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, restartInfo); this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index e2467b066..7fe3c0ae0 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -66,26 +66,60 @@ private static class ReadRequestEvent public int requestSize = 0; }; + private static class RowServiceResponse + { + int len = 0; + int errorCode = 0; + int handle = -1; + String errorMessage = null; + } + public static class RestartInformation { public long streamPos = 0; public byte[] tokenBin = null; } - private static class RowServiceResponse + public static class StreamContext { - int len = 0; - int errorCode = 0; - int handle = -1; - String errorMessage = null; + public FieldDef recordDefinition = null; + public FieldDef projectedRecordDefinition = null; + public int recordReadLimit = -1; + public 
int maxReadSizeKB = DEFAULT_MAX_READ_SIZE_KB; + public int initialReadSizeKB = DEFAULT_INITIAL_REQUEST_READ_SIZE_KB; + public boolean createPrefetchThread = true; + public boolean isFetching = false; + public int connectTimeoutMS = DEFAULT_CONNECT_TIMEOUT_MILIS; + public int socketOpTimeoutMS = DEFAULT_SOCKET_OP_TIMEOUT_MS; + public Span fileReadSpan = null; + }; + + private static StreamContext constructStreamContext(FieldDef rd, FieldDef pRd, int connectTimeout, int limit, + boolean createPrefetchThread, int maxReadSizeInKB, + boolean isFetching, int socketOpTimeoutMS, Span rdSpan) + { + StreamContext ctx = new StreamContext(); + ctx.recordDefinition = rd; + ctx.projectedRecordDefinition = pRd; + ctx.recordReadLimit = limit; + ctx.maxReadSizeKB = maxReadSizeInKB; + ctx.createPrefetchThread = createPrefetchThread; + ctx.isFetching = isFetching; + ctx.connectTimeoutMS = connectTimeout; + ctx.socketOpTimeoutMS = socketOpTimeoutMS; + ctx.fileReadSpan = rdSpan; + return ctx; } public static final int DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE = 25; public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations + public static final int DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS = 10; // Note: The platform may respond with more data than this if records are larger than this limit. public static final int DEFAULT_MAX_READ_SIZE_KB = 4096; + public static final int DEFAULT_INITIAL_REQUEST_READ_SIZE_KB = 32; + private static final int SHORT_SLEEP_MS = 1; private static final int LONG_WAIT_THRESHOLD_US = 100; private static final int MAX_HOT_LOOP_NS = 10000; @@ -111,8 +145,12 @@ private static class RowServiceResponse public static final String PARTIAL_BLOCK_READS_METRIC = "numPartialBlockReads"; public static final String BLOCK_READS_METRIC = "numBlockReads"; + private static AtomicInteger connectionStartupCount = new AtomicInteger(0); + private static int maxConcurrentStartups = DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS; + private AtomicBoolean active = new AtomicBoolean(false); private AtomicBoolean closed = new AtomicBoolean(false); + private boolean isFirstReadRequest = false; private boolean simulateFail = false; private boolean forceTokenUse = false; private boolean inFetchingMode = false; @@ -190,6 +228,7 @@ private static class RowServiceResponse private static final Logger log = LogManager.getLogger(RowServiceInputStream.class); private int maxReadSizeKB = DEFAULT_MAX_READ_SIZE_KB; + private int initialReadSizeKB = DEFAULT_INITIAL_REQUEST_READ_SIZE_KB; // Buffer compact threshold should always be smaller than buffer prefetch threshold private int bufferPrefetchThresholdKB = DEFAULT_MAX_READ_SIZE_KB/2; @@ -396,13 +435,31 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS, Span rdSpan) throws Exception { - this.recordDefinition = rd; - this.projectedRecordDefinition = pRd; - this.inFetchingMode = isFetching; + this(constructStreamContext(rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, + isFetching, socketOpTimeoutMS, rdSpan), dp, restartInfo); + } + + /** + * A plain socket connect to a THOR node for remote read + * + * @param context Streaming configuration context + * @param 
dp Data partition to read + * @param restartInfo Restart information, can be null + */ + public RowServiceInputStream(StreamContext context, DataPartition dp, RestartInformation restartInfo) throws Exception + { + this.recordDefinition = context.recordDefinition; + this.projectedRecordDefinition = context.projectedRecordDefinition; + this.inFetchingMode = context.isFetching; - if (maxReadSizeInKB > 0) + if (context.maxReadSizeKB > 0) { - this.maxReadSizeKB = maxReadSizeInKB; + this.maxReadSizeKB = context.maxReadSizeKB; + } + + if (context.initialReadSizeKB > 0) + { + this.initialReadSizeKB = context.initialReadSizeKB; } this.bufferPrefetchThresholdKB = this.maxReadSizeKB/2; @@ -413,9 +470,9 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.dataPart = dp; - if (rdSpan != null && rdSpan.getSpanContext().isValid()) + if (context.fileReadSpan != null && context.fileReadSpan.getSpanContext().isValid()) { - this.fileReadSpan = rdSpan; + this.fileReadSpan = context.fileReadSpan; this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(fileReadSpan); } @@ -433,17 +490,17 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.tokenBin = null; this.simulateFail = false; - if (connectTimeout > 0) + if (context.connectTimeoutMS > 0) { - this.connectTimeout = connectTimeout; + this.connectTimeout = context.connectTimeoutMS; } - if (socketOpTimeoutMS > 0) + if (context.socketOpTimeoutMS > 0) { - this.socketOpTimeoutMs = socketOpTimeoutMS; + this.socketOpTimeoutMs = context.socketOpTimeoutMS; } - this.recordLimit = limit; + this.recordLimit = context.recordReadLimit; this.readBufferCapacity.set(this.maxReadSizeKB*1024*2); this.readBuffer = new byte[this.readBufferCapacity.get()]; @@ -515,7 +572,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co } } - if (createPrefetchThread) + if (context.createPrefetchThread) { RowServiceInputStream rowInputStream = this; Runnable prefetchTask = new Runnable() @@ -694,6 +751,15 @@ public int getHandle() return handle; } + /** + * Sets the global max concurrent connection startups. + * @param max the new max concurrent connection startups + */ + public static void setMaxConcurrentConnectionStartups(int max) + { + maxConcurrentStartups = max; + } + /** * The delay in milliseconds between read requests. Primarily used for testing. 
* @param sleepTimeMS the sleep time in milliseconds @@ -857,6 +923,7 @@ public void startBlockingFetchRequest(List fetchOffsets) throws Exception { finishFetch(); } + } } @@ -1252,6 +1319,12 @@ private void finishFetch() finishReadRequestSpan(); + // After we have read the first request allow another connection to start + if (isFirstReadRequest) + { + isFirstReadRequest = false; + connectionStartupCount.decrementAndGet(); + } //------------------------------------------------------------------------------ // Send read ahead request @@ -1469,6 +1542,13 @@ public int available() throws IOException @Override public void close() throws IOException { + // If we close before the first read request is finished we need to decrement the connection count + if (isFirstReadRequest) + { + isFirstReadRequest = false; + connectionStartupCount.decrementAndGet(); + } + // Using getAndSet to prevent main thread and background thread from // closing at the same time if (this.closed.getAndSet(true) == false) @@ -1785,6 +1865,21 @@ public List getMetrics() private void makeActive() throws HpccFileException { + // Limit the number of concurrent connection startups + int currentCount = connectionStartupCount.get(); + int newCount = currentCount+1; + isFirstReadRequest = true; + while (newCount > maxConcurrentStartups || !connectionStartupCount.compareAndSet(currentCount, newCount)) + { + try + { + Thread.sleep(1); + } catch (InterruptedException e) {} // We don't care about waking early + + currentCount = connectionStartupCount.get(); + newCount = currentCount+1; + } + this.active.set(false); this.handle = 0; String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; @@ -1797,7 +1892,6 @@ private void makeActive() throws HpccFileException connectSpan.setAttribute("server.index", getFilePartCopy()); } - boolean needsRetry = false; do { @@ -2016,6 +2110,10 @@ private void makeActive() throws HpccFileException needsRetry = true; if (!setNextFilePartCopy()) { + // This connection has failed, decrement the connection count to allow another connection to start + isFirstReadRequest = false; + connectionStartupCount.decrementAndGet(); + throw new HpccFileException(prefix + " Unsuccessfuly attempted to connect to all file part copies", e); } } @@ -2311,7 +2409,7 @@ private String makeInitialRequest() sb.append(RFCCodes.RFCStreamReadCmd); sb.append("{ \"format\" : \"binary\", \n"); - sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); + sb.append("\"replyLimit\" : " + this.initialReadSizeKB + ",\n"); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; sb.append(trace); @@ -2383,6 +2481,7 @@ private String makeHandleRequest() sb.append(RFCCodes.RFCStreamReadCmd); sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); + sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; sb.append(trace);
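
Usage sketch (illustration only, not part of the patch): the snippet below shows how a client might combine the new global connection-startup limit with the smaller initial read request when opening many file-part readers at once. The helper method, variable names, and the import paths for FieldDef, HPCCRecord and HPCCRecordBuilder are assumptions for the example; only the FileReadContext fields, the DEFAULT_* constants and setMaxConcurrentConnectionStartups() come from this patch.

    import org.hpccsystems.commons.ecl.FieldDef;             // assumed import path
    import org.hpccsystems.dfs.client.DataPartition;
    import org.hpccsystems.dfs.client.HPCCRecord;            // assumed import path
    import org.hpccsystems.dfs.client.HPCCRecordBuilder;     // assumed import path
    import org.hpccsystems.dfs.client.HpccRemoteFileReader;
    import org.hpccsystems.dfs.client.RowServiceInputStream;

    public class StartupLimitExample
    {
        // 'part' and 'recordDef' are assumed to be obtained elsewhere, e.g. from an HPCCFile lookup.
        static void readPart(DataPartition part, FieldDef recordDef) throws Exception
        {
            // Global cap on how many RowServiceInputStream connections may be in their
            // startup (first read request) phase at once; the patch defaults this to 10.
            RowServiceInputStream.setMaxConcurrentConnectionStartups(4);

            HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
            readContext.originalRD = recordDef;
            // Small first request (patch default: 32KB) keeps connection startup cheap ...
            readContext.initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB;
            // ... subsequent read-ahead requests use the normal reply limit (default: 4096KB).
            readContext.readSizeKB = RowServiceInputStream.DEFAULT_MAX_READ_SIZE_KB;
            readContext.socketOpTimeoutMS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS;

            HpccRemoteFileReader<HPCCRecord> reader =
                    new HpccRemoteFileReader<HPCCRecord>(readContext, part, new HPCCRecordBuilder(recordDef));
            while (reader.hasNext())
            {
                HPCCRecord record = reader.next();
                // process the record ...
            }
            reader.close();
        }
    }

The same knobs are reachable from the FileUtility command line through the new connection_startup_limit and initial_read_size options added in this patch.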