Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAI-5162: Experimental downnode approach to use CoreContainer if available #232

Draft
wants to merge 3 commits into
base: fs/branch_9x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.lang.reflect.Array;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -2902,10 +2903,9 @@ public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
*/
public Collection<String> publishNodeAsDown(String nodeName) {
log.info("Publish node={} as DOWN", nodeName);
Map<DocCollection, List<Replica>> replicasPerCollectionOnNode =
getReplicasPerCollectionOnThisNode();

ClusterState clusterState = getClusterState();
Map<String, List<Replica>> replicasPerCollectionOnNode =
clusterState.getReplicaNamesPerCollectionOnNode(nodeName);
if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
// Note that with the current implementation, when distributed cluster state updates are
// enabled, we mark the node down synchronously from this thread, whereas the Overseer cluster
Expand All @@ -2917,20 +2917,19 @@ public Collection<String> publishNodeAsDown(String nodeName) {
} else {
try {
boolean sendToOverseer = false;
for (String collName : replicasPerCollectionOnNode.keySet()) {
DocCollection coll = null;
if (collName != null
&& (coll = zkStateReader.getCollection(collName)) != null
&& coll.isPerReplicaState()) {
for (DocCollection coll : replicasPerCollectionOnNode.keySet()) {
if (coll.isPerReplicaState()) {
PerReplicaStatesOps.downReplicas(
replicasPerCollectionOnNode.get(collName).stream()
replicasPerCollectionOnNode.get(coll).stream()
.map(Replica::getName)
.collect(Collectors.toList()),
PerReplicaStatesOps.fetch(
coll.getZNode(), zkClient, coll.getPerReplicaStates()))
.persist(coll.getZNode(), zkClient);
}
if (coll != null && !coll.isPerReplicaState()) {
} else {
// if this node contains any non PRS collection, then we need to notify the overseer as
// overseer
// manages the replica state for non PRS collections
sendToOverseer = true;
}
}
Expand Down Expand Up @@ -2958,7 +2957,44 @@ public Collection<String> publishNodeAsDown(String nodeName) {
log.warn("Could not publish node as down: ", e);
}
}
return replicasPerCollectionOnNode.keySet();
return replicasPerCollectionOnNode.keySet().stream()
.map(DocCollection::getName)
.collect(Collectors.toSet());
}

Map<DocCollection, List<Replica>> getReplicasPerCollectionOnThisNode() {
Map<DocCollection, List<Replica>> result = new HashMap<>();
if (cc.isStatusLoadComplete()) {
Set<String> processedCollections = new HashSet<>();
for (CoreDescriptor cd : cc.getCoreDescriptors()) {
String collName = cd.getCollectionName();
DocCollection coll;
if (collName != null
&& processedCollections.add(collName)
&& (coll = zkStateReader.getCollection(collName)) != null) {
final List<Replica> replicasOnThisNode = new ArrayList<>();
coll.forEachReplica(
(s, replica) -> {
if (replica.getNodeName().equals(nodeName)) {
replicasOnThisNode.add(replica);
}
});
result.put(coll, replicasOnThisNode);
}
}
} else {
getClusterState().getCollectionStates().values().stream()
.map(ClusterState.CollectionRef::get)
.filter(Objects::nonNull)
.forEach(
col -> {
List<Replica> replicas = col.getReplicas(nodeName);
if (replicas != null && !replicas.isEmpty()) {
result.put(col, replicas);
}
});
}
return result;
}

/**
Expand Down
11 changes: 8 additions & 3 deletions solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,15 @@ public List<CoreDescriptor> getCoreDescriptors() {
zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow();
ClusterState clusterState = zkController.getClusterState();

Map<String, List<Replica>> replicasOnNode =
clusterState.getReplicaNamesPerCollectionOnNode(nodeName);
Map<DocCollection, List<Replica>> replicasOnNode =
zkController.getReplicasPerCollectionOnThisNode();
assertNotNull("There should be replicas on the existing node", replicasOnNode);
List<Replica> replicas = replicasOnNode.get(collectionName);
List<Replica> replicas =
replicasOnNode.get(
replicasOnNode.keySet().stream()
.filter(docColl -> collectionName.equals(docColl.getName()))
.findFirst()
.get());
assertNotNull("There should be replicas for the collection on the existing node", replicas);
assertEquals(
"Wrong number of replicas for the collection on the existing node", 1, replicas.size());
Expand Down
16 changes: 0 additions & 16 deletions solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -211,21 +210,6 @@ public String getShardId(String collectionName, String nodeName, String coreName
return null;
}

public Map<String, List<Replica>> getReplicaNamesPerCollectionOnNode(final String nodeName) {
Map<String, List<Replica>> replicaNamesPerCollectionOnNode = new HashMap<>();
collectionStates.values().stream()
.map(CollectionRef::get)
.filter(Objects::nonNull)
.forEach(
col -> {
List<Replica> replicas = col.getReplicas(nodeName);
if (replicas != null && !replicas.isEmpty()) {
replicaNamesPerCollectionOnNode.put(col.getName(), replicas);
}
});
return replicaNamesPerCollectionOnNode;
}

/** Check if node is alive. */
public boolean liveNodesContain(String name) {
return liveNodes.contains(name);
Expand Down
Loading