diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 795fd8478..372bdce1e 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -117,8 +117,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable // Note: The platform may respond with more data than this if records are larger than this limit. private static final int DEFAULT_MAX_READ_SIZE_KB = 4096; - private static final int PREFETCH_SLEEP_MS = 1; + private static final int SHORT_SLEEP_MS = 1; private static final int LONG_WAIT_THRESHOLD_US = 100; + private static final int MAX_HOT_LOOP_NS = 10000; // This is used to prevent the prefetch thread from hot looping when // the network connection is slow. The read on the socket will block until @@ -406,7 +407,11 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.skip(avail); // Sleep 1ms to prevent hot loop from eating CPU resources - Thread.sleep(1); + try + { + Thread.sleep(SHORT_SLEEP_MS); + } + catch(InterruptedException e) {/*We don't care about waking early*/} } } catch (IOException e) @@ -435,27 +440,24 @@ public void run() { while (inputStream.isClosed() == false) { - // If we don't have room in the buffer to fetch more data sleep - if (inputStream.getRemainingBufferCapacity() <= (inputStream.bufferPrefetchThresholdKB * 1024)) + inputStream.prefetchData(); + + // Sleep after each prefetch to prevent hot loop from eating CPU resources + try { - try + if (CompileTimeConstants.PROFILE_CODE) + { + long sleepTime = System.nanoTime(); + Thread.sleep(SHORT_SLEEP_MS); + sleepTime = System.nanoTime() - sleepTime; + sleepTimeNS += sleepTime; + } + else { - if (CompileTimeConstants.PROFILE_CODE) - { - long sleepTime = System.nanoTime(); - Thread.sleep(PREFETCH_SLEEP_MS); - sleepTime = System.nanoTime() - sleepTime; - sleepTimeNS += sleepTime; - } - else - { - Thread.sleep(PREFETCH_SLEEP_MS); - } + Thread.sleep(SHORT_SLEEP_MS); } - catch(Exception e){} } - - inputStream.prefetchData(); + catch(InterruptedException e) {/*We don't care about waking early*/} } } }; @@ -1335,14 +1337,34 @@ public int read() throws IOException } // We are waiting on a single byte so hot loop - long waitNS = 0; + long waitNS = System.nanoTime(); try { - // Available will throw an exception when it reaches EOS and available bytes == 0 + while (this.available() < 1) + { + long currentWaitNS = System.nanoTime() - waitNS; + if (currentWaitNS >= MAX_HOT_LOOP_NS) + { + try + { + if (CompileTimeConstants.PROFILE_CODE) + { + long sleepTime = System.nanoTime(); + Thread.sleep(SHORT_SLEEP_MS); + sleepTime = System.nanoTime() - sleepTime; + sleepTimeNS += sleepTime; + } + else + { + Thread.sleep(SHORT_SLEEP_MS); + } + } + catch(InterruptedException e) {/*We don't care about waking early*/} + } + } + if (CompileTimeConstants.PROFILE_CODE) { - waitNS = System.nanoTime(); - while (this.available() < 1) {} waitNS = System.nanoTime() - waitNS; waitTimeNS += waitNS; @@ -1352,10 +1374,6 @@ public int read() throws IOException numLongWaits++; } } - else - { - while (this.available() < 1) {} - } } catch (IOException e) {