diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java index 0ad76294335..652dd0bc3d7 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java @@ -223,7 +223,12 @@ protected ShardResponse transfomResponse( */ @Override public ShardResponse takeCompletedIncludingErrors() { - return take(false); + return take(false, -1); + } + + @Override + public ShardResponse takeCompletedIncludingErrorsWithTimeout(long maxAllowedTimeInMillis) { + return take(false, maxAllowedTimeInMillis); } /** @@ -232,13 +237,23 @@ public ShardResponse takeCompletedIncludingErrors() { */ @Override public ShardResponse takeCompletedOrError() { - return take(true); + return take(true, -1); } - private ShardResponse take(boolean bailOnError) { + private ShardResponse take(boolean bailOnError, long maxAllowedTimeInMillis) { try { + long deadline = System.nanoTime(); + if (maxAllowedTimeInMillis > 0) { + deadline += TimeUnit.MILLISECONDS.toNanos(maxAllowedTimeInMillis); + } else { + deadline = System.nanoTime() + TimeUnit.DAYS.toNanos(1); + } + + ShardResponse previousResponse = null; while (pending.get() > 0) { - ShardResponse rsp = responses.take(); + long waitTime = deadline - System.nanoTime(); + ShardResponse rsp = responses.poll(waitTime, TimeUnit.NANOSECONDS); + if (rsp == null) return previousResponse; responseCancellableMap.remove(rsp); pending.decrementAndGet(); @@ -249,6 +264,7 @@ private ShardResponse take(boolean bailOnError) { // for a request was received. Otherwise we might return the same // request more than once. rsp.getShardRequest().responses.add(rsp); + previousResponse = rsp; if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) { return rsp; } @@ -403,4 +419,14 @@ private boolean canShortCircuit( public ShardHandlerFactory getShardHandlerFactory() { return httpShardHandlerFactory; } + + // test helper function + void setPendingRequest(int val) { + this.pending.set(val); + } + + // test helper function + void setResponse(ShardResponse shardResponse) { + this.responses.add(shardResponse); + } } diff --git a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java index 7470852eaa2..c2aeeb231b2 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java @@ -529,6 +529,7 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw } } else { // a distributed request + long maxTimeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1); if (rb.outgoing == null) { rb.outgoing = new ArrayList<>(); @@ -593,7 +594,7 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw while (rb.outgoing.size() == 0) { ShardResponse srsp = tolerant - ? shardHandler1.takeCompletedIncludingErrors() + ? shardHandler1.takeCompletedIncludingErrorsWithTimeout(maxTimeAllowed) : shardHandler1.takeCompletedOrError(); if (srsp == null) break; // no more requests to wait for diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java index d56ad24d7c5..ebeae4ccf22 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java @@ -25,6 +25,9 @@ public abstract class ShardHandler { public abstract ShardResponse takeCompletedIncludingErrors(); + public abstract ShardResponse takeCompletedIncludingErrorsWithTimeout( + long maxAllowedTimeInMillis); + public abstract ShardResponse takeCompletedOrError(); public abstract void cancelAll(); diff --git a/solr/core/src/test/org/apache/solr/TestTimeAllowedSearch.java b/solr/core/src/test/org/apache/solr/TestTimeAllowedSearch.java new file mode 100644 index 00000000000..580c603711a --- /dev/null +++ b/solr/core/src/test/org/apache/solr/TestTimeAllowedSearch.java @@ -0,0 +1,90 @@ +package org.apache.solr; + +import com.carrotsearch.randomizedtesting.generators.RandomStrings; +import java.util.Locale; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.cloud.MiniSolrCloudCluster; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ShardParams; + +public class TestTimeAllowedSearch extends SolrCloudTestCase { + + /** + * This test demonstrates timeAllowed expectation at @{@link + * org.apache.solr.handler.component.HttpShardHandler} level This test creates collection with + * 'implicit` router, which has two shards shard_1 has 100000 docs, so that query should take some + * time shard_2 has only 1 doc to demonstrate the HttpSHardHandler timeout Then it execute + * substring query with TIME_ALLOWED 50, assuming this query will time out on shard_1 + */ + public void testTimeAllowed() throws Exception { + MiniSolrCloudCluster cluster = + configureCluster(2).addConfig("conf", configset("cloud-minimal")).configure(); + try { + CloudSolrClient client = cluster.getSolrClient(); + String COLLECTION_NAME = "test_coll"; + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1) + .setRouterName("implicit") + .setShards("shard_1,shard_2") + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2); + UpdateRequest ur = new UpdateRequest(); + for (int i = 0; i < 100000; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "" + i); + final String s = + RandomStrings.randomAsciiLettersOfLengthBetween(random(), 10, 100) + .toLowerCase(Locale.ROOT); + doc.setField("subject_s", s); + doc.setField("_route_", "shard_1"); + ur.add(doc); + } + + // adding "abc" in each shard as we will have query *abc* + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "" + 10000); + doc.setField("subject_s", "abc"); + doc.setField("_route_", "shard_2"); + ur.add(doc); + + doc = new SolrInputDocument(); + doc.addField("id", "" + 100001); + doc.setField("subject_s", "abc"); + doc.setField("_route_", "shard_1"); + ur.add(doc); + + ur.commit(client, COLLECTION_NAME); + + // warm up query + SolrQuery query = new SolrQuery(); + query.setQuery("subject_s:*abcd*"); + query.set(ShardParams.SHARDS_TOLERANT, "true"); + QueryResponse response = client.query(COLLECTION_NAME, query); + + query = new SolrQuery(); + query.setQuery("subject_s:*abc*"); + query.set(CommonParams.TIME_ALLOWED, 25); + query.set(ShardParams.SHARDS_TOLERANT, "true"); + response = client.query(COLLECTION_NAME, query); + assertTrue( + "Should have found 1 doc (shard_2) as timeallowed is 25ms found:" + + response.getResults().getNumFound(), + response.getResults().getNumFound() == 1); + + query = new SolrQuery(); + query.setQuery("subject_s:*abc*"); + query.set(ShardParams.SHARDS_TOLERANT, "true"); + response = client.query(COLLECTION_NAME, query); + assertTrue( + "Should have found few docs as timeallowed is unlimited ", + response.getResults().getNumFound() > 1); + } finally { + cluster.shutdown(); + } + } +} diff --git a/solr/core/src/test/org/apache/solr/core/MockShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/core/MockShardHandlerFactory.java index 299f751393e..c86afdb4ec3 100644 --- a/solr/core/src/test/org/apache/solr/core/MockShardHandlerFactory.java +++ b/solr/core/src/test/org/apache/solr/core/MockShardHandlerFactory.java @@ -48,6 +48,11 @@ public ShardResponse takeCompletedIncludingErrors() { return null; } + @Override + public ShardResponse takeCompletedIncludingErrorsWithTimeout(long maxAllowedTime) { + return null; + } + @Override public ShardResponse takeCompletedOrError() { return null; diff --git a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java index 6c20bdcdf08..b28ec96ef0a 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java +++ b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java @@ -27,10 +27,13 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.core.CoreContainer; import org.hamcrest.MatcherAssert; import org.junit.AfterClass; @@ -155,4 +158,83 @@ public void testLiveNodesToHostUrl() { MatcherAssert.assertThat(hostSet, hasItem("1.2.3.4:9000")); MatcherAssert.assertThat(hostSet, hasItem("1.2.3.4:9001")); } + + @Test + public void testHttpShardHandlerTimeout() { + HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory(); + HttpShardHandler shardHandler = (HttpShardHandler) httpShardHandlerFactory.getShardHandler(); + + long startTime = System.nanoTime(); + long timeAllowedInMillis = 120; + shardHandler.setPendingRequest(1); + ShardResponse shardResponse = + shardHandler.takeCompletedIncludingErrorsWithTimeout(timeAllowedInMillis); + + assertNull(shardResponse); + + long timeTakenInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + assertTrue( + "Should have taken more than 100 milli seconds " + timeTakenInMillis, + timeTakenInMillis >= timeAllowedInMillis); + } + + @Test + public void testHttpShardHandlerWithResponse() { + HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory(); + HttpShardHandler shardHandler = (HttpShardHandler) httpShardHandlerFactory.getShardHandler(); + + long timeAllowedInMillis = -1; + // setting one pending request. + shardHandler.setPendingRequest(1); + + ShardResponse shardResponse = new ShardResponse(); + shardResponse.setShard("shard_1"); + ShardRequest shardRequest = new ShardRequest(); + // one shard + shardRequest.actualShards = new String[] {"shard_1"}; + shardResponse.setShardRequest(shardRequest); + + ExecutorService exec = ExecutorUtil.newMDCAwareCachedThreadPool("timeAllowedTest"); + try { + // generating shardresponse for one shard + exec.submit(() -> shardHandler.setResponse(shardResponse)); + } finally { + ExecutorUtil.shutdownAndAwaitTermination(exec); + } + ShardResponse gotResponse = + shardHandler.takeCompletedIncludingErrorsWithTimeout(timeAllowedInMillis); + + assertEquals(shardResponse, gotResponse); + } + + @Test + public void testHttpShardHandlerWithPartialResponse() { + HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory(); + HttpShardHandler shardHandler = (HttpShardHandler) httpShardHandlerFactory.getShardHandler(); + + long timeAllowedInMillis = 100; + // setting two pending requests. + shardHandler.setPendingRequest(2); + + ShardResponse shardResponse = new ShardResponse(); + shardResponse.setShard("shard_1"); + ShardRequest shardRequest = new ShardRequest(); + // two shards + shardRequest.actualShards = new String[] {"shard_1", "shard_2"}; + shardResponse.setShardRequest(shardRequest); + + ExecutorService exec = ExecutorUtil.newMDCAwareCachedThreadPool("timeAllowedTest"); + try { + // generating shardresponse for one shard only + exec.submit(() -> shardHandler.setResponse(shardResponse)); + } finally { + ExecutorUtil.shutdownAndAwaitTermination(exec); + } + + // partial response + ShardResponse gotResponse = + shardHandler.takeCompletedIncludingErrorsWithTimeout(timeAllowedInMillis); + + assertEquals(shardResponse, gotResponse); + } } diff --git a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java index d7bfc961b77..2bd588a887a 100644 --- a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java +++ b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java @@ -103,6 +103,11 @@ public ShardResponse takeCompletedIncludingErrors() { return wrapped.takeCompletedIncludingErrors(); } + @Override + public ShardResponse takeCompletedIncludingErrorsWithTimeout(long maxAllowedTimeInMillis) { + return wrapped.takeCompletedIncludingErrors(); + } + @Override public ShardResponse takeCompletedOrError() { return wrapped.takeCompletedOrError();