diff --git a/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java b/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java index 4fe3963b7cd..cc495edabed 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java +++ b/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java @@ -44,8 +44,6 @@ public class ActiveReplicaWatcher implements CollectionStateWatcher { private final List solrCoreNames = new ArrayList<>(); private final List activeReplicas = new ArrayList<>(); - private int lastZkVersion = -1; - private SolrCloseableLatch latch; /** @@ -149,11 +147,6 @@ public synchronized boolean onStateChanged(Set liveNodes, DocCollection log.debug("-- already done, exiting..."); return true; } - if (collectionState.getZNodeVersion() == lastZkVersion) { - log.debug("-- spurious call with already seen zkVersion= {}, ignoring...", lastZkVersion); - return false; - } - lastZkVersion = collectionState.getZNodeVersion(); for (Slice slice : collectionState.getSlices()) { for (Replica replica : slice.getReplicas()) { diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java index 265cbd4d1ea..ae91ad73822 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java @@ -25,10 +25,9 @@ */ public class MoveReplicaTest extends AbstractMoveReplicaTestBase { - @Override @Test public void test() throws Exception { - super.test(); + super.test(random().nextBoolean()); } @Override diff --git a/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/MoveReplicaHdfsTest.java b/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/MoveReplicaHdfsTest.java index 3d504762802..b0951606cb3 100644 --- a/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/MoveReplicaHdfsTest.java +++ b/solr/modules/hdfs/src/test/org/apache/solr/hdfs/cloud/MoveReplicaHdfsTest.java @@ -65,8 +65,7 @@ protected String getConfigSet() { @Test public void testNormalMove() throws Exception { - inPlaceMove = false; - super.test(); + super.test(inPlaceMove = false); } @Test diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 0e36734fdf1..881dd9efafd 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -1806,9 +1806,11 @@ public void waitForState( AtomicReference docCollection = new AtomicReference<>(); CollectionStateWatcher watcher = (n, c) -> { - docCollection.set(c); boolean matches = predicate.matches(n, c); - if (matches) latch.countDown(); + if (matches) { + docCollection.set(c); + latch.countDown(); + } return matches; }; diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractMoveReplicaTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractMoveReplicaTestBase.java index 73baf2b8c84..802687226b7 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractMoveReplicaTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractMoveReplicaTestBase.java @@ -17,14 +17,23 @@ package org.apache.solr.cloud; +import java.io.Closeable; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; @@ -38,6 +47,7 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.embedded.JettySolrRunner; import org.apache.solr.util.IdUtils; @@ -101,25 +111,83 @@ public void afterTest() throws Exception { } } - @Test - public void test() throws Exception { + private interface IndexFunction { + int accept(int docId) throws IOException, SolrServerException; + } + + private static boolean index(IndexFunction func, int docId, long retryDuration) + throws InterruptedException, IOException, SolrServerException { + try { + func.accept(docId); + return true; + } catch (CloudSolrClient.RouteException e) { + // we know the replica just moved back, so retry + } + long start = System.nanoTime(); + boolean success = false; + do { + Thread.sleep(100); + try { + func.accept(docId); + success = true; + } catch (CloudSolrClient.RouteException e) { + // we know the replica just moved back, so retry + } + } while (!success && System.nanoTime() - start < retryDuration); + return success; + } + + private static int validateNumFound(IndexFunction func, int expectCount, long retryDuration) + throws InterruptedException, SolrServerException, IOException { + int actual = -1; + try { + actual = func.accept(-1); + if (retryDuration == 0 || actual == expectCount) { + return actual; + } + } catch (CloudSolrClient.RouteException e) { + // we know the replica just moved back, so retry + } + long start = System.nanoTime(); + do { + Thread.sleep(100); + try { + actual = func.accept(-1); + } catch (CloudSolrClient.RouteException e) { + // we know the replica just moved back, so retry + } + } while (actual != expectCount && System.nanoTime() - start < retryDuration); + return actual; + } + + @SuppressWarnings("try") + protected void test(boolean inPlaceMove) throws Exception { + this.inPlaceMove = inPlaceMove; String coll = getTestClass().getSimpleName() + "_coll_" + inPlaceMove; if (log.isInfoEnabled()) { log.info("total_jettys: {}", cluster.getJettySolrRunners().size()); } - int REPLICATION = 2; + int REPLICATION = random().nextInt(2) + 1; + int N_SHARDS = random().nextInt(3) + 2; CloudSolrClient cloudClient = cluster.getSolrClient(); // random create tlog or pull type replicas with nrt - boolean isTlog = random().nextBoolean(); + int[] replicaTypeCounts = new int[] {1, 0, 0}; + if (REPLICATION > 1) { + replicaTypeCounts[random().nextInt(replicaTypeCounts.length)]++; + } CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection( - coll, "conf1", 2, 1, isTlog ? 1 : 0, !isTlog ? 1 : 0); + coll, + "conf1", + N_SHARDS, + replicaTypeCounts[0], + replicaTypeCounts[1], + replicaTypeCounts[2]); + create.setPerReplicaState(random().nextBoolean()); cloudClient.request(create); - addDocs(coll, 100); - Replica replica = getRandomReplica(coll, cloudClient); Set liveNodes = cloudClient.getClusterState().getLiveNodes(); ArrayList l = new ArrayList<>(liveNodes); @@ -147,112 +215,205 @@ public void test() throws Exception { createMoveReplicaRequest(coll, replica, targetNode); moveReplica.setInPlaceMove(inPlaceMove); String asyncId = IdUtils.randomId(); - moveReplica.processAsync(asyncId, cloudClient); - CollectionAdminRequest.RequestStatus requestStatus = - CollectionAdminRequest.requestStatus(asyncId); - // wait for async request success - boolean success = false; - for (int i = 0; i < 200; i++) { - CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient); - if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { - success = true; - break; + + ExecutorService background = ExecutorUtil.newMDCAwareCachedThreadPool("indexingPool"); + AtomicBoolean stop = new AtomicBoolean(false); + try (Closeable execClose = () -> ExecutorUtil.shutdownAndAwaitTermination(background); + Closeable stopBackground = () -> stop.set(true)) { + + AtomicBoolean pause = new AtomicBoolean(false); + AtomicInteger expectCount = new AtomicInteger(-1); + CountDownLatch[] cdl = new CountDownLatch[] {new CountDownLatch(2), new CountDownLatch(2)}; + + Random indexRandom = new Random(random().nextLong()); + IndexFunction asyncCommit = + (ignored) -> { + cloudClient.commit(coll, false, false); + return 0; + }; + IndexFunction syncCommit = + (ignored) -> { + cloudClient.commit(coll); + return 0; + }; + IndexFunction indexDoc = + (docId) -> { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", docId); + cloudClient.add(coll, doc); + return docId; + }; + long retryDuration = TimeUnit.SECONDS.toNanos(10); + Future indexing = + background.submit( + () -> { + int docId = 0; + try { + while (!stop.get()) { + assertTrue(index(indexDoc, docId++, retryDuration)); + if (indexRandom.nextInt(10) == 0) { + assertTrue(index(asyncCommit, -1, retryDuration)); + } + if (pause.get()) { + pause.set(false); + expectCount.set(docId); + assertTrue(index(syncCommit, -1, retryDuration)); + cdl[0].countDown(); // commit complete + cdl[1].countDown(); // wait before proceeding + cdl[1].await(); + } + } + } catch (Throwable t) { + cdl[0].countDown(); + cdl[1].countDown(); + throw t; + } + expectCount.set(docId); + assertTrue(index(syncCommit, -1, retryDuration)); + return null; + }); + + Thread.sleep(1000); // allow some docs in the index + + Future querying = + background.submit( + () -> { + while (!stop.get()) { + try { + assertTrue( + cloudClient.query(coll, new SolrQuery("*:*")).getResults().getNumFound() + > 0); + } catch (Exception e) { + // we know there might be exceptions, but this is just to generate load. + } + Thread.sleep(100); + } + return null; + }); + + moveReplica.processAsync(asyncId, cloudClient); + CollectionAdminRequest.RequestStatus requestStatus = + CollectionAdminRequest.requestStatus(asyncId); + // wait for async request success + boolean success = false; + for (int i = 0; i < 200; i++) { + CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient); + if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { + success = true; + break; + } + assertNotSame(rsp.getRequestStatus(), RequestStatusState.FAILED); + Thread.sleep(500); } - assertNotSame(rsp.getRequestStatus(), RequestStatusState.FAILED); - Thread.sleep(500); - } - assertTrue(success); - assertEquals( - "should be one less core on the source node!", - sourceNumCores - 1, - getNumOfCores(cloudClient, replica.getNodeName(), coll, replica.getType().name())); - assertEquals( - "should be one more core on target node!", - targetNumCores + 1, - getNumOfCores(cloudClient, targetNode, coll, replica.getType().name())); - // wait for recovery - boolean recovered = false; - for (int i = 0; i < 300; i++) { - DocCollection collState = getCollectionState(coll); - log.debug("###### {}", collState); - Collection replicas = collState.getSlice(shardId).getReplicas(); - boolean allActive = true; - boolean hasLeaders = true; - if (replicas != null && !replicas.isEmpty()) { - for (Replica r : replicas) { - if (!r.getNodeName().equals(targetNode)) { - continue; + assertTrue(success); + assertEquals( + "should be one less core on the source node!", + sourceNumCores - 1, + getNumOfCores(cloudClient, replica.getNodeName(), coll, replica.getType().name())); + assertEquals( + "should be one more core on target node!", + targetNumCores + 1, + getNumOfCores(cloudClient, targetNode, coll, replica.getType().name())); + // wait for recovery + boolean recovered = false; + for (int i = 0; i < 300; i++) { + DocCollection collState = getCollectionState(coll); + log.debug("###### {}", collState); + Collection replicas = collState.getSlice(shardId).getReplicas(); + boolean allActive = true; + boolean hasLeaders = true; + if (replicas != null && !replicas.isEmpty()) { + for (Replica r : replicas) { + if (!r.getNodeName().equals(targetNode)) { + continue; + } + if (!r.isActive(Collections.singleton(targetNode))) { + log.info("Not active: {}", r); + allActive = false; + } } - if (!r.isActive(Collections.singleton(targetNode))) { - log.info("Not active: {}", r); - allActive = false; + } else { + allActive = false; + } + for (Slice slice : collState.getSlices()) { + if (slice.getLeader() == null) { + hasLeaders = false; } } - } else { - allActive = false; - } - for (Slice slice : collState.getSlices()) { - if (slice.getLeader() == null) { - hasLeaders = false; + if (allActive && hasLeaders) { + // check the number of active replicas + assertEquals("total number of replicas", REPLICATION, replicas.size()); + recovered = true; + break; + } else { + log.info("--- waiting, allActive={}, hasLeaders={}", allActive, hasLeaders); + Thread.sleep(1000); } } - if (allActive && hasLeaders) { - // check the number of active replicas - assertEquals("total number of replicas", REPLICATION, replicas.size()); - recovered = true; - break; - } else { - log.info("--- waiting, allActive={}, hasLeaders={}", allActive, hasLeaders); - Thread.sleep(1000); - } - } - assertTrue("replica never fully recovered", recovered); - - assertEquals( - 100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound()); - - moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId); - moveReplica.setInPlaceMove(inPlaceMove); - moveReplica.process(cloudClient); - checkNumOfCores(cloudClient, replica.getNodeName(), coll, sourceNumCores); - // wait for recovery - recovered = false; - for (int i = 0; i < 300; i++) { - DocCollection collState = getCollectionState(coll); - log.debug("###### {}", collState); - Collection replicas = collState.getSlice(shardId).getReplicas(); - boolean allActive = true; - boolean hasLeaders = true; - if (replicas != null && !replicas.isEmpty()) { - for (Replica r : replicas) { - if (!r.getNodeName().equals(replica.getNodeName())) { - continue; + assertTrue("replica never fully recovered", recovered); + + pause.set(true); + cdl[0].countDown(); + cdl[0].await(); + IndexFunction getNumFound = + (ignored) -> { + return (int) cloudClient.query(coll, new SolrQuery("*:*")).getResults().getNumFound(); + }; + boolean hasNonNrt = replicaTypeCounts[1] > 0 || replicaTypeCounts[2] > 0; + long numFoundRetryDuration = hasNonNrt ? retryDuration : 0; + int expect = expectCount.get(); + int actualNumFound = validateNumFound(getNumFound, expect, numFoundRetryDuration); + assertEquals( + "midpoint count wrong " + Arrays.toString(replicaTypeCounts), expect, actualNumFound); + cdl[1].countDown(); + + moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId); + moveReplica.setInPlaceMove(inPlaceMove); + moveReplica.process(cloudClient); + checkNumOfCores(cloudClient, replica.getNodeName(), coll, sourceNumCores); + // wait for recovery + recovered = false; + for (int i = 0; i < 300; i++) { + DocCollection collState = getCollectionState(coll); + log.debug("###### {}", collState); + Collection replicas = collState.getSlice(shardId).getReplicas(); + boolean allActive = true; + boolean hasLeaders = true; + if (replicas != null && !replicas.isEmpty()) { + for (Replica r : replicas) { + if (!r.getNodeName().equals(replica.getNodeName())) { + continue; + } + if (!r.isActive(Collections.singleton(replica.getNodeName()))) { + log.info("Not active yet: {}", r); + allActive = false; + } } - if (!r.isActive(Collections.singleton(replica.getNodeName()))) { - log.info("Not active yet: {}", r); - allActive = false; + } else { + allActive = false; + } + for (Slice slice : collState.getSlices()) { + if (slice.getLeader() == null) { + hasLeaders = false; } } - } else { - allActive = false; - } - for (Slice slice : collState.getSlices()) { - if (slice.getLeader() == null) { - hasLeaders = false; + if (allActive && hasLeaders) { + assertEquals("total number of replicas", REPLICATION, replicas.size()); + recovered = true; + break; + } else { + Thread.sleep(1000); } } - if (allActive && hasLeaders) { - assertEquals("total number of replicas", REPLICATION, replicas.size()); - recovered = true; - break; - } else { - Thread.sleep(1000); - } + assertTrue("replica never fully recovered", recovered); + + stop.set(true); + indexing.get(); + expect = expectCount.get(); + actualNumFound = validateNumFound(getNumFound, expect, numFoundRetryDuration); + assertEquals("final count wrong", expect, actualNumFound); + querying.get(); } - assertTrue("replica never fully recovered", recovered); - - assertEquals( - 100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound()); } @Test