Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added strict timeout (timeAllowed) for shard-tolerant requests #201

Open
wants to merge 12 commits into
base: fs/branch_9_3
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,12 @@ protected ShardResponse transfomResponse(
*/
@Override
public ShardResponse takeCompletedIncludingErrors() {
return take(false);
return take(false, -1);
}

/**
 * Timeout-aware counterpart of {@link #takeCompletedIncludingErrors()}: errors do not abort the
 * wait, but the wait itself is limited.
 *
 * @param maxAllowedTimeInMillis wait limit in milliseconds, handed unchanged to {@code take}; a
 *     value that is not positive makes {@code take} fall back to its one-day deadline
 * @return whatever {@code take} returns; this may be {@code null}
 */
@Override
public ShardResponse takeCompletedIncludingErrorsWithTimeout(long maxAllowedTimeInMillis) {
  final boolean bailOnError = false;
  return take(bailOnError, maxAllowedTimeInMillis);
}

/**
Expand All @@ -232,13 +237,23 @@ public ShardResponse takeCompletedIncludingErrors() {
*/
@Override
public ShardResponse takeCompletedOrError() {
return take(true);
return take(true, -1);
}

private ShardResponse take(boolean bailOnError) {
private ShardResponse take(boolean bailOnError, long maxAllowedTimeInMillis) {
try {
long deadline = System.nanoTime();
if (maxAllowedTimeInMillis > 0) {
deadline += TimeUnit.MILLISECONDS.toNanos(maxAllowedTimeInMillis);
} else {
deadline = System.nanoTime() + TimeUnit.DAYS.toNanos(1);
}

ShardResponse previousResponse = null;
while (pending.get() > 0) {
ShardResponse rsp = responses.take();
long waitTime = deadline - System.nanoTime();
ShardResponse rsp = responses.poll(waitTime, TimeUnit.NANOSECONDS);
if (rsp == null) return previousResponse;
responseCancellableMap.remove(rsp);

pending.decrementAndGet();
Expand All @@ -249,6 +264,7 @@ private ShardResponse take(boolean bailOnError) {
// for a request was received. Otherwise we might return the same
// request more than once.
rsp.getShardRequest().responses.add(rsp);
previousResponse = rsp;
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
Expand Down Expand Up @@ -403,4 +419,14 @@ private boolean canShortCircuit(
/**
 * Returns the factory held in this handler's {@code httpShardHandlerFactory} field.
 *
 * @return the {@code httpShardHandlerFactory} field, exposed as a {@link ShardHandlerFactory}
 */
public ShardHandlerFactory getShardHandlerFactory() {
return httpShardHandlerFactory;
}

/**
 * Test helper: overwrites the pending-request counter.
 *
 * @param val the new value of the {@code pending} counter
 */
void setPendingRequest(int val) {
  pending.set(val);
}

/**
 * Test helper: adds a response directly to the {@code responses} queue, as if a shard had
 * answered.
 *
 * @param shardResponse the response to enqueue
 */
void setResponse(ShardResponse shardResponse) {
  responses.add(shardResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
}
} else {
// a distributed request
long maxTimeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1);

if (rb.outgoing == null) {
rb.outgoing = new ArrayList<>();
Expand Down Expand Up @@ -593,7 +594,7 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
while (rb.outgoing.size() == 0) {
ShardResponse srsp =
tolerant
? shardHandler1.takeCompletedIncludingErrors()
? shardHandler1.takeCompletedIncludingErrorsWithTimeout(maxTimeAllowed)
: shardHandler1.takeCompletedOrError();
if (srsp == null) break; // no more requests to wait for

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public abstract class ShardHandler {

public abstract ShardResponse takeCompletedIncludingErrors();

/**
 * Variant of {@link #takeCompletedIncludingErrors()} that additionally receives a time limit.
 *
 * <p>NOTE(review): how a non-positive limit is treated, and what is returned when the limit
 * expires, is decided by each implementation - confirm against the concrete handler in use.
 *
 * @param maxAllowedTimeInMillis the maximum time to wait, in milliseconds
 * @return a shard response, or possibly {@code null}
 */
public abstract ShardResponse takeCompletedIncludingErrorsWithTimeout(
long maxAllowedTimeInMillis);

public abstract ShardResponse takeCompletedOrError();

public abstract void cancelAll();
Expand Down
90 changes: 90 additions & 0 deletions solr/core/src/test/org/apache/solr/TestTimeAllowedSearch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.apache.solr;

import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import java.util.Locale;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ShardParams;

public class TestTimeAllowedSearch extends SolrCloudTestCase {

  /**
   * Demonstrates the {@code timeAllowed} expectation at the {@link
   * org.apache.solr.handler.component.HttpShardHandler} level for shard-tolerant requests.
   *
   * <p>The test creates a collection with the {@code implicit} router and two shards:
   *
   * <ul>
   *   <li>{@code shard_1} receives 100000 random documents plus one {@code "abc"} document, so a
   *       substring query against it is expected to take some time;
   *   <li>{@code shard_2} receives a single {@code "abc"} document.
   * </ul>
   *
   * <p>It then runs the substring query {@code subject_s:*abc*} with {@code timeAllowed=25},
   * assuming the query times out on {@code shard_1} so that only the {@code shard_2} document is
   * counted, and finally repeats the query without a limit to show that more documents match.
   */
  public void testTimeAllowed() throws Exception {
    MiniSolrCloudCluster cluster =
        configureCluster(2).addConfig("conf", configset("cloud-minimal")).configure();
    try {
      CloudSolrClient client = cluster.getSolrClient();
      String COLLECTION_NAME = "test_coll";
      CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
          .setRouterName("implicit")
          .setShards("shard_1,shard_2")
          .process(client);
      cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);

      // Bulk load for shard_1: ids 0..99999 with random lower-case subjects.
      UpdateRequest ur = new UpdateRequest();
      for (int i = 0; i < 100000; i++) {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "" + i);
        final String s =
            RandomStrings.randomAsciiLettersOfLengthBetween(random(), 10, 100)
                .toLowerCase(Locale.ROOT);
        doc.setField("subject_s", s);
        doc.setField("_route_", "shard_1");
        ur.add(doc);
      }

      // adding "abc" in each shard as we will have query *abc*
      // Both ids lie outside the 0..99999 range used by the loop above, so neither of them
      // repeats an id that was already generated.
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "" + 100000);
      doc.setField("subject_s", "abc");
      doc.setField("_route_", "shard_2");
      ur.add(doc);

      doc = new SolrInputDocument();
      doc.addField("id", "" + 100001);
      doc.setField("subject_s", "abc");
      doc.setField("_route_", "shard_1");
      ur.add(doc);

      ur.commit(client, COLLECTION_NAME);

      // warm up query; its result is intentionally not inspected
      SolrQuery query = new SolrQuery();
      query.setQuery("subject_s:*abcd*");
      query.set(ShardParams.SHARDS_TOLERANT, "true");
      client.query(COLLECTION_NAME, query);

      // Time-limited, shard-tolerant query: only shard_2 is expected to answer in time.
      query = new SolrQuery();
      query.setQuery("subject_s:*abc*");
      query.set(CommonParams.TIME_ALLOWED, 25);
      query.set(ShardParams.SHARDS_TOLERANT, "true");
      QueryResponse response = client.query(COLLECTION_NAME, query);
      assertEquals(
          "Should have found 1 doc (shard_2) as timeallowed is 25ms",
          1,
          response.getResults().getNumFound());

      // Same query without a time limit: the "abc" documents of both shards must be counted.
      query = new SolrQuery();
      query.setQuery("subject_s:*abc*");
      query.set(ShardParams.SHARDS_TOLERANT, "true");
      response = client.query(COLLECTION_NAME, query);
      assertTrue(
          "Should have found few docs as timeallowed is unlimited ",
          response.getResults().getNumFound() > 1);
    } finally {
      cluster.shutdown();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public ShardResponse takeCompletedIncludingErrors() {
return null;
}

/**
 * Stub implementation: ignores {@code maxAllowedTime} and always returns {@code null}, like the
 * neighbouring {@code take*} overrides in this class.
 */
@Override
public ShardResponse takeCompletedIncludingErrorsWithTimeout(long maxAllowedTime) {
return null;
}

@Override
public ShardResponse takeCompletedOrError() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.CoreContainer;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
Expand Down Expand Up @@ -155,4 +158,83 @@ public void testLiveNodesToHostUrl() {
MatcherAssert.assertThat(hostSet, hasItem("1.2.3.4:9000"));
MatcherAssert.assertThat(hostSet, hasItem("1.2.3.4:9001"));
}

/**
 * Verifies that {@code takeCompletedIncludingErrorsWithTimeout} gives up once the allowed time
 * has elapsed: one request is marked as pending but no response is ever queued, so the call
 * must return {@code null} and must not return earlier than the configured limit.
 */
@Test
public void testHttpShardHandlerTimeout() {
  HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory();
  HttpShardHandler shardHandler = (HttpShardHandler) httpShardHandlerFactory.getShardHandler();

  long startTime = System.nanoTime();
  long timeAllowedInMillis = 120;
  // Pretend one shard request is outstanding; nothing will ever answer it.
  shardHandler.setPendingRequest(1);
  ShardResponse shardResponse =
      shardHandler.takeCompletedIncludingErrorsWithTimeout(timeAllowedInMillis);

  assertNull(shardResponse);

  long timeTakenInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
  // The message is built from the actual limit so it cannot drift from the value under test.
  assertTrue(
      "Should have taken at least "
          + timeAllowedInMillis
          + " milliseconds but took "
          + timeTakenInMillis,
      timeTakenInMillis >= timeAllowedInMillis);
}

/**
 * With a single pending request whose only shard has answered, {@code
 * takeCompletedIncludingErrorsWithTimeout} must hand back exactly that response, even when the
 * time limit passed in is {@code -1}.
 */
@Test
public void testHttpShardHandlerWithResponse() {
  final HttpShardHandler handler =
      (HttpShardHandler) new HttpShardHandlerFactory().getShardHandler();

  // setting one pending request.
  handler.setPendingRequest(1);

  // A response from "shard_1" for a request that targets only that one shard.
  final ShardResponse expected = new ShardResponse();
  expected.setShard("shard_1");
  final ShardRequest request = new ShardRequest();
  request.actualShards = new String[] {"shard_1"};
  expected.setShardRequest(request);

  // Queue the response from a worker thread and wait for that worker to finish.
  final ExecutorService publisher = ExecutorUtil.newMDCAwareCachedThreadPool("timeAllowedTest");
  try {
    publisher.submit(() -> handler.setResponse(expected));
  } finally {
    ExecutorUtil.shutdownAndAwaitTermination(publisher);
  }

  final long noExplicitLimit = -1;
  final ShardResponse actual = handler.takeCompletedIncludingErrorsWithTimeout(noExplicitLimit);

  assertEquals(expected, actual);
}

/**
 * With two pending requests for a two-shard request but only one shard answering, {@code
 * takeCompletedIncludingErrorsWithTimeout} is expected to return the single response that did
 * arrive once the 100 ms limit runs out.
 */
@Test
public void testHttpShardHandlerWithPartialResponse() {
  final HttpShardHandler handler =
      (HttpShardHandler) new HttpShardHandlerFactory().getShardHandler();

  // setting two pending requests.
  handler.setPendingRequest(2);

  // The request spans two shards, but only "shard_1" will ever produce a response.
  final ShardResponse onlyResponse = new ShardResponse();
  onlyResponse.setShard("shard_1");
  final ShardRequest request = new ShardRequest();
  request.actualShards = new String[] {"shard_1", "shard_2"};
  onlyResponse.setShardRequest(request);

  // Queue the single response from a worker thread and wait for that worker to finish.
  final ExecutorService publisher = ExecutorUtil.newMDCAwareCachedThreadPool("timeAllowedTest");
  try {
    publisher.submit(() -> handler.setResponse(onlyResponse));
  } finally {
    ExecutorUtil.shutdownAndAwaitTermination(publisher);
  }

  // partial response
  final long limitInMillis = 100;
  final ShardResponse actual = handler.takeCompletedIncludingErrorsWithTimeout(limitInMillis);

  assertEquals(onlyResponse, actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public ShardResponse takeCompletedIncludingErrors() {
return wrapped.takeCompletedIncludingErrors();
}

/**
 * Forwards the call to the wrapped handler's timeout-aware method.
 *
 * <p>The limit must be passed through: delegating to {@code takeCompletedIncludingErrors()}
 * instead would silently discard the caller's timeout.
 *
 * @param maxAllowedTimeInMillis the time limit in milliseconds, handed unchanged to the wrapped
 *     handler
 * @return whatever the wrapped handler returns
 */
@Override
public ShardResponse takeCompletedIncludingErrorsWithTimeout(long maxAllowedTimeInMillis) {
  return wrapped.takeCompletedIncludingErrorsWithTimeout(maxAllowedTimeInMillis);
}

@Override
public ShardResponse takeCompletedOrError() {
return wrapped.takeCompletedOrError();
Expand Down
Loading