Skip to content

Commit

Permalink
SOLR-16837: introducing AffinityPlacementFactory.withCollectionShards (
Browse files Browse the repository at this point in the history
…apache#1709)

* SOLR-16837: Introducing AffinityPlacementFactory.withCollectionShards
  • Loading branch information
mkhludnev authored Jun 29, 2023
1 parent c8ee862 commit ce4b59e
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 15 deletions.
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ New Features

* SOLR-16844: Added new parameter `backupConfigset` to collection Backup to optionally skip backing up configsets. (Tomás Fernández Löbbe)

* SOLR-16837: Introducing AffinityPlacementFactory.withCollectionShards to collocate corresponding shards of two collections.
  e.g. primaryColl.shard1 will be placed on the node where secondaryColl.shard1 resides, etc. (Mikhail Khludnev)

Improvements
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.solr.cluster.placement.plugins;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.solr.cluster.placement.PlacementPluginConfig;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.annotation.JsonProperty;

/** Configuration bean for {@link AffinityPlacementFactory}. */
Expand Down Expand Up @@ -103,6 +106,11 @@ public class AffinityPlacementConfig implements PlacementPluginConfig {
* acceptable node types).
*/
@JsonProperty public Map<String, String> collectionNodeType;
/**
 * Same as {@link AffinityPlacementConfig#withCollection} but ensures shard-to-shard
 * correspondence. Should be disjoint with {@link AffinityPlacementConfig#withCollection}.
 */
@JsonProperty public Map<String, String> withCollectionShards;

/**
* When this property is set to {@code true}, Solr will try to place replicas for the same shard
Expand Down Expand Up @@ -161,12 +169,40 @@ public AffinityPlacementConfig(
long minimalFreeDiskGB,
long prioritizedFreeDiskGB,
Map<String, String> withCollection,
Map<String, String> withCollectionShards,
Map<String, String> collectionNodeType) {
this.minimalFreeDiskGB = minimalFreeDiskGB;
this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
Objects.requireNonNull(withCollection);
Objects.requireNonNull(withCollectionShards);
Objects.requireNonNull(collectionNodeType);
this.withCollection = withCollection;
this.withCollectionShards = withCollectionShards;
this.collectionNodeType = collectionNodeType;
}

/**
 * Convenience constructor for configurations that do not use shard-wise collocation: delegates
 * to the full constructor with an empty {@code withCollectionShards} map.
 *
 * @param minimalFreeDiskGB minimal free disk space, in GB
 * @param prioritizedFreeDiskGB prioritized free disk space, in GB
 * @param withCollection primary-to-secondary collection collocation mapping (must not be null)
 * @param collectionNodeType collection-to-node-type mapping (must not be null)
 */
public AffinityPlacementConfig(
    long minimalFreeDiskGB,
    long prioritizedFreeDiskGB,
    Map<String, String> withCollection,
    Map<String, String> collectionNodeType) {
  this(
      minimalFreeDiskGB,
      prioritizedFreeDiskGB,
      withCollection,
      Map.of(), // no shard-wise collocation requested
      collectionNodeType);
}

/**
 * Checks configuration invariants: a collection must not be configured in both {@code
 * withCollection} and {@code withCollectionShards}.
 *
 * @throws SolrException with {@code BAD_REQUEST}, naming the offending collections, when the two
 *     maps' key sets overlap
 */
public void validate() {
  if (Collections.disjoint(withCollection.keySet(), withCollectionShards.keySet())) {
    return; // key sets do not overlap — configuration is consistent
  }
  // Compute the intersection so the error message names exactly the conflicting collections.
  final ArrayList<String> collections = new ArrayList<>(withCollection.keySet());
  collections.retainAll(withCollectionShards.keySet());
  throw new SolrException(
      SolrException.ErrorCode.BAD_REQUEST,
      "withCollection and withCollectionShards should be disjoint. But there are "
          + collections
          + " in common.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
Expand Down Expand Up @@ -123,17 +124,20 @@ public AffinityPlacementFactory() {}

@Override
public PlacementPlugin createPluginInstance() {
  // Fail fast on an inconsistent configuration before building the plugin.
  final AffinityPlacementConfig cfg = config;
  cfg.validate();
  return new AffinityPlacementPlugin(
      cfg.minimalFreeDiskGB,
      cfg.prioritizedFreeDiskGB,
      cfg.withCollection,
      cfg.withCollectionShards,
      cfg.collectionNodeType,
      cfg.spreadAcrossDomains);
}

@Override
public void configure(AffinityPlacementConfig cfg) {
  // Validate before publishing, so an invalid config never becomes the active one.
  final AffinityPlacementConfig checked =
      Objects.requireNonNull(cfg, "configuration must never be null");
  checked.validate();
  this.config = checked;
}

Expand All @@ -154,7 +158,9 @@ static class AffinityPlacementPlugin extends OrderedNodePlacementPlugin {

// primary to secondary (1:1)
private final Map<String, String> withCollections;
// secondary to primary (1:N)
// same as withCollections (primary to secondary, 1:1), but enforcing shard-to-shard correspondence
private final Map<String, String> withCollectionShards;
// secondary to primary (1:N) + shard-wise_primary (1:N)
private final Map<String, Set<String>> collocatedWith;

private final Map<String, Set<String>> nodeTypes;
Expand All @@ -169,22 +175,28 @@ private AffinityPlacementPlugin(
long minimalFreeDiskGB,
long prioritizedFreeDiskGB,
Map<String, String> withCollections,
Map<String, String> withCollectionShards,
Map<String, String> collectionNodeTypes,
boolean spreadAcrossDomains) {
this.minimalFreeDiskGB = minimalFreeDiskGB;
this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
Objects.requireNonNull(withCollections, "withCollections must not be null");
Objects.requireNonNull(collectionNodeTypes, "collectionNodeTypes must not be null");
Objects.requireNonNull(withCollectionShards, "withCollectionShards must not be null");
this.spreadAcrossDomains = spreadAcrossDomains;
this.withCollections = withCollections;
if (withCollections.isEmpty()) {
collocatedWith = Map.of();
} else {
collocatedWith = new HashMap<>();
withCollections.forEach(
(primary, secondary) ->
collocatedWith.computeIfAbsent(secondary, s -> new HashSet<>()).add(primary));
}
this.withCollectionShards = withCollectionShards;
Map<String, Set<String>> collocated = new HashMap<>();
// reverse both relations: shard-agnostic and shard-wise
List.of(this.withCollections, this.withCollectionShards)
.forEach(
direct ->
direct.forEach(
(primary, secondary) ->
collocated
.computeIfAbsent(secondary, s -> new HashSet<>())
.add(primary)));
this.collocatedWith = Collections.unmodifiableMap(collocated);

if (collectionNodeTypes.isEmpty()) {
nodeTypes = Map.of();
Expand Down Expand Up @@ -521,6 +533,12 @@ public boolean canAddReplica(Replica replica) {
&& Optional.ofNullable(withCollections.get(collection))
.map(this::hasCollectionOnNode)
.orElse(true)
// Ensure same shard is collocated if required
&& Optional.ofNullable(withCollectionShards.get(collection))
.map(
shardWiseOf ->
getShardsOnNode(shardWiseOf).contains(replica.getShard().getShardName()))
.orElse(true)
// Ensure the disk space will not go below the minimum if the replica is added
&& (minimalFreeDiskGB <= 0
|| nodeFreeDiskGB - getProjectedSizeOfReplica(replica) > minimalFreeDiskGB);
Expand All @@ -547,6 +565,14 @@ public Map<Replica, String> canRemoveReplicas(Collection<Replica> replicas) {
continue;
}

Stream<String> shardWiseCollocations =
collocatedCollections.stream()
.filter(
priColl -> collection.getName().equals(withCollectionShards.get(priColl)));
final Set<String> mandatoryShardsOrAll =
shardWiseCollocations
.flatMap(priColl -> getShardsOnNode(priColl).stream())
.collect(Collectors.toSet());
// There are collocatedCollections for this shard, so make sure there is a replica of this
// shard left on the node after it is removed
Set<Replica> replicasRemovedForShard =
Expand All @@ -555,11 +581,18 @@ public Map<Replica, String> canRemoveReplicas(Collection<Replica> replicas) {
replica.getShard().getCollection().getName(), k -> new HashMap<>())
.computeIfAbsent(replica.getShard().getShardName(), k -> new HashSet<>());
replicasRemovedForShard.add(replica);

if (replicasRemovedForShard.size()
>= getReplicasForShardOnNode(replica.getShard()).size()) {
replicaRemovalExceptions.put(
replica, "co-located with replicas of " + collocatedCollections);
// either if all shards are mandatory, or the current one is mandatory
boolean shardWise = false;
if (mandatoryShardsOrAll.isEmpty()
|| (shardWise = mandatoryShardsOrAll.contains(replica.getShard().getShardName()))) {
if (replicasRemovedForShard.size()
>= getReplicasForShardOnNode(replica.getShard()).size()) {
replicaRemovalExceptions.put(
replica,
"co-located with replicas of "
+ (shardWise ? replica.getShard().getShardName() + " of " : "")
+ collocatedCollections);
}
}
}
return replicaRemovalExceptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@ public boolean equals(Object o) {
}
}
}

@Override
public String toString() {
  // Render the node and its last sorted weight for log/debug readability.
  return new StringBuilder("WeightedNode{")
      .append("node=")
      .append(node)
      .append(", lastSortedWeight=")
      .append(lastSortedWeight)
      .append('}')
      .toString();
}
}

/**
Expand Down Expand Up @@ -666,6 +671,15 @@ public Replica getLeader() {
public ShardState getState() {
return null;
}

@Override
public String toString() {
  // Fall back to a placeholder when the collection — or its name — is absent
  // (mirrors Optional.ofNullable(...).map(...).orElse(...)).
  String collectionName = (collection == null) ? null : collection.getName();
  if (collectionName == null) {
    collectionName = "<no collection>";
  }
  return collectionName + "/" + shardName;
}
};
return new Replica() {
@Override
Expand Down Expand Up @@ -697,6 +711,15 @@ public String getCoreName() {
public Node getNode() {
return node;
}

@Override
public String toString() {
  // Placeholders for missing shard/node names keep debug output readable
  // (mirrors the Optional.ofNullable(...).map(...).orElse(...) chains).
  String shardLabel = (shard == null) ? null : shard.getShardName();
  if (shardLabel == null) {
    shardLabel = "<no shard>";
  }
  String nodeLabel = (node == null) ? null : node.getName();
  if (nodeLabel == null) {
    nodeLabel = "<no node>";
  }
  return shardLabel + "@" + nodeLabel + " of " + type;
}
};
}
}
Loading

0 comments on commit ce4b59e

Please sign in to comment.