Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17272: PerReplicaState: Replica "state" and "leader" are still i… #205

Open
wants to merge 2 commits into
base: fs/branch_9_3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps mess
log.error("Could not mark shard leader for non existing collection: {}", collectionName);
return ZkStateWriter.NO_OP;
}
if (coll.isPerReplicaState()) {
log.debug("Do not mark shard leader for PRS collection: {}", collectionName);
return ZkStateWriter.NO_OP;
}

Map<String, Slice> slices = coll.getSlicesMap();
Slice slice = slices.get(sliceName);
Expand Down
16 changes: 12 additions & 4 deletions solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ private final void validate() {
Objects.requireNonNull(this.collection, "'collection' must not be null");
Objects.requireNonNull(this.shard, "'shard' must not be null");
Objects.requireNonNull(this.type, "'type' must not be null");
Objects.requireNonNull(this.state, "'state' must not be null");
if (perReplicaStatesRef == null) { // not a PRS collection
Objects.requireNonNull(this.state, "'state' must not be null");
}
Objects.requireNonNull(this.node, "'node' must not be null");

String baseUrl = (String) propMap.get(ReplicaStateProps.BASE_URL);
Expand All @@ -236,7 +238,9 @@ private final void validate() {
propMap.put(ReplicaStateProps.NODE_NAME, node);
propMap.put(ReplicaStateProps.CORE_NAME, core);
propMap.put(ReplicaStateProps.TYPE, type.toString());
propMap.put(ReplicaStateProps.STATE, state.toString());
if (perReplicaStatesRef == null) { // not a PRS collection
propMap.put(ReplicaStateProps.STATE, state.toString());
}
}

public String getCollection() {
Expand Down Expand Up @@ -389,8 +393,12 @@ public void writeMap(MapWriter.EntryWriter ew) throws IOException {
ew.putIfNotNull(ReplicaStateProps.CORE_NAME, core)
.putIfNotNull(ReplicaStateProps.NODE_NAME, node)
.putIfNotNull(ReplicaStateProps.TYPE, type.toString())
.putIfNotNull(ReplicaStateProps.STATE, getState().toString())
.putIfNotNull(ReplicaStateProps.LEADER, () -> isLeader() ? "true" : null)
.putIfNotNull(
ReplicaStateProps.STATE,
() -> perReplicaStatesRef == null ? getState().toString() : null)
.putIfNotNull(
ReplicaStateProps.LEADER,
() -> perReplicaStatesRef != null || !isLeader() ? null : "true")
.putIfNotNull(
ReplicaStateProps.FORCE_SET_STATE, propMap.get(ReplicaStateProps.FORCE_SET_STATE))
.putIfNotNull(ReplicaStateProps.BASE_URL, propMap.get(ReplicaStateProps.BASE_URL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.client.solrj.SolrClient;
Expand All @@ -31,6 +32,9 @@
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.MapWriterMap;
import org.apache.solr.common.NavigableObject;
import org.apache.solr.common.util.Utils;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.util.LogLevel;
import org.apache.zookeeper.data.Stat;
Expand All @@ -40,6 +44,7 @@
/** This test would be faster if we simulated the zk state instead. */
@LogLevel(
"org.apache.solr.common.cloud.ZkStateReader=DEBUG;"
+ "org.apache.solr.cloud.overseer.ZkStateWriter=DEBUG;"
+ "org.apache.solr.handler.admin.CollectionsHandler=DEBUG;"
+ "org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;"
+ "org.apache.solr.cloud.Overseer=INFO;"
Expand Down Expand Up @@ -187,26 +192,32 @@ public void testRestart() throws Exception {
collectionPath, SolrCloudTestCase.cluster.getZkClient(), null);
PerReplicaStates.State st = prs.get(replicaName);
assertNotEquals(Replica.State.ACTIVE, st.state);
CollectionAdminResponse rsp =
new CollectionAdminRequest.ClusterStatus()
.setCollectionName(testCollection)
.process(cluster.getSolrClient());
assertEquals(
"true",
@SuppressWarnings("unchecked")
NavigableObject rsp =
new MapWriterMap(
(Map<String, Object>)
Utils.fromJSON(
SolrCloudTestCase.cluster
.getZkClient()
.getData(
DocCollection.getCollectionPath(testCollection),
null,
null,
true)));

assertNull(
rsp._get(
"cluster/collections/prs_restart_test/shards/shard1/replicas/core_node2/leader",
null));
assertEquals(
"active",
assertNull(
rsp._get(
"cluster/collections/prs_restart_test/shards/shard1/replicas/core_node2/state",
null));
assertNull(
rsp._get(
"cluster/collections/prs_restart_test/shards/shard1/replicas/core_node4/leader",
null));
assertEquals(
"down",
assertNull(
rsp._get(
"cluster/collections/prs_restart_test/shards/shard1/replicas/core_node4/state",
null));
Expand Down Expand Up @@ -317,7 +328,8 @@ public void testZkNodeVersions() throws Exception {
CollectionAdminRequest.createCollection(PRS_COLL, "conf", 10, 1)
.setPerReplicaState(Boolean.TRUE)
.process(cluster.getSolrClient());
stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true);
String PRS_PATH = DocCollection.getCollectionPath(PRS_COLL);
stat = cluster.getZkClient().exists(PRS_PATH, null, true);
// +1 after all replicas are added, with one state.json write in CreateCollectionCmd.setData()
assertEquals(1, stat.getVersion());
// For each replica:
Expand All @@ -332,7 +344,7 @@ public void testZkNodeVersions() throws Exception {
CollectionAdminRequest.addReplicaToShard(PRS_COLL, "shard1")
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(PRS_COLL, 10, 11);
stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true);
stat = cluster.getZkClient().exists(PRS_PATH, null, true);
// For the new replica:
// +2 for state.json overseer writes; even though there are no longer PRS updates from the
// overseer, current code would still do a "TOUCH" on the PRS entry
Expand All @@ -352,7 +364,7 @@ public void testZkNodeVersions() throws Exception {
CollectionAdminRequest.deleteReplica(PRS_COLL, "shard1", addedReplica.getName())
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(PRS_COLL, 10, 10);
stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true);
stat = cluster.getZkClient().exists(PRS_PATH, null, true);
// For replica deletion
// +1 for ZkController#unregister, which deletes the PRS entry from data node
// overseer, current code would still do a "TOUCH" on the PRS entry
Expand All @@ -361,11 +373,49 @@ public void testZkNodeVersions() throws Exception {
for (JettySolrRunner j : cluster.getJettySolrRunners()) {
j.stop();
j.start(true);
stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true);
stat = cluster.getZkClient().exists(PRS_PATH, null, true);
// ensure restart does not update state.json; addReplica/deleteReplica added 2 more
// updates, hence state.json is at version 3
assertEquals(3, stat.getVersion());
}

// test for leader election
Replica leader =
cluster.getZkStateReader().clusterState.getCollection(PRS_COLL).getLeader("shard2");

JettySolrRunner j2 = cluster.startJettySolrRunner();
response =
CollectionAdminRequest.addReplicaToShard(PRS_COLL, "shard2")
.setNode(j2.getNodeName())
.process(cluster.getSolrClient());

// wait for the new replica to be active
cluster.waitForActiveCollection(PRS_COLL, 10, 11);
stat = cluster.getZkClient().exists(PRS_PATH, null, true);
// +1 for a new replica
assertEquals(4, stat.getVersion());
DocCollection c = cluster.getZkStateReader().getCollection(PRS_COLL);
Replica newreplica = c.getReplica((s, replica) -> replica.node.equals(j2.getNodeName()));

// let's stop the old leader
JettySolrRunner oldJetty = cluster.getReplicaJetty(leader);
oldJetty.stop();

cluster
.getZkStateReader()
.waitForState(
PRS_COLL,
10,
TimeUnit.SECONDS,
(liveNodes, collectionState) ->
PerReplicaStatesOps.fetch(PRS_PATH, cluster.getZkClient(), null)
.states
.get(newreplica.name)
.isLeader);
PerReplicaStates prs = PerReplicaStatesOps.fetch(PRS_PATH, cluster.getZkClient(), null);
stat = cluster.getZkClient().exists(PRS_PATH, null, true);
// the version should not have been updated
assertEquals(4, stat.getVersion());
} finally {
cluster.shutdown();
}
Expand Down
Loading