From ce4b59eca561af89dc6e3ca79c2b81c83f12b006 Mon Sep 17 00:00:00 2001 From: Mikhail Khludnev Date: Thu, 29 Jun 2023 16:03:59 +0300 Subject: [PATCH] SOLR-16837: introducing AffinityPlacementFactory.withCollectionShards (#1709) * SOLR-16837: Introducing AffinityPlacementFactory.withCollectionShards --- solr/CHANGES.txt | 3 + .../plugins/AffinityPlacementConfig.java | 36 ++++ .../plugins/AffinityPlacementFactory.java | 61 ++++-- .../plugins/OrderedNodePlacementPlugin.java | 23 ++ .../plugins/AffinityPlacementFactoryTest.java | 202 ++++++++++++++++++ .../pages/replica-placement-plugins.adoc | 16 +- .../placement/ClusterAbstractionsForTest.java | 5 + 7 files changed, 331 insertions(+), 15 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 98db7c4d699..bc6668bc668 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -81,6 +81,9 @@ New Features * SOLR-16844: Added new parameter `backupConfigset` to collection Backup to optionally skip backing up configsets. (Tomás Fernández Löbbe) +* SOLR-16837: Introducing AffinityPlacementFactory.withCollectionShards to collocate corresponding shards of two collections. + eg primaryColl.shard1 will be placed to the node where secondaryColl.shard1 resides, etc. (Mikhail Khludnev) + Improvements --------------------- diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java index b561b1f24aa..2a5d2bdb4a6 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementConfig.java @@ -17,9 +17,12 @@ package org.apache.solr.cluster.placement.plugins; +import java.util.ArrayList; +import java.util.Collections; import java.util.Map; import java.util.Objects; import org.apache.solr.cluster.placement.PlacementPluginConfig; +import org.apache.solr.common.SolrException; import org.apache.solr.common.annotation.JsonProperty; /** Configuration bean for {@link AffinityPlacementFactory}. */ @@ -103,6 +106,11 @@ public class AffinityPlacementConfig implements PlacementPluginConfig { * acceptable node types). */ @JsonProperty public Map collectionNodeType; + /** + * Same as {@link AffinityPlacementConfig#withCollection} but ensures shard to shard + * correspondence. should be disjoint with {@link AffinityPlacementConfig#withCollection}. + */ + @JsonProperty public Map withCollectionShards; /** * When this property is set to {@code true}, Solr will try to place replicas for the same shard @@ -161,12 +169,40 @@ public AffinityPlacementConfig( long minimalFreeDiskGB, long prioritizedFreeDiskGB, Map withCollection, + Map withCollectionShards, Map collectionNodeType) { this.minimalFreeDiskGB = minimalFreeDiskGB; this.prioritizedFreeDiskGB = prioritizedFreeDiskGB; Objects.requireNonNull(withCollection); + Objects.requireNonNull(withCollectionShards); Objects.requireNonNull(collectionNodeType); this.withCollection = withCollection; + this.withCollectionShards = withCollectionShards; this.collectionNodeType = collectionNodeType; } + + public AffinityPlacementConfig( + long minimalFreeDiskGB, + long prioritizedFreeDiskGB, + Map withCollection, + Map collectionNodeType) { + this( + minimalFreeDiskGB, + prioritizedFreeDiskGB, + withCollection, + Collections.emptyMap(), + collectionNodeType); + } + + public void validate() { + if (!Collections.disjoint(withCollection.keySet(), withCollectionShards.keySet())) { + final ArrayList collections = new ArrayList<>(withCollection.keySet()); + collections.retainAll(withCollectionShards.keySet()); + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "withCollection and withCollectionShards should be disjoint. But there are " + + collections + + " in common."); + } + } } diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java index 80d2d43af96..74b2384f595 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java @@ -31,6 +31,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.solr.cluster.Cluster; import org.apache.solr.cluster.Node; import org.apache.solr.cluster.Replica; @@ -123,10 +124,12 @@ public AffinityPlacementFactory() {} @Override public PlacementPlugin createPluginInstance() { + config.validate(); return new AffinityPlacementPlugin( config.minimalFreeDiskGB, config.prioritizedFreeDiskGB, config.withCollection, + config.withCollectionShards, config.collectionNodeType, config.spreadAcrossDomains); } @@ -134,6 +137,7 @@ public PlacementPlugin createPluginInstance() { @Override public void configure(AffinityPlacementConfig cfg) { Objects.requireNonNull(cfg, "configuration must never be null"); + cfg.validate(); this.config = cfg; } @@ -154,7 +158,9 @@ static class AffinityPlacementPlugin extends OrderedNodePlacementPlugin { // primary to secondary (1:1) private final Map withCollections; - // secondary to primary (1:N) + // same but shardwise + private final Map withCollectionShards; + // secondary to primary (1:N) + shard-wise_primary (1:N) private final Map> collocatedWith; private final Map> nodeTypes; @@ -169,22 +175,28 @@ private AffinityPlacementPlugin( long minimalFreeDiskGB, long prioritizedFreeDiskGB, Map withCollections, + Map withCollectionShards, Map collectionNodeTypes, boolean spreadAcrossDomains) { this.minimalFreeDiskGB = minimalFreeDiskGB; this.prioritizedFreeDiskGB = prioritizedFreeDiskGB; Objects.requireNonNull(withCollections, "withCollections must not be null"); Objects.requireNonNull(collectionNodeTypes, "collectionNodeTypes must not be null"); + Objects.requireNonNull(withCollectionShards, "withCollectionShards must not be null"); this.spreadAcrossDomains = spreadAcrossDomains; this.withCollections = withCollections; - if (withCollections.isEmpty()) { - collocatedWith = Map.of(); - } else { - collocatedWith = new HashMap<>(); - withCollections.forEach( - (primary, secondary) -> - collocatedWith.computeIfAbsent(secondary, s -> new HashSet<>()).add(primary)); - } + this.withCollectionShards = withCollectionShards; + Map> collocated = new HashMap<>(); + // reverse both relations: shard-agnostic and shard-wise + List.of(this.withCollections, this.withCollectionShards) + .forEach( + direct -> + direct.forEach( + (primary, secondary) -> + collocated + .computeIfAbsent(secondary, s -> new HashSet<>()) + .add(primary))); + this.collocatedWith = Collections.unmodifiableMap(collocated); if (collectionNodeTypes.isEmpty()) { nodeTypes = Map.of(); @@ -521,6 +533,12 @@ public boolean canAddReplica(Replica replica) { && Optional.ofNullable(withCollections.get(collection)) .map(this::hasCollectionOnNode) .orElse(true) + // Ensure same shard is collocated if required + && Optional.ofNullable(withCollectionShards.get(collection)) + .map( + shardWiseOf -> + getShardsOnNode(shardWiseOf).contains(replica.getShard().getShardName())) + .orElse(true) // Ensure the disk space will not go below the minimum if the replica is added && (minimalFreeDiskGB <= 0 || nodeFreeDiskGB - getProjectedSizeOfReplica(replica) > minimalFreeDiskGB); @@ -547,6 +565,14 @@ public Map canRemoveReplicas(Collection replicas) { continue; } + Stream shardWiseCollocations = + collocatedCollections.stream() + .filter( + priColl -> collection.getName().equals(withCollectionShards.get(priColl))); + final Set mandatoryShardsOrAll = + shardWiseCollocations + .flatMap(priColl -> getShardsOnNode(priColl).stream()) + .collect(Collectors.toSet()); // There are collocatedCollections for this shard, so make sure there is a replica of this // shard left on the node after it is removed Set replicasRemovedForShard = @@ -555,11 +581,18 @@ public Map canRemoveReplicas(Collection replicas) { replica.getShard().getCollection().getName(), k -> new HashMap<>()) .computeIfAbsent(replica.getShard().getShardName(), k -> new HashSet<>()); replicasRemovedForShard.add(replica); - - if (replicasRemovedForShard.size() - >= getReplicasForShardOnNode(replica.getShard()).size()) { - replicaRemovalExceptions.put( - replica, "co-located with replicas of " + collocatedCollections); + // either if all shards are mandatory, or the current one is mandatory + boolean shardWise = false; + if (mandatoryShardsOrAll.isEmpty() + || (shardWise = mandatoryShardsOrAll.contains(replica.getShard().getShardName()))) { + if (replicasRemovedForShard.size() + >= getReplicasForShardOnNode(replica.getShard()).size()) { + replicaRemovalExceptions.put( + replica, + "co-located with replicas of " + + (shardWise ? replica.getShard().getShardName() + " of " : "") + + collocatedCollections); + } } } return replicaRemovalExceptions; diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java index a9d1f4ea048..0a3beff2706 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java @@ -613,6 +613,11 @@ public boolean equals(Object o) { } } } + + @Override + public String toString() { + return "WeightedNode{" + "node=" + node + ", lastSortedWeight=" + lastSortedWeight + '}'; + } } /** @@ -666,6 +671,15 @@ public Replica getLeader() { public ShardState getState() { return null; } + + @Override + public String toString() { + return Optional.ofNullable(collection) + .map(SolrCollection::getName) + .orElse("") + + "/" + + shardName; + } }; return new Replica() { @Override @@ -697,6 +711,15 @@ public String getCoreName() { public Node getNode() { return node; } + + @Override + public String toString() { + return Optional.ofNullable(shard).map(Shard::getShardName).orElse("") + + "@" + + Optional.ofNullable(node).map(Node::getName).orElse("") + + " of " + + type; + } }; } } diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java index c71c823e59b..424161dafa4 100644 --- a/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java +++ b/solr/core/src/test/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactoryTest.java @@ -18,10 +18,13 @@ package org.apache.solr.cluster.placement.plugins; import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -29,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.solr.cluster.Cluster; import org.apache.solr.cluster.Node; @@ -48,6 +52,7 @@ import org.apache.solr.cluster.placement.impl.BalanceRequestImpl; import org.apache.solr.cluster.placement.impl.ModificationRequestImpl; import org.apache.solr.cluster.placement.impl.PlacementRequestImpl; +import org.apache.solr.common.SolrException; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.StrUtils; import org.junit.Before; @@ -793,6 +798,18 @@ public void testFreeDiskConstraintsWithNewReplicas() throws Exception { () -> plugin.computePlacement(badShardPlacementRequest, placementContext)); } + @Test(expected = SolrException.class) + public void testWithCollectionDisjointWithShards() { + AffinityPlacementConfig config = + new AffinityPlacementConfig( + MINIMAL_FREE_DISK_GB, + PRIORITIZED_FREE_DISK_GB, + Map.of(primaryCollectionName, secondaryCollectionName), + Map.of(primaryCollectionName, secondaryCollectionName), + Map.of()); + configurePlugin(config); + } + @Test public void testWithCollectionPlacement() throws Exception { AffinityPlacementConfig config = @@ -851,6 +868,96 @@ public void testWithCollectionPlacement() throws Exception { } } + @Test + public void testWithCollectionShardsPlacement() throws Exception { + AffinityPlacementConfig config = + new AffinityPlacementConfig( + MINIMAL_FREE_DISK_GB, + PRIORITIZED_FREE_DISK_GB, + Map.of(), + Map.of(primaryCollectionName, secondaryCollectionName), + Map.of()); + configurePlugin(config); + + int NUM_NODES = 3; + Builders.ClusterBuilder clusterBuilder = + Builders.newClusterBuilder().initializeLiveNodes(NUM_NODES); + Builders.CollectionBuilder collectionBuilder = + Builders.newCollectionBuilder(secondaryCollectionName); + collectionBuilder.initializeShardsReplicas(2, 1, 0, 0, clusterBuilder.getLiveNodeBuilders()); + clusterBuilder.addCollection(collectionBuilder); + + collectionBuilder = Builders.newCollectionBuilder(primaryCollectionName); + collectionBuilder.initializeShardsReplicas(0, 0, 0, 0, clusterBuilder.getLiveNodeBuilders()); + clusterBuilder.addCollection(collectionBuilder); + + PlacementContext placementContext = clusterBuilder.buildPlacementContext(); + Cluster cluster = placementContext.getCluster(); + + SolrCollection secondaryCollection = cluster.getCollection(secondaryCollectionName); + SolrCollection primaryCollection = cluster.getCollection(primaryCollectionName); + + Set secondaryNodes = new HashSet<>(); + secondaryCollection + .shards() + .forEach(s -> s.replicas().forEach(r -> secondaryNodes.add(r.getNode()))); + + final List liveNodes = new ArrayList<>(cluster.getLiveNodes()); + Collections.shuffle(liveNodes, random()); + PlacementRequestImpl placementRequest = + new PlacementRequestImpl( + primaryCollection, + shuffle(Arrays.asList("shard2", "shard1")), + shuffle(cluster.getLiveNodes()), + 1, + 0, + 0); + + PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext); + assertEquals(2, pp.getReplicaPlacements().size()); + // verify that all placements are on nodes with the secondary replica + pp.getReplicaPlacements() + .forEach( + placement -> { + assertTrue( + "placement node " + placement.getNode() + " not in secondary=" + secondaryNodes, + secondaryNodes.contains(placement.getNode())); + boolean collocated = false; + final Shard shard = secondaryCollection.getShard(placement.getShardName()); + StringBuilder msg = new StringBuilder(); + for (Iterator secReplicas = shard.iterator(); + secReplicas.hasNext() && !collocated; ) { + final Replica secReplica = secReplicas.next(); + collocated |= placement.getNode().getName().equals(secReplica.getNode().getName()); + msg.append(secReplica.getReplicaName()); + msg.append("@"); + msg.append(secReplica.getNode().getName()); + msg.append(", "); + } + assertTrue(placement + " is expected to be collocated with " + msg, collocated); + }); + + placementRequest = + new PlacementRequestImpl( + primaryCollection, Set.of("shard3"), cluster.getLiveNodes(), 1, 0, 0); + try { + pp = plugin.computePlacement(placementRequest, placementContext); + fail("should generate 'has no replicas on eligible nodes' failure here"); + } catch (PlacementException pe) { + assertTrue(pe.toString(), pe.toString().contains("Not enough eligible nodes")); + } + } + + private Set shuffle(Set liveNodes) { + final List nodes = new ArrayList<>(liveNodes); + return shuffle(nodes); + } + + private static Set shuffle(List nodes) { + Collections.shuffle(nodes, random()); + return new LinkedHashSet<>(nodes); + } + @Test public void testWithCollectionModificationRejected() throws Exception { AffinityPlacementConfig config = @@ -924,6 +1031,101 @@ public void testWithCollectionModificationRejected() throws Exception { } } + @Test + public void testWithCollectionShardsModificationRejected() throws Exception { + AffinityPlacementConfig config = + new AffinityPlacementConfig( + MINIMAL_FREE_DISK_GB, + PRIORITIZED_FREE_DISK_GB, + Map.of(), + Map.of(primaryCollectionName, secondaryCollectionName), + Map.of()); + configurePlugin(config); + + int NUM_NODES = 2; + Builders.ClusterBuilder clusterBuilder = + Builders.newClusterBuilder().initializeLiveNodes(NUM_NODES); + Builders.CollectionBuilder collectionBuilder = + Builders.newCollectionBuilder(secondaryCollectionName); + collectionBuilder.initializeShardsReplicas(2, 3, 0, 0, clusterBuilder.getLiveNodeBuilders()); + clusterBuilder.addCollection(collectionBuilder); + + collectionBuilder = Builders.newCollectionBuilder(primaryCollectionName); + collectionBuilder.initializeShardsReplicas( + 2, random().nextBoolean() ? 1 : 2, 0, 0, clusterBuilder.getLiveNodeBuilders()); + clusterBuilder.addCollection(collectionBuilder); + + PlacementContext placementContext = clusterBuilder.buildPlacementContext(); + Cluster cluster = placementContext.getCluster(); + + SolrCollection secondaryCollection = cluster.getCollection(secondaryCollectionName); + SolrCollection primaryCollection = cluster.getCollection(primaryCollectionName); + + final ArrayList nodes = new ArrayList<>(cluster.getLiveNodes()); + Collections.shuffle(nodes, random()); + Set toRemove = new HashSet<>(); + DeleteReplicasRequest deleteReplicasRequest; + for (Node node : nodes) { + Set seen = new HashSet<>(); + final Set mustHaveShards = + replicas(primaryCollection) + .filter(r -> r.getNode().getName().equals(node.getName())) + .map(r -> r.getShard().getShardName()) + .collect(Collectors.toSet()); + + replicas(secondaryCollection) + .filter(r -> r.getNode().getName().equals(node.getName())) + .forEach( + r -> { + final String secRepShard = r.getShard().getShardName(); + if (mustHaveShards.contains(secRepShard)) { + if (seen.contains(secRepShard)) { + toRemove.add(r); + } else { + seen.add(secRepShard); + } + } else { + toRemove.add(r); + } + }); + + assertFalse(toRemove.isEmpty()); + deleteReplicasRequest = + ModificationRequestImpl.createDeleteReplicasRequest(secondaryCollection, toRemove); + try { + plugin.verifyAllowedModification(deleteReplicasRequest, placementContext); + } catch (PlacementException pe) { + fail("should have succeeded: " + pe); + } + } + final List remainingReplicas = + replicas(secondaryCollection) + .filter(r -> !toRemove.contains(r)) + .collect(Collectors.toList()); + Collections.shuffle(remainingReplicas, random()); + toRemove.add(remainingReplicas.iterator().next()); + + deleteReplicasRequest = + ModificationRequestImpl.createDeleteReplicasRequest(secondaryCollection, toRemove); + try { + plugin.verifyAllowedModification(deleteReplicasRequest, placementContext); + fail("should have failed: " + deleteReplicasRequest); + } catch (PlacementException pe) { + } + } + + private Stream replicas(SolrCollection primaryCollection) { + return shards(primaryCollection).flatMap(shard -> replicas(shard)); + } + + private Stream replicas(Shard shard) { + return StreamSupport.stream(shard.replicas().spliterator(), false); + } + + private static Stream shards(SolrCollection primaryCollection) { + return StreamSupport.stream(primaryCollection.shards().spliterator(), false); + } + @Test public void testNodeType() throws Exception { Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(9); diff --git a/solr/solr-ref-guide/modules/configuration-guide/pages/replica-placement-plugins.adoc b/solr/solr-ref-guide/modules/configuration-guide/pages/replica-placement-plugins.adoc index e3769bc00b1..1f5f19e14ae 100644 --- a/solr/solr-ref-guide/modules/configuration-guide/pages/replica-placement-plugins.adoc +++ b/solr/solr-ref-guide/modules/configuration-guide/pages/replica-placement-plugins.adoc @@ -113,6 +113,7 @@ The autoscaling specification in the configuration linked above aimed to do the It also supports additional per-collection constraints: * `withCollection` enforces the placement of co-located collections' replicas on the same nodes, and prevents deletions of collections and replicas that would break this constraint. +* `withCollectionShards` same as above but also collocates shards. ie. shardN is placed at the same node where shardN from the referred collection is located. Note: keys of `withCollectionShards` should be disjoint with `withCollection` keys. * `collectionNodeType` limits the nodes eligible for placement to only those that match one or more of the specified node types. See below for more details on these constraints. @@ -120,7 +121,7 @@ See below for more details on these constraints. Overall strategy of this plugin: * The set of nodes in the cluster is obtained. -If `withCollection` is defined and applicable to the current collection then this candidate set is filtered so that only eligible nodes remain according to this constraint. +If `withCollection` or `withCollectionShards` are defined and applicable to the current collection then this candidate set is filtered so that only eligible nodes remain according to this constraint. * The resulting node set is transformed into 3 independent sets (that can overlap) of nodes accepting each of the three replica types (NRT, TLOG, and PULL). * For each shard on which placing replicas is required and then for each replica type to place (starting with NRT, then TLOG, then PULL): ** The set of candidates nodes corresponding to the replica type is used and from that set are removed nodes that already have a replica (of any type) for that shard. @@ -173,6 +174,18 @@ The plugin will assume that the secondary collection replicas are already in pla + See the section <> below. +`withCollectionShards`:: ++ +[%autowidth,frame=none] +|=== +|Optional |Default: none +|=== ++ +Same as `withCollection` but enforces a shard level constraint. +eg. shardN of the primary collection (occurs in a key) is placed only on nodes where shardN of secondary collection (occurs as a value) resides. +The same constraint is enforced on deleting when a replica of a secondary collection shardN is deleted. It prevents deletion if primary collection's shardN is collocated on certain node. +Keys should be disjoint with `withCollection`. ++ `collectionNodeType`:: + [%autowidth,frame=none] @@ -199,6 +212,7 @@ The plugin preserves this co-location by rejecting delete operation of secondary In order to delete a secondary collection (or its replicas) from these nodes first the replicas of the primary collection must be removed from the co-located nodes, or the configuration must be changed to remove the co-location mapping for the primary collection. + == Example Configurations This is a simple configuration that uses default values: diff --git a/solr/test-framework/src/java/org/apache/solr/cluster/placement/ClusterAbstractionsForTest.java b/solr/test-framework/src/java/org/apache/solr/cluster/placement/ClusterAbstractionsForTest.java index c6e59364c3f..f84e6353215 100644 --- a/solr/test-framework/src/java/org/apache/solr/cluster/placement/ClusterAbstractionsForTest.java +++ b/solr/test-framework/src/java/org/apache/solr/cluster/placement/ClusterAbstractionsForTest.java @@ -231,6 +231,11 @@ public boolean equals(Object obj) { public int hashCode() { return Objects.hash(shardName, collection, shardState); } + + @Override + public String toString() { + return "ShardImpl{" + "shardName='" + shardName + '\'' + '}'; + } } static class ReplicaImpl implements Replica {