Skip to content

Commit

Permalink
Infrastructure for assuming cluster features in the next major version (
Browse files Browse the repository at this point in the history
elastic#118143)

Allow features to be marked as 'assumed', allowing them to be removed in the next major version.
  • Loading branch information
thecoop authored Dec 17, 2024
1 parent 6516a53 commit f5712e4
Show file tree
Hide file tree
Showing 14 changed files with 464 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,8 @@ org.elasticsearch.cluster.ClusterState#compatibilityVersions()

@defaultMessage ClusterFeatures#nodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#nodeFeatures()
@defaultMessage ClusterFeatures#allNodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#allNodeFeatures()
@defaultMessage ClusterFeatures#clusterHasFeature is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.features.NodeFeature)
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cluster.node.DiscoveryNodes, org.elasticsearch.features.NodeFeature)

@defaultMessage Do not construct this records outside the source files they are declared in
org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult)
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/118143.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118143
summary: Infrastructure for assuming cluster features in the next major version
area: "Infra/Core"
type: feature
issues: []
56 changes: 45 additions & 11 deletions server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.xcontent.ToXContent;

Expand Down Expand Up @@ -79,28 +80,61 @@ public Map<String, Set<String>> nodeFeatures() {
return nodeFeatures;
}

/**
* The features in all nodes in the cluster.
* <p>
* NOTE: This should not be used directly.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
public Set<String> allNodeFeatures() {
private Set<String> allNodeFeatures() {
if (allNodeFeatures == null) {
allNodeFeatures = Set.copyOf(calculateAllNodeFeatures(nodeFeatures.values()));
}
return allNodeFeatures;
}

/**
* Returns {@code true} if {@code node} can have assumed features.
* @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
*/
public static boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
return node.getBuildVersion().canRemoveAssumedFeatures();
}

/**
* Returns {@code true} if one or more nodes in {@code nodes} can have assumed features.
* @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
*/
public static boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
return nodes.getAllNodes().stream().anyMatch(n -> n.getBuildVersion().canRemoveAssumedFeatures());
}

/**
* {@code true} if {@code feature} is present on all nodes in the cluster.
* <p>
* NOTE: This should not be used directly.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
@SuppressForbidden(reason = "directly reading cluster features")
public boolean clusterHasFeature(NodeFeature feature) {
return allNodeFeatures().contains(feature.id());
public boolean clusterHasFeature(DiscoveryNodes nodes, NodeFeature feature) {
assert nodes.getNodes().keySet().equals(nodeFeatures.keySet())
: "Cluster features nodes " + nodeFeatures.keySet() + " is different to discovery nodes " + nodes.getNodes().keySet();

// basic case
boolean allNodesHaveFeature = allNodeFeatures().contains(feature.id());
if (allNodesHaveFeature) {
return true;
}

// if the feature is assumed, check the versions more closely
// it's actually ok if the feature is assumed, and all nodes missing the feature can assume it
// TODO: do we need some kind of transient cache of this calculation?
if (feature.assumedAfterNextCompatibilityBoundary()) {
for (var nf : nodeFeatures.entrySet()) {
if (nf.getValue().contains(feature.id()) == false
&& featuresCanBeAssumedForNode(nodes.getNodes().get(nf.getKey())) == false) {
return false;
}
}

// all nodes missing the feature can assume it - so that's alright then
return true;
}

return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand All @@ -39,6 +40,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -137,8 +139,8 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
Map<String, CompatibilityVersions> compatibilityVersionsMap = new HashMap<>(newState.compatibilityVersions());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures());
Set<String> allNodesFeatures = ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures()); // as present in cluster state
Set<String> effectiveClusterFeatures = calculateEffectiveClusterFeatures(newState.nodes(), nodeFeatures);

assert nodesBuilder.isLocalNodeElectedMaster();

Expand Down Expand Up @@ -174,14 +176,17 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex
}
blockForbiddenVersions(compatibilityVersions.transportVersion());
ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
enforceNodeFeatureBarrier(node.getId(), allNodesFeatures, features);
Set<String> newNodeEffectiveFeatures = enforceNodeFeatureBarrier(node, effectiveClusterFeatures, features);
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
ensureIndexCompatibility(node.getMinIndexVersion(), node.getMaxIndexVersion(), initialState.getMetadata());

nodesBuilder.add(node);
compatibilityVersionsMap.put(node.getId(), compatibilityVersions);
// store the actual node features here, not including assumed features, as this is persisted in cluster state
nodeFeatures.put(node.getId(), features);
allNodesFeatures.retainAll(features);
effectiveClusterFeatures.retainAll(newNodeEffectiveFeatures);

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand Down Expand Up @@ -355,6 +360,35 @@ private static void blockForbiddenVersions(TransportVersion joiningTransportVers
}
}

/**
* Calculate the cluster's effective features. This includes all features that are assumed on any nodes in the cluster,
* that are also present across the whole cluster as a result.
*/
private Set<String> calculateEffectiveClusterFeatures(DiscoveryNodes nodes, Map<String, Set<String>> nodeFeatures) {
if (featureService.featuresCanBeAssumedForNodes(nodes)) {
Set<String> assumedFeatures = featureService.getNodeFeatures()
.values()
.stream()
.filter(NodeFeature::assumedAfterNextCompatibilityBoundary)
.map(NodeFeature::id)
.collect(Collectors.toSet());

// add all assumed features to the featureset of all nodes of the next major version
nodeFeatures = new HashMap<>(nodeFeatures);
for (var node : nodes.getNodes().entrySet()) {
if (featureService.featuresCanBeAssumedForNode(node.getValue())) {
assert nodeFeatures.containsKey(node.getKey()) : "Node " + node.getKey() + " does not have any features";
nodeFeatures.computeIfPresent(node.getKey(), (k, v) -> {
var newFeatures = new HashSet<>(v);
return newFeatures.addAll(assumedFeatures) ? newFeatures : v;
});
}
}
}

return ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
}

/**
* Ensures that all indices are compatible with the given index version. This will ensure that all indices in the given metadata
* will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index
Expand Down Expand Up @@ -461,13 +495,44 @@ public static void ensureVersionBarrier(Version joiningNodeVersion, Version minC
}
}

private void enforceNodeFeatureBarrier(String nodeId, Set<String> existingNodesFeatures, Set<String> newNodeFeatures) {
/**
* Enforces the feature join barrier - a joining node should have all features already present in all existing nodes in the cluster
*
* @return The set of features that this node has (including assumed features)
*/
private Set<String> enforceNodeFeatureBarrier(DiscoveryNode node, Set<String> effectiveClusterFeatures, Set<String> newNodeFeatures) {
// prevent join if it does not have one or more features that all other nodes have
Set<String> missingFeatures = new HashSet<>(existingNodesFeatures);
Set<String> missingFeatures = new HashSet<>(effectiveClusterFeatures);
missingFeatures.removeAll(newNodeFeatures);

if (missingFeatures.isEmpty() == false) {
throw new IllegalStateException("Node " + nodeId + " is missing required features " + missingFeatures);
if (missingFeatures.isEmpty()) {
// nothing missing - all ok
return newNodeFeatures;
}

if (featureService.featuresCanBeAssumedForNode(node)) {
// it might still be ok for this node to join if this node can have assumed features,
// and all the missing features are assumed
// we can get the NodeFeature object direct from this node's registered features
// as all existing nodes in the cluster have the features present in existingNodesFeatures, including this one
newNodeFeatures = new HashSet<>(newNodeFeatures);
for (Iterator<String> it = missingFeatures.iterator(); it.hasNext();) {
String feature = it.next();
NodeFeature nf = featureService.getNodeFeatures().get(feature);
if (nf.assumedAfterNextCompatibilityBoundary()) {
// its ok for this feature to be missing from this node
it.remove();
// and it should be assumed to still be in the cluster
newNodeFeatures.add(feature);
}
// even if we don't remove it, still continue, so the exception message below is accurate
}
}

if (missingFeatures.isEmpty()) {
return newNodeFeatures;
} else {
throw new IllegalStateException("Node " + node.getId() + " is missing required features " + missingFeatures);
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/env/BuildVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
*/
public abstract class BuildVersion implements ToXContentFragment, Writeable {

/**
* Checks if this version can operate properly in a cluster without features
* that are assumed in the currently running Elasticsearch.
*/
public abstract boolean canRemoveAssumedFeatures();

/**
* Check whether this version is on or after a minimum threshold.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ final class DefaultBuildVersion extends BuildVersion {
this(in.readVInt());
}

@Override
public boolean canRemoveAssumedFeatures() {
/*
* We can remove assumed features if the node version is the next major version.
* This is because the next major version can only form a cluster with the
* latest minor version of the previous major, so any features introduced before that point
* (that are marked as assumed in the running code version) are automatically met by that version.
*/
return version.major == Version.CURRENT.major + 1;
}

@Override
public boolean onOrAfterMinimumCompatible() {
return Version.CURRENT.minimumCompatibilityVersion().onOrBefore(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

package org.elasticsearch.features;

import org.elasticsearch.cluster.ClusterFeatures;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
Expand Down Expand Up @@ -38,9 +41,7 @@ public class FeatureService {
* as the local node's supported feature set
*/
public FeatureService(List<? extends FeatureSpecification> specs) {

var featureData = FeatureData.createFromSpecifications(specs);
nodeFeatures = featureData.getNodeFeatures();
this.nodeFeatures = FeatureData.createFromSpecifications(specs).getNodeFeatures();

logger.info("Registered local node features {}", nodeFeatures.keySet().stream().sorted().toList());
}
Expand All @@ -53,11 +54,25 @@ public Map<String, NodeFeature> getNodeFeatures() {
return nodeFeatures;
}

/**
* Returns {@code true} if {@code node} can have assumed features.
*/
public boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
return ClusterFeatures.featuresCanBeAssumedForNode(node);
}

/**
* Returns {@code true} if one or more nodes in {@code nodes} can have assumed features.
*/
public boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
return ClusterFeatures.featuresCanBeAssumedForNodes(nodes);
}

/**
* Returns {@code true} if all nodes in {@code state} support feature {@code feature}.
*/
@SuppressForbidden(reason = "We need basic feature information from cluster state")
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
return state.clusterFeatures().clusterHasFeature(feature);
return state.clusterFeatures().clusterHasFeature(state.nodes(), feature);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
* A feature published by a node.
*
* @param id The feature id. Must be unique in the node.
* @param assumedAfterNextCompatibilityBoundary
* {@code true} if this feature is removed at the next compatibility boundary (ie next major version),
* and so should be assumed to be true for all nodes after that boundary.
*/
public record NodeFeature(String id) {
public record NodeFeature(String id, boolean assumedAfterNextCompatibilityBoundary) {

public NodeFeature {
Objects.requireNonNull(id);
}

public NodeFeature(String id) {
this(id, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ protected boolean areFileSettingsApplied(ClusterState clusterState) {
}

@SuppressForbidden(reason = "need to check file settings support on exact cluster state")
private static boolean supportsFileSettings(ClusterState clusterState) {
return clusterState.clusterFeatures().clusterHasFeature(FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
private boolean supportsFileSettings(ClusterState clusterState) {
return clusterState.clusterFeatures().clusterHasFeature(clusterState.nodes(), FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
}

private void setReady(boolean ready) {
Expand Down
Loading

0 comments on commit f5712e4

Please sign in to comment.