Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added strict timeout (timeAllowed ) for shard tolerant requests #201

Open
wants to merge 12 commits into
base: fs/branch_9_3
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,12 @@ protected ShardResponse transfomResponse(
*/
@Override
public ShardResponse takeCompletedIncludingErrors() {
return take(false);
return take(false, -1);
}

@Override
public ShardResponse takeCompletedIncludingErrorsWithTimeout(long maxAllowedTimeInMillis) {
return take(false, maxAllowedTimeInMillis);
}

/**
Expand All @@ -232,13 +237,22 @@ public ShardResponse takeCompletedIncludingErrors() {
*/
@Override
public ShardResponse takeCompletedOrError() {
return take(true);
return take(true, -1);
}

private ShardResponse take(boolean bailOnError) {
private ShardResponse take(boolean bailOnError, long maxAllowedTimeInMillis) {
try {
long deadline = System.nanoTime();
if (maxAllowedTimeInMillis > 0) {
deadline += TimeUnit.MILLISECONDS.toNanos(maxAllowedTimeInMillis);
} else {
deadline = Long.MAX_VALUE;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't do deadline = Long.MAX_VALUE -- there are no guarantees about nanoTime being non-negative, nor starting from 0, etc. So any computation we do needs to make no such assumptions, and should intentionally play well with overflow.

With the approach you're taking, deadline += Long.MAX_VALUE would work here instead.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@magibney good catch. didn't realize can return non-positive. I have added delay of day.

}

while (pending.get() > 0) {
ShardResponse rsp = responses.take();
long waitTime = deadline - System.nanoTime();
ShardResponse rsp = responses.poll(waitTime, TimeUnit.NANOSECONDS);
if (rsp == null) return null;
responseCancellableMap.remove(rsp);

pending.decrementAndGet();
Expand Down Expand Up @@ -403,4 +417,14 @@ private boolean canShortCircuit(
public ShardHandlerFactory getShardHandlerFactory() {
return httpShardHandlerFactory;
}

// test helper function
void setPendingRequest(int val) {
this.pending.set(val);
}

// test helper function
void setResponse(ShardResponse shardResponse) {
this.responses.add(shardResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
}
} else {
// a distributed request
long maxTimeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1);

if (rb.outgoing == null) {
rb.outgoing = new ArrayList<>();
Expand Down Expand Up @@ -593,7 +594,7 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
while (rb.outgoing.size() == 0) {
ShardResponse srsp =
tolerant
? shardHandler1.takeCompletedIncludingErrors()
? shardHandler1.takeCompletedIncludingErrorsWithTimeout(maxTimeAllowed)
: shardHandler1.takeCompletedOrError();
if (srsp == null) break; // no more requests to wait for

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public abstract class ShardHandler {

public abstract ShardResponse takeCompletedIncludingErrors();

public abstract ShardResponse takeCompletedIncludingErrorsWithTimeout(
long maxAllowedTimeInMillis);

public abstract ShardResponse takeCompletedOrError();

public abstract void cancelAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public ShardResponse takeCompletedIncludingErrors() {
return null;
}

@Override
public ShardResponse takeCompletedIncludingErrorsWithTimeout(long maxAllowedTime) {
return null;
}

@Override
public ShardResponse takeCompletedOrError() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
Expand Down Expand Up @@ -155,4 +156,57 @@ public void testLiveNodesToHostUrl() {
MatcherAssert.assertThat(hostSet, hasItem("1.2.3.4:9000"));
MatcherAssert.assertThat(hostSet, hasItem("1.2.3.4:9001"));
}

@Test
public void testHttpShardHandlerTimeout() {
HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory();
HttpShardHandler shardHandler = (HttpShardHandler) httpShardHandlerFactory.getShardHandler();

long startTime = System.nanoTime();
long timeAllowedInMillis = 120;
shardHandler.setPendingRequest(1);
ShardResponse shardResponse =
shardHandler.takeCompletedIncludingErrorsWithTimeout(timeAllowedInMillis);

assertEquals(null, shardResponse);
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved

long timeTakenInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
assertTrue(
"Should have taken more than 100 milli seconds " + timeTakenInMillis,
timeTakenInMillis > 100);
assertTrue(
"Should have timeout in 120 milli seconds " + timeTakenInMillis, timeTakenInMillis < 130);
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testHttpShardHandlerWithResponse() {
HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory();
HttpShardHandler shardHandler = (HttpShardHandler) httpShardHandlerFactory.getShardHandler();

long startTime = System.nanoTime();
long timeAllowedInMillis = -1;
shardHandler.setPendingRequest(1);

ShardResponse shardResponse = new ShardResponse();
shardResponse.setShard("shard_1");
ShardRequest shardRequest = new ShardRequest();
shardRequest.actualShards = new String[] {"shard_1"};
shardResponse.setShardRequest(shardRequest);

Thread thread =
new Thread(
() -> {
shardHandler.setResponse(shardResponse);
});
thread.start();
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
ShardResponse gotResponse =
shardHandler.takeCompletedIncludingErrorsWithTimeout(timeAllowedInMillis);

assertEquals(shardResponse, gotResponse);

long timeTakenInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
assertTrue(
"Should have taken less than 100 milli seconds " + timeTakenInMillis,
timeTakenInMillis < 100);
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading