diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java index c83a965d5a1..11cb258334f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java @@ -154,6 +154,10 @@ public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps mess log.error("Could not mark shard leader for non existing collection: {}", collectionName); return ZkStateWriter.NO_OP; } + if (coll.isPerReplicaState()) { + log.debug("Do not mark shard leader for PRS collection: {}", collectionName); + return ZkStateWriter.NO_OP; + } Map slices = coll.getSlicesMap(); Slice slice = slices.get(sliceName); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java index 72bb2879662..23189214ff8 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java @@ -226,7 +226,9 @@ private final void validate() { Objects.requireNonNull(this.collection, "'collection' must not be null"); Objects.requireNonNull(this.shard, "'shard' must not be null"); Objects.requireNonNull(this.type, "'type' must not be null"); - Objects.requireNonNull(this.state, "'state' must not be null"); + if (perReplicaStatesRef == null) { // PRS collection + Objects.requireNonNull(this.state, "'state' must not be null"); + } Objects.requireNonNull(this.node, "'node' must not be null"); String baseUrl = (String) propMap.get(ReplicaStateProps.BASE_URL); @@ -236,7 +238,9 @@ private final void validate() { propMap.put(ReplicaStateProps.NODE_NAME, node); propMap.put(ReplicaStateProps.CORE_NAME, core); propMap.put(ReplicaStateProps.TYPE, type.toString()); - propMap.put(ReplicaStateProps.STATE, state.toString()); + if (perReplicaStatesRef == null) { // PRS collection + propMap.put(ReplicaStateProps.STATE, state.toString()); + } } public String getCollection() { @@ -389,8 +393,12 @@ public void writeMap(MapWriter.EntryWriter ew) throws IOException { ew.putIfNotNull(ReplicaStateProps.CORE_NAME, core) .putIfNotNull(ReplicaStateProps.NODE_NAME, node) .putIfNotNull(ReplicaStateProps.TYPE, type.toString()) - .putIfNotNull(ReplicaStateProps.STATE, getState().toString()) - .putIfNotNull(ReplicaStateProps.LEADER, () -> isLeader() ? "true" : null) + .putIfNotNull( + ReplicaStateProps.STATE, + () -> perReplicaStatesRef == null ? getState().toString() : null) + .putIfNotNull( + ReplicaStateProps.LEADER, + () -> perReplicaStatesRef != null || !isLeader() ? null : "true") .putIfNotNull( ReplicaStateProps.FORCE_SET_STATE, propMap.get(ReplicaStateProps.FORCE_SET_STATE)) .putIfNotNull(ReplicaStateProps.BASE_URL, propMap.get(ReplicaStateProps.BASE_URL)); diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java index 2edfe3b7d54..bf1d5031767 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java @@ -22,6 +22,7 @@ import java.lang.invoke.MethodHandles; import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.solr.client.solrj.SolrClient; @@ -31,6 +32,9 @@ import org.apache.solr.client.solrj.response.SolrPingResponse; import org.apache.solr.cloud.MiniSolrCloudCluster; import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.MapWriterMap; +import org.apache.solr.common.NavigableObject; +import org.apache.solr.common.util.Utils; import org.apache.solr.embedded.JettySolrRunner; import org.apache.solr.util.LogLevel; import org.apache.zookeeper.data.Stat; @@ -40,6 +44,7 @@ /** This test would be faster if we simulated the zk state instead. */ @LogLevel( "org.apache.solr.common.cloud.ZkStateReader=DEBUG;" + + "org.apache.solr.cloud.overseer.ZkStateWriter=DEBUG;" + "org.apache.solr.handler.admin.CollectionsHandler=DEBUG;" + "org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;" + "org.apache.solr.cloud.Overseer=INFO;" @@ -187,17 +192,24 @@ public void testRestart() throws Exception { collectionPath, SolrCloudTestCase.cluster.getZkClient(), null); PerReplicaStates.State st = prs.get(replicaName); assertNotEquals(Replica.State.ACTIVE, st.state); - CollectionAdminResponse rsp = - new CollectionAdminRequest.ClusterStatus() - .setCollectionName(testCollection) - .process(cluster.getSolrClient()); - assertEquals( - "true", + @SuppressWarnings("unchecked") + NavigableObject rsp = + new MapWriterMap( + (Map) + Utils.fromJSON( + SolrCloudTestCase.cluster + .getZkClient() + .getData( + DocCollection.getCollectionPath(testCollection), + null, + null, + true))); + + assertNull( rsp._get( "cluster/collections/prs_restart_test/shards/shard1/replicas/core_node2/leader", null)); - assertEquals( - "active", + assertNull( rsp._get( "cluster/collections/prs_restart_test/shards/shard1/replicas/core_node2/state", null)); @@ -205,8 +217,7 @@ public void testRestart() throws Exception { rsp._get( "cluster/collections/prs_restart_test/shards/shard1/replicas/core_node4/leader", null)); - assertEquals( - "down", + assertNull( rsp._get( "cluster/collections/prs_restart_test/shards/shard1/replicas/core_node4/state", null)); @@ -317,7 +328,8 @@ public void testZkNodeVersions() throws Exception { CollectionAdminRequest.createCollection(PRS_COLL, "conf", 10, 1) .setPerReplicaState(Boolean.TRUE) .process(cluster.getSolrClient()); - stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true); + String PRS_PATH = DocCollection.getCollectionPath(PRS_COLL); + stat = cluster.getZkClient().exists(PRS_PATH, null, true); // +1 after all replica are added with on state.json write to CreateCollectionCmd.setData() assertEquals(1, stat.getVersion()); // For each replica: @@ -332,7 +344,7 @@ public void testZkNodeVersions() throws Exception { CollectionAdminRequest.addReplicaToShard(PRS_COLL, "shard1") .process(cluster.getSolrClient()); cluster.waitForActiveCollection(PRS_COLL, 10, 11); - stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true); + stat = cluster.getZkClient().exists(PRS_PATH, null, true); // For the new replica: // +2 for state.json overseer writes, even though there's no longer PRS updates from // overseer, current code would still do a "TOUCH" on the PRS entry @@ -352,7 +364,7 @@ public void testZkNodeVersions() throws Exception { CollectionAdminRequest.deleteReplica(PRS_COLL, "shard1", addedReplica.getName()) .process(cluster.getSolrClient()); cluster.waitForActiveCollection(PRS_COLL, 10, 10); - stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true); + stat = cluster.getZkClient().exists(PRS_PATH, null, true); // For replica deletion // +1 for ZkController#unregister, which delete the PRS entry from data node // overseer, current code would still do a "TOUCH" on the PRS entry @@ -361,11 +373,49 @@ public void testZkNodeVersions() throws Exception { for (JettySolrRunner j : cluster.getJettySolrRunners()) { j.stop(); j.start(true); - stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true); + stat = cluster.getZkClient().exists(PRS_PATH, null, true); // ensure restart does not update the state.json, after addReplica/deleteReplica, 2 more // updates hence at version 3 on state.json version assertEquals(3, stat.getVersion()); } + + // test for leader election + Replica leader = + cluster.getZkStateReader().clusterState.getCollection(PRS_COLL).getLeader("shard2"); + + JettySolrRunner j2 = cluster.startJettySolrRunner(); + response = + CollectionAdminRequest.addReplicaToShard(PRS_COLL, "shard2") + .setNode(j2.getNodeName()) + .process(cluster.getSolrClient()); + + // wait for the new replica to be active + cluster.waitForActiveCollection(PRS_COLL, 10, 11); + stat = cluster.getZkClient().exists(PRS_PATH, null, true); + // +1 for a new replica + assertEquals(4, stat.getVersion()); + DocCollection c = cluster.getZkStateReader().getCollection(PRS_COLL); + Replica newreplica = c.getReplica((s, replica) -> replica.node.equals(j2.getNodeName())); + + // let's stop the old leader + JettySolrRunner oldJetty = cluster.getReplicaJetty(leader); + oldJetty.stop(); + + cluster + .getZkStateReader() + .waitForState( + PRS_COLL, + 10, + TimeUnit.SECONDS, + (liveNodes, collectionState) -> + PerReplicaStatesOps.fetch(PRS_PATH, cluster.getZkClient(), null) + .states + .get(newreplica.name) + .isLeader); + PerReplicaStates prs = PerReplicaStatesOps.fetch(PRS_PATH, cluster.getZkClient(), null); + stat = cluster.getZkClient().exists(PRS_PATH, null, true); + // the version should not have updated + assertEquals(4, stat.getVersion()); } finally { cluster.shutdown(); }