Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-579 prefetch thread hot loop fix #685

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable

// Note: The platform may respond with more data than this if records are larger than this limit.
private static final int DEFAULT_MAX_READ_SIZE_KB = 4096;
private static final int PREFETCH_SLEEP_MS = 1;
private static final int SHORT_SLEEP_MS = 1;
private static final int LONG_WAIT_THRESHOLD_US = 100;
private static final int MAX_HOT_LOOP_NS = 10000;

// This is used to prevent the prefetch thread from hot looping when
// the network connection is slow. The read on the socket will block until
Expand Down Expand Up @@ -406,7 +407,11 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
this.skip(avail);

// Sleep 1ms to prevent hot loop from eating CPU resources
Thread.sleep(1);
try
{
Thread.sleep(SHORT_SLEEP_MS);
}
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}
catch (IOException e)
Expand Down Expand Up @@ -435,27 +440,24 @@ public void run()
{
while (inputStream.isClosed() == false)
{
// If we don't have room in the buffer to fetch more data sleep
if (inputStream.getRemainingBufferCapacity() <= (inputStream.bufferPrefetchThresholdKB * 1024))
inputStream.prefetchData();

// Sleep after each prefetch to prevent hot loop from eating CPU resources
try
{
try
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(SHORT_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(PREFETCH_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
Thread.sleep(PREFETCH_SLEEP_MS);
}
Thread.sleep(SHORT_SLEEP_MS);
}
catch(Exception e){}
}

inputStream.prefetchData();
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}
};
Expand Down Expand Up @@ -1335,14 +1337,34 @@ public int read() throws IOException
}

// We are waiting on a single byte so hot loop
long waitNS = 0;
long waitNS = System.nanoTime();
try
{
// Available will throw an exception when it reaches EOS and available bytes == 0
while (this.available() < 1)
{
long currentWaitNS = System.nanoTime() - waitNS;
if (currentWaitNS >= MAX_HOT_LOOP_NS)
{
try
{
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(SHORT_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
Thread.sleep(SHORT_SLEEP_MS);
}
}
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}

if (CompileTimeConstants.PROFILE_CODE)
{
waitNS = System.nanoTime();
while (this.available() < 1) {}
waitNS = System.nanoTime() - waitNS;
waitTimeNS += waitNS;

Expand All @@ -1352,10 +1374,6 @@ public int read() throws IOException
numLongWaits++;
}
}
else
{
while (this.available() < 1) {}
}
}
catch (IOException e)
{
Expand Down
Loading