From 1c3b1e4fc6ec85efaba9f422a6f3b94ee11b0424 Mon Sep 17 00:00:00 2001 From: James McMullan Date: Thu, 22 Feb 2024 19:03:37 -0500 Subject: [PATCH] HPCC4J-579 prefetch thread hot loop fix - Changed prefetch thread behavior to short sleep after each request - Added short sleep on main thread if blocked by prefetch thread for more than 10us Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../dfs/client/RowServiceInputStream.java | 68 +++++++++++-------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 0ea652111..8c9bab788 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -117,8 +117,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable // Note: The platform may respond with more data than this if records are larger than this limit. private static final int DEFAULT_MAX_READ_SIZE_KB = 4096; - private static final int PREFETCH_SLEEP_MS = 1; + private static final int SHORT_SLEEP_MS = 1; private static final int LONG_WAIT_THRESHOLD_US = 100; + private static final int MAX_HOT_LOOP_NS = 10000; // This is used to prevent the prefetch thread from hot looping when // the network connection is slow. The read on the socket will block until @@ -406,7 +407,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.skip(avail); // Sleep 1ms to prevent hot loop from eating CPU resources - Thread.sleep(1); + Thread.sleep(SHORT_SLEEP_MS); } } catch (IOException e) @@ -435,27 +436,24 @@ public void run() { while (inputStream.isClosed() == false) { - // If we don't have room in the buffer to fetch more data sleep - if (inputStream.getRemainingBufferCapacity() <= (inputStream.bufferPrefetchThresholdKB * 1024)) + inputStream.prefetchData(); + + // Sleep after each prefetch to prevent hot loop from eating CPU resources + try { - try + if (CompileTimeConstants.PROFILE_CODE) + { + long sleepTime = System.nanoTime(); + Thread.sleep(SHORT_SLEEP_MS); + sleepTime = System.nanoTime() - sleepTime; + sleepTimeNS += sleepTime; + } + else { - if (CompileTimeConstants.PROFILE_CODE) - { - long sleepTime = System.nanoTime(); - Thread.sleep(PREFETCH_SLEEP_MS); - sleepTime = System.nanoTime() - sleepTime; - sleepTimeNS += sleepTime; - } - else - { - Thread.sleep(PREFETCH_SLEEP_MS); - } + Thread.sleep(SHORT_SLEEP_MS); } - catch(Exception e){} } - - inputStream.prefetchData(); + catch(Exception e){} } } }; @@ -1340,14 +1338,34 @@ public int read() throws IOException } // We are waiting on a single byte so hot loop - long waitNS = 0; + long waitNS = System.nanoTime(); try { - // Available will throw an exception when it reaches EOS and available bytes == 0 + while (this.available() < 1) + { + long currentWaitNS = System.nanoTime() - waitNS; + if (currentWaitNS >= MAX_HOT_LOOP_NS) + { + try + { + if (CompileTimeConstants.PROFILE_CODE) + { + long sleepTime = System.nanoTime(); + Thread.sleep(SHORT_SLEEP_MS); + sleepTime = System.nanoTime() - sleepTime; + sleepTimeNS += sleepTime; + } + else + { + Thread.sleep(SHORT_SLEEP_MS); + } + } + catch(Exception e){} + } + } + if (CompileTimeConstants.PROFILE_CODE) { - waitNS = System.nanoTime(); - while (this.available() < 1) {} waitNS = System.nanoTime() - waitNS; waitTimeNS += waitNS; @@ -1357,10 +1375,6 @@ public int read() throws IOException numLongWaits++; } } - else - { - while (this.available() < 1) {} - } } catch (IOException e) {