From bec693f2bf6f2227362268a2bbc23c14eef44ac4 Mon Sep 17 00:00:00 2001 From: patsonluk Date: Thu, 10 Oct 2024 18:19:10 -0700 Subject: [PATCH 1/3] Change to optimize node down replica loading. Use CoreContainer if possible --- .../org/apache/solr/cloud/ZkController.java | 56 +++++++++++++++---- .../solr/common/cloud/ClusterState.java | 15 ----- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 73d2c35da40..5f33d01a370 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -33,6 +33,7 @@ import java.lang.reflect.Array; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -2902,10 +2903,8 @@ public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) { */ public Collection publishNodeAsDown(String nodeName) { log.info("Publish node={} as DOWN", nodeName); + Map> replicasPerCollectionOnNode = getReplicasPerCollectionOnThisNode(); - ClusterState clusterState = getClusterState(); - Map> replicasPerCollectionOnNode = - clusterState.getReplicaNamesPerCollectionOnNode(nodeName); if (distributedClusterStateUpdater.isDistributedStateUpdate()) { // Note that with the current implementation, when distributed cluster state updates are // enabled, we mark the node down synchronously from this thread, whereas the Overseer cluster @@ -2917,20 +2916,18 @@ public Collection publishNodeAsDown(String nodeName) { } else { try { boolean sendToOverseer = false; - for (String collName : replicasPerCollectionOnNode.keySet()) { - DocCollection coll = null; - if (collName != null - && (coll = zkStateReader.getCollection(collName)) != null - && coll.isPerReplicaState()) { + for (DocCollection coll : replicasPerCollectionOnNode.keySet()) { + if (coll.isPerReplicaState()) { PerReplicaStatesOps.downReplicas( - replicasPerCollectionOnNode.get(collName).stream() + replicasPerCollectionOnNode.get(coll).stream() .map(Replica::getName) .collect(Collectors.toList()), PerReplicaStatesOps.fetch( coll.getZNode(), zkClient, coll.getPerReplicaStates())) .persist(coll.getZNode(), zkClient); - } - if (coll != null && !coll.isPerReplicaState()) { + } else { + // if this node contains any non PRS collection, then we need to notify the overseer as overseer + // manages the replica state for non PRS collections sendToOverseer = true; } } @@ -2958,7 +2955,42 @@ public Collection publishNodeAsDown(String nodeName) { log.warn("Could not publish node as down: ", e); } } - return replicasPerCollectionOnNode.keySet(); + return replicasPerCollectionOnNode.keySet().stream().map(DocCollection::getName).collect(Collectors.toSet()); + } + + private Map> getReplicasPerCollectionOnThisNode() { + Map> result = new HashMap<>(); + if (cc.isStatusLoadComplete()) { + Set processedCollections = new HashSet<>(); + for (CoreDescriptor cd : cc.getCoreDescriptors()) { + String collName = cd.getCollectionName(); + DocCollection coll; + if (collName != null + && processedCollections.add(collName) + && (coll = zkStateReader.getCollection(collName)) != null) { + final List replicasOnThisNode = new ArrayList<>(); + coll.forEachReplica( + (s, replica) -> { + if (replica.getNodeName().equals(nodeName)) { + replicasOnThisNode.add(replica); + } + }); + result.put(coll, replicasOnThisNode); + } + } + } else { + getClusterState().getCollectionStates().values().stream() + .map(ClusterState.CollectionRef::get) + .filter(Objects::nonNull) + .forEach( + col -> { + List replicas = col.getReplicas(nodeName); + if (replicas != null && !replicas.isEmpty()) { + result.put(col, replicas); + } + }); + } + return result; } /** diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java index 5f7a23f2ed0..b3e18dea862 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java @@ -211,21 +211,6 @@ public String getShardId(String collectionName, String nodeName, String coreName return null; } - public Map> getReplicaNamesPerCollectionOnNode(final String nodeName) { - Map> replicaNamesPerCollectionOnNode = new HashMap<>(); - collectionStates.values().stream() - .map(CollectionRef::get) - .filter(Objects::nonNull) - .forEach( - col -> { - List replicas = col.getReplicas(nodeName); - if (replicas != null && !replicas.isEmpty()) { - replicaNamesPerCollectionOnNode.put(col.getName(), replicas); - } - }); - return replicaNamesPerCollectionOnNode; - } - /** Check if node is alive. */ public boolean liveNodesContain(String name) { return liveNodes.contains(name); From 003117a0759093dbe24a2925bdbe035eba1851d4 Mon Sep 17 00:00:00 2001 From: patsonluk Date: Fri, 11 Oct 2024 12:07:21 -0700 Subject: [PATCH 2/3] ./gradlew tidy --- .../org/apache/solr/cloud/ZkController.java | 42 ++++++++++--------- .../solr/common/cloud/ClusterState.java | 1 - 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 5f33d01a370..32eaa418984 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -2903,7 +2903,8 @@ public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) { */ public Collection publishNodeAsDown(String nodeName) { log.info("Publish node={} as DOWN", nodeName); - Map> replicasPerCollectionOnNode = getReplicasPerCollectionOnThisNode(); + Map> replicasPerCollectionOnNode = + getReplicasPerCollectionOnThisNode(); if (distributedClusterStateUpdater.isDistributedStateUpdate()) { // Note that with the current implementation, when distributed cluster state updates are @@ -2926,7 +2927,8 @@ public Collection publishNodeAsDown(String nodeName) { coll.getZNode(), zkClient, coll.getPerReplicaStates())) .persist(coll.getZNode(), zkClient); } else { - // if this node contains any non PRS collection, then we need to notify the overseer as overseer + // if this node contains any non PRS collection, then we need to notify the overseer as + // overseer // manages the replica state for non PRS collections sendToOverseer = true; } @@ -2955,7 +2957,9 @@ public Collection publishNodeAsDown(String nodeName) { log.warn("Could not publish node as down: ", e); } } - return replicasPerCollectionOnNode.keySet().stream().map(DocCollection::getName).collect(Collectors.toSet()); + return replicasPerCollectionOnNode.keySet().stream() + .map(DocCollection::getName) + .collect(Collectors.toSet()); } private Map> getReplicasPerCollectionOnThisNode() { @@ -2966,29 +2970,29 @@ private Map> getReplicasPerCollectionOnThisNode() { String collName = cd.getCollectionName(); DocCollection coll; if (collName != null - && processedCollections.add(collName) - && (coll = zkStateReader.getCollection(collName)) != null) { + && processedCollections.add(collName) + && (coll = zkStateReader.getCollection(collName)) != null) { final List replicasOnThisNode = new ArrayList<>(); coll.forEachReplica( - (s, replica) -> { - if (replica.getNodeName().equals(nodeName)) { - replicasOnThisNode.add(replica); - } - }); + (s, replica) -> { + if (replica.getNodeName().equals(nodeName)) { + replicasOnThisNode.add(replica); + } + }); result.put(coll, replicasOnThisNode); } } } else { getClusterState().getCollectionStates().values().stream() - .map(ClusterState.CollectionRef::get) - .filter(Objects::nonNull) - .forEach( - col -> { - List replicas = col.getReplicas(nodeName); - if (replicas != null && !replicas.isEmpty()) { - result.put(col, replicas); - } - }); + .map(ClusterState.CollectionRef::get) + .filter(Objects::nonNull) + .forEach( + col -> { + List replicas = col.getReplicas(nodeName); + if (replicas != null && !replicas.isEmpty()) { + result.put(col, replicas); + } + }); } return result; } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java index b3e18dea862..5ee8faf9aca 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; From b10351505b4ae807abe5daa67e88cd74d0e53a55 Mon Sep 17 00:00:00 2001 From: patsonluk Date: Fri, 11 Oct 2024 12:50:56 -0700 Subject: [PATCH 3/3] Fixed unit test case --- .../src/java/org/apache/solr/cloud/ZkController.java | 2 +- .../test/org/apache/solr/cloud/ZkControllerTest.java | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 32eaa418984..aa915892999 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -2962,7 +2962,7 @@ public Collection publishNodeAsDown(String nodeName) { .collect(Collectors.toSet()); } - private Map> getReplicasPerCollectionOnThisNode() { + Map> getReplicasPerCollectionOnThisNode() { Map> result = new HashMap<>(); if (cc.isStatusLoadComplete()) { Set processedCollections = new HashSet<>(); diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java index 06ca5cb033e..58f331b2ce9 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java @@ -394,10 +394,15 @@ public List getCoreDescriptors() { zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow(); ClusterState clusterState = zkController.getClusterState(); - Map> replicasOnNode = - clusterState.getReplicaNamesPerCollectionOnNode(nodeName); + Map> replicasOnNode = + zkController.getReplicasPerCollectionOnThisNode(); assertNotNull("There should be replicas on the existing node", replicasOnNode); - List replicas = replicasOnNode.get(collectionName); + List replicas = + replicasOnNode.get( + replicasOnNode.keySet().stream() + .filter(docColl -> collectionName.equals(docColl.getName())) + .findFirst() + .get()); assertNotNull("There should be replicas for the collection on the existing node", replicas); assertEquals( "Wrong number of replicas for the collection on the existing node", 1, replicas.size());