diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 73d2c35da40..aa915892999 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -33,6 +33,7 @@ import java.lang.reflect.Array; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -2902,10 +2903,9 @@ public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) { */ public Collection publishNodeAsDown(String nodeName) { log.info("Publish node={} as DOWN", nodeName); + Map> replicasPerCollectionOnNode = + getReplicasPerCollectionOnThisNode(); - ClusterState clusterState = getClusterState(); - Map> replicasPerCollectionOnNode = - clusterState.getReplicaNamesPerCollectionOnNode(nodeName); if (distributedClusterStateUpdater.isDistributedStateUpdate()) { // Note that with the current implementation, when distributed cluster state updates are // enabled, we mark the node down synchronously from this thread, whereas the Overseer cluster @@ -2917,20 +2917,19 @@ public Collection publishNodeAsDown(String nodeName) { } else { try { boolean sendToOverseer = false; - for (String collName : replicasPerCollectionOnNode.keySet()) { - DocCollection coll = null; - if (collName != null - && (coll = zkStateReader.getCollection(collName)) != null - && coll.isPerReplicaState()) { + for (DocCollection coll : replicasPerCollectionOnNode.keySet()) { + if (coll.isPerReplicaState()) { PerReplicaStatesOps.downReplicas( - replicasPerCollectionOnNode.get(collName).stream() + replicasPerCollectionOnNode.get(coll).stream() .map(Replica::getName) .collect(Collectors.toList()), PerReplicaStatesOps.fetch( coll.getZNode(), zkClient, coll.getPerReplicaStates())) .persist(coll.getZNode(), zkClient); - } - if (coll != null && !coll.isPerReplicaState()) { + } else { + // if this node contains any non PRS collection, then we need to notify the overseer as + // overseer + // manages the replica state for non PRS collections sendToOverseer = true; } } @@ -2958,7 +2957,44 @@ public Collection publishNodeAsDown(String nodeName) { log.warn("Could not publish node as down: ", e); } } - return replicasPerCollectionOnNode.keySet(); + return replicasPerCollectionOnNode.keySet().stream() + .map(DocCollection::getName) + .collect(Collectors.toSet()); + } + + Map> getReplicasPerCollectionOnThisNode() { + Map> result = new HashMap<>(); + if (cc.isStatusLoadComplete()) { + Set processedCollections = new HashSet<>(); + for (CoreDescriptor cd : cc.getCoreDescriptors()) { + String collName = cd.getCollectionName(); + DocCollection coll; + if (collName != null + && processedCollections.add(collName) + && (coll = zkStateReader.getCollection(collName)) != null) { + final List replicasOnThisNode = new ArrayList<>(); + coll.forEachReplica( + (s, replica) -> { + if (replica.getNodeName().equals(nodeName)) { + replicasOnThisNode.add(replica); + } + }); + result.put(coll, replicasOnThisNode); + } + } + } else { + getClusterState().getCollectionStates().values().stream() + .map(ClusterState.CollectionRef::get) + .filter(Objects::nonNull) + .forEach( + col -> { + List replicas = col.getReplicas(nodeName); + if (replicas != null && !replicas.isEmpty()) { + result.put(col, replicas); + } + }); + } + return result; } /** diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java index 06ca5cb033e..58f331b2ce9 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java @@ -394,10 +394,15 @@ public List getCoreDescriptors() { zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow(); ClusterState clusterState = zkController.getClusterState(); - Map> replicasOnNode = - clusterState.getReplicaNamesPerCollectionOnNode(nodeName); + Map> replicasOnNode = + zkController.getReplicasPerCollectionOnThisNode(); assertNotNull("There should be replicas on the existing node", replicasOnNode); - List replicas = replicasOnNode.get(collectionName); + List replicas = + replicasOnNode.get( + replicasOnNode.keySet().stream() + .filter(docColl -> collectionName.equals(docColl.getName())) + .findFirst() + .get()); assertNotNull("There should be replicas for the collection on the existing node", replicas); assertEquals( "Wrong number of replicas for the collection on the existing node", 1, replicas.size()); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java index 5f7a23f2ed0..5ee8faf9aca 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -211,21 +210,6 @@ public String getShardId(String collectionName, String nodeName, String coreName return null; } - public Map> getReplicaNamesPerCollectionOnNode(final String nodeName) { - Map> replicaNamesPerCollectionOnNode = new HashMap<>(); - collectionStates.values().stream() - .map(CollectionRef::get) - .filter(Objects::nonNull) - .forEach( - col -> { - List replicas = col.getReplicas(nodeName); - if (replicas != null && !replicas.isEmpty()) { - replicaNamesPerCollectionOnNode.put(col.getName(), replicas); - } - }); - return replicaNamesPerCollectionOnNode; - } - /** Check if node is alive. */ public boolean liveNodesContain(String name) { return liveNodes.contains(name);