Skip to content

Commit

Permalink
Change to optimize node down replica loading. Use CoreContainer if po…
Browse files Browse the repository at this point in the history
…ssible
  • Loading branch information
patsonluk committed Oct 11, 2024
1 parent 8973d47 commit bec693f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 27 deletions.
56 changes: 44 additions & 12 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.lang.reflect.Array;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -2902,10 +2903,8 @@ public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
*/
public Collection<String> publishNodeAsDown(String nodeName) {
log.info("Publish node={} as DOWN", nodeName);
Map<DocCollection, List<Replica>> replicasPerCollectionOnNode = getReplicasPerCollectionOnThisNode();

ClusterState clusterState = getClusterState();
Map<String, List<Replica>> replicasPerCollectionOnNode =
clusterState.getReplicaNamesPerCollectionOnNode(nodeName);
if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
// Note that with the current implementation, when distributed cluster state updates are
// enabled, we mark the node down synchronously from this thread, whereas the Overseer cluster
Expand All @@ -2917,20 +2916,18 @@ public Collection<String> publishNodeAsDown(String nodeName) {
} else {
try {
boolean sendToOverseer = false;
for (String collName : replicasPerCollectionOnNode.keySet()) {
DocCollection coll = null;
if (collName != null
&& (coll = zkStateReader.getCollection(collName)) != null
&& coll.isPerReplicaState()) {
for (DocCollection coll : replicasPerCollectionOnNode.keySet()) {
if (coll.isPerReplicaState()) {
PerReplicaStatesOps.downReplicas(
replicasPerCollectionOnNode.get(collName).stream()
replicasPerCollectionOnNode.get(coll).stream()
.map(Replica::getName)
.collect(Collectors.toList()),
PerReplicaStatesOps.fetch(
coll.getZNode(), zkClient, coll.getPerReplicaStates()))
.persist(coll.getZNode(), zkClient);
}
if (coll != null && !coll.isPerReplicaState()) {
} else {
// if this node contains any non PRS collection, then we need to notify the overseer as overseer
// manages the replica state for non PRS collections
sendToOverseer = true;
}
}
Expand Down Expand Up @@ -2958,7 +2955,42 @@ public Collection<String> publishNodeAsDown(String nodeName) {
log.warn("Could not publish node as down: ", e);
}
}
return replicasPerCollectionOnNode.keySet();
return replicasPerCollectionOnNode.keySet().stream().map(DocCollection::getName).collect(Collectors.toSet());
}

private Map<DocCollection, List<Replica>> getReplicasPerCollectionOnThisNode() {
Map<DocCollection, List<Replica>> result = new HashMap<>();
if (cc.isStatusLoadComplete()) {
Set<String> processedCollections = new HashSet<>();
for (CoreDescriptor cd : cc.getCoreDescriptors()) {
String collName = cd.getCollectionName();
DocCollection coll;
if (collName != null
&& processedCollections.add(collName)
&& (coll = zkStateReader.getCollection(collName)) != null) {
final List<Replica> replicasOnThisNode = new ArrayList<>();
coll.forEachReplica(
(s, replica) -> {
if (replica.getNodeName().equals(nodeName)) {
replicasOnThisNode.add(replica);
}
});
result.put(coll, replicasOnThisNode);
}
}
} else {
getClusterState().getCollectionStates().values().stream()
.map(ClusterState.CollectionRef::get)
.filter(Objects::nonNull)
.forEach(
col -> {
List<Replica> replicas = col.getReplicas(nodeName);
if (replicas != null && !replicas.isEmpty()) {
result.put(col, replicas);
}
});
}
return result;
}

/**
Expand Down
15 changes: 0 additions & 15 deletions solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,6 @@ public String getShardId(String collectionName, String nodeName, String coreName
return null;
}

public Map<String, List<Replica>> getReplicaNamesPerCollectionOnNode(final String nodeName) {
Map<String, List<Replica>> replicaNamesPerCollectionOnNode = new HashMap<>();
collectionStates.values().stream()
.map(CollectionRef::get)
.filter(Objects::nonNull)
.forEach(
col -> {
List<Replica> replicas = col.getReplicas(nodeName);
if (replicas != null && !replicas.isEmpty()) {
replicaNamesPerCollectionOnNode.put(col.getName(), replicas);
}
});
return replicaNamesPerCollectionOnNode;
}

/** Check if node is alive. */
public boolean liveNodesContain(String name) {
return liveNodes.contains(name);
Expand Down

0 comments on commit bec693f

Please sign in to comment.