diff --git a/build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt b/build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt index a9da7995c2b36..53480a4a27b0b 100644 --- a/build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt +++ b/build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt @@ -155,10 +155,8 @@ org.elasticsearch.cluster.ClusterState#compatibilityVersions() @defaultMessage ClusterFeatures#nodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster. org.elasticsearch.cluster.ClusterFeatures#nodeFeatures() -@defaultMessage ClusterFeatures#allNodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster. -org.elasticsearch.cluster.ClusterFeatures#allNodeFeatures() @defaultMessage ClusterFeatures#clusterHasFeature is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster. -org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.features.NodeFeature) +org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cluster.node.DiscoveryNodes, org.elasticsearch.features.NodeFeature) @defaultMessage Do not construct this records outside the source files they are declared in org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult) diff --git a/docs/changelog/118143.yaml b/docs/changelog/118143.yaml new file mode 100644 index 0000000000000..4dcbf4b4b6c2c --- /dev/null +++ b/docs/changelog/118143.yaml @@ -0,0 +1,5 @@ +pr: 118143 +summary: Infrastructure for assuming cluster features in the next major version +area: "Infra/Core" +type: feature +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java b/server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java index ad285cbd391cd..5b5a6577082d7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java @@ -9,11 +9,12 @@ package org.elasticsearch.cluster; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.ChunkedToXContentObject; -import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.xcontent.ToXContent; @@ -79,28 +80,61 @@ public Map> nodeFeatures() { return nodeFeatures; } - /** - * The features in all nodes in the cluster. - *

- * NOTE: This should not be used directly. - * Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead. - */ - public Set allNodeFeatures() { + private Set allNodeFeatures() { if (allNodeFeatures == null) { allNodeFeatures = Set.copyOf(calculateAllNodeFeatures(nodeFeatures.values())); } return allNodeFeatures; } + /** + * Returns {@code true} if {@code node} can have assumed features. + * @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures + */ + public static boolean featuresCanBeAssumedForNode(DiscoveryNode node) { + return node.getBuildVersion().canRemoveAssumedFeatures(); + } + + /** + * Returns {@code true} if one or more nodes in {@code nodes} can have assumed features. + * @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures + */ + public static boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) { + return nodes.getAllNodes().stream().anyMatch(n -> n.getBuildVersion().canRemoveAssumedFeatures()); + } + /** * {@code true} if {@code feature} is present on all nodes in the cluster. *

* NOTE: This should not be used directly. * Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead. */ - @SuppressForbidden(reason = "directly reading cluster features") - public boolean clusterHasFeature(NodeFeature feature) { - return allNodeFeatures().contains(feature.id()); + public boolean clusterHasFeature(DiscoveryNodes nodes, NodeFeature feature) { + assert nodes.getNodes().keySet().equals(nodeFeatures.keySet()) + : "Cluster features nodes " + nodeFeatures.keySet() + " is different to discovery nodes " + nodes.getNodes().keySet(); + + // basic case + boolean allNodesHaveFeature = allNodeFeatures().contains(feature.id()); + if (allNodesHaveFeature) { + return true; + } + + // if the feature is assumed, check the versions more closely + // it's actually ok if the feature is assumed, and all nodes missing the feature can assume it + // TODO: do we need some kind of transient cache of this calculation? + if (feature.assumedAfterNextCompatibilityBoundary()) { + for (var nf : nodeFeatures.entrySet()) { + if (nf.getValue().contains(feature.id()) == false + && featuresCanBeAssumedForNode(nodes.getNodes().get(nf.getKey())) == false) { + return false; + } + } + + // all nodes missing the feature can assume it - so that's alright then + return true; + } + + return false; } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java index 5235293a54d95..74a8dc7851c89 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.features.FeatureService; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -39,6 +40,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -137,8 +139,8 @@ public ClusterState execute(BatchExecutionContext batchExecutionContex DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes()); Map compatibilityVersionsMap = new HashMap<>(newState.compatibilityVersions()); - Map> nodeFeatures = new HashMap<>(newState.nodeFeatures()); - Set allNodesFeatures = ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values()); + Map> nodeFeatures = new HashMap<>(newState.nodeFeatures()); // as present in cluster state + Set effectiveClusterFeatures = calculateEffectiveClusterFeatures(newState.nodes(), nodeFeatures); assert nodesBuilder.isLocalNodeElectedMaster(); @@ -174,14 +176,17 @@ public ClusterState execute(BatchExecutionContext batchExecutionContex } blockForbiddenVersions(compatibilityVersions.transportVersion()); ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion); - enforceNodeFeatureBarrier(node.getId(), allNodesFeatures, features); + Set newNodeEffectiveFeatures = enforceNodeFeatureBarrier(node, effectiveClusterFeatures, features); // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices // we have to reject nodes that don't support all indices we have in this cluster ensureIndexCompatibility(node.getMinIndexVersion(), node.getMaxIndexVersion(), initialState.getMetadata()); + nodesBuilder.add(node); compatibilityVersionsMap.put(node.getId(), compatibilityVersions); + // store the actual node features here, not including assumed features, as this is persisted in cluster state nodeFeatures.put(node.getId(), features); - allNodesFeatures.retainAll(features); + effectiveClusterFeatures.retainAll(newNodeEffectiveFeatures); + nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); @@ -355,6 +360,35 @@ private static void blockForbiddenVersions(TransportVersion joiningTransportVers } } + /** + * Calculate the cluster's effective features. This includes all features that are assumed on any nodes in the cluster, + * that are also present across the whole cluster as a result. + */ + private Set calculateEffectiveClusterFeatures(DiscoveryNodes nodes, Map> nodeFeatures) { + if (featureService.featuresCanBeAssumedForNodes(nodes)) { + Set assumedFeatures = featureService.getNodeFeatures() + .values() + .stream() + .filter(NodeFeature::assumedAfterNextCompatibilityBoundary) + .map(NodeFeature::id) + .collect(Collectors.toSet()); + + // add all assumed features to the featureset of all nodes of the next major version + nodeFeatures = new HashMap<>(nodeFeatures); + for (var node : nodes.getNodes().entrySet()) { + if (featureService.featuresCanBeAssumedForNode(node.getValue())) { + assert nodeFeatures.containsKey(node.getKey()) : "Node " + node.getKey() + " does not have any features"; + nodeFeatures.computeIfPresent(node.getKey(), (k, v) -> { + var newFeatures = new HashSet<>(v); + return newFeatures.addAll(assumedFeatures) ? newFeatures : v; + }); + } + } + } + + return ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values()); + } + /** * Ensures that all indices are compatible with the given index version. This will ensure that all indices in the given metadata * will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index @@ -461,13 +495,44 @@ public static void ensureVersionBarrier(Version joiningNodeVersion, Version minC } } - private void enforceNodeFeatureBarrier(String nodeId, Set existingNodesFeatures, Set newNodeFeatures) { + /** + * Enforces the feature join barrier - a joining node should have all features already present in all existing nodes in the cluster + * + * @return The set of features that this node has (including assumed features) + */ + private Set enforceNodeFeatureBarrier(DiscoveryNode node, Set effectiveClusterFeatures, Set newNodeFeatures) { // prevent join if it does not have one or more features that all other nodes have - Set missingFeatures = new HashSet<>(existingNodesFeatures); + Set missingFeatures = new HashSet<>(effectiveClusterFeatures); missingFeatures.removeAll(newNodeFeatures); - if (missingFeatures.isEmpty() == false) { - throw new IllegalStateException("Node " + nodeId + " is missing required features " + missingFeatures); + if (missingFeatures.isEmpty()) { + // nothing missing - all ok + return newNodeFeatures; + } + + if (featureService.featuresCanBeAssumedForNode(node)) { + // it might still be ok for this node to join if this node can have assumed features, + // and all the missing features are assumed + // we can get the NodeFeature object direct from this node's registered features + // as all existing nodes in the cluster have the features present in existingNodesFeatures, including this one + newNodeFeatures = new HashSet<>(newNodeFeatures); + for (Iterator it = missingFeatures.iterator(); it.hasNext();) { + String feature = it.next(); + NodeFeature nf = featureService.getNodeFeatures().get(feature); + if (nf.assumedAfterNextCompatibilityBoundary()) { + // its ok for this feature to be missing from this node + it.remove(); + // and it should be assumed to still be in the cluster + newNodeFeatures.add(feature); + } + // even if we don't remove it, still continue, so the exception message below is accurate + } + } + + if (missingFeatures.isEmpty()) { + return newNodeFeatures; + } else { + throw new IllegalStateException("Node " + node.getId() + " is missing required features " + missingFeatures); } } diff --git a/server/src/main/java/org/elasticsearch/env/BuildVersion.java b/server/src/main/java/org/elasticsearch/env/BuildVersion.java index 7a6b27eab2330..5c3602283fef3 100644 --- a/server/src/main/java/org/elasticsearch/env/BuildVersion.java +++ b/server/src/main/java/org/elasticsearch/env/BuildVersion.java @@ -37,6 +37,12 @@ */ public abstract class BuildVersion implements ToXContentFragment, Writeable { + /** + * Checks if this version can operate properly in a cluster without features + * that are assumed in the currently running Elasticsearch. + */ + public abstract boolean canRemoveAssumedFeatures(); + /** * Check whether this version is on or after a minimum threshold. * diff --git a/server/src/main/java/org/elasticsearch/env/DefaultBuildVersion.java b/server/src/main/java/org/elasticsearch/env/DefaultBuildVersion.java index a7e1a4fee341d..70aa3f6639a4d 100644 --- a/server/src/main/java/org/elasticsearch/env/DefaultBuildVersion.java +++ b/server/src/main/java/org/elasticsearch/env/DefaultBuildVersion.java @@ -47,6 +47,17 @@ final class DefaultBuildVersion extends BuildVersion { this(in.readVInt()); } + @Override + public boolean canRemoveAssumedFeatures() { + /* + * We can remove assumed features if the node version is the next major version. + * This is because the next major version can only form a cluster with the + * latest minor version of the previous major, so any features introduced before that point + * (that are marked as assumed in the running code version) are automatically met by that version. + */ + return version.major == Version.CURRENT.major + 1; + } + @Override public boolean onOrAfterMinimumCompatible() { return Version.CURRENT.minimumCompatibilityVersion().onOrBefore(version); diff --git a/server/src/main/java/org/elasticsearch/features/FeatureService.java b/server/src/main/java/org/elasticsearch/features/FeatureService.java index 9a0ac7cafc183..c04fbae05ee2c 100644 --- a/server/src/main/java/org/elasticsearch/features/FeatureService.java +++ b/server/src/main/java/org/elasticsearch/features/FeatureService.java @@ -9,7 +9,10 @@ package org.elasticsearch.features; +import org.elasticsearch.cluster.ClusterFeatures; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -38,9 +41,7 @@ public class FeatureService { * as the local node's supported feature set */ public FeatureService(List specs) { - - var featureData = FeatureData.createFromSpecifications(specs); - nodeFeatures = featureData.getNodeFeatures(); + this.nodeFeatures = FeatureData.createFromSpecifications(specs).getNodeFeatures(); logger.info("Registered local node features {}", nodeFeatures.keySet().stream().sorted().toList()); } @@ -53,11 +54,25 @@ public Map getNodeFeatures() { return nodeFeatures; } + /** + * Returns {@code true} if {@code node} can have assumed features. + */ + public boolean featuresCanBeAssumedForNode(DiscoveryNode node) { + return ClusterFeatures.featuresCanBeAssumedForNode(node); + } + + /** + * Returns {@code true} if one or more nodes in {@code nodes} can have assumed features. + */ + public boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) { + return ClusterFeatures.featuresCanBeAssumedForNodes(nodes); + } + /** * Returns {@code true} if all nodes in {@code state} support feature {@code feature}. */ @SuppressForbidden(reason = "We need basic feature information from cluster state") public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { - return state.clusterFeatures().clusterHasFeature(feature); + return state.clusterFeatures().clusterHasFeature(state.nodes(), feature); } } diff --git a/server/src/main/java/org/elasticsearch/features/NodeFeature.java b/server/src/main/java/org/elasticsearch/features/NodeFeature.java index 957308e805562..961b386d62802 100644 --- a/server/src/main/java/org/elasticsearch/features/NodeFeature.java +++ b/server/src/main/java/org/elasticsearch/features/NodeFeature.java @@ -15,10 +15,17 @@ * A feature published by a node. * * @param id The feature id. Must be unique in the node. + * @param assumedAfterNextCompatibilityBoundary + * {@code true} if this feature is removed at the next compatibility boundary (ie next major version), + * and so should be assumed to be true for all nodes after that boundary. */ -public record NodeFeature(String id) { +public record NodeFeature(String id, boolean assumedAfterNextCompatibilityBoundary) { public NodeFeature { Objects.requireNonNull(id); } + + public NodeFeature(String id) { + this(id, false); + } } diff --git a/server/src/main/java/org/elasticsearch/readiness/ReadinessService.java b/server/src/main/java/org/elasticsearch/readiness/ReadinessService.java index 15b9eacfa2118..de56ead9b5aba 100644 --- a/server/src/main/java/org/elasticsearch/readiness/ReadinessService.java +++ b/server/src/main/java/org/elasticsearch/readiness/ReadinessService.java @@ -294,8 +294,8 @@ protected boolean areFileSettingsApplied(ClusterState clusterState) { } @SuppressForbidden(reason = "need to check file settings support on exact cluster state") - private static boolean supportsFileSettings(ClusterState clusterState) { - return clusterState.clusterFeatures().clusterHasFeature(FileSettingsFeatures.FILE_SETTINGS_SUPPORTED); + private boolean supportsFileSettings(ClusterState clusterState) { + return clusterState.clusterFeatures().clusterHasFeature(clusterState.nodes(), FileSettingsFeatures.FILE_SETTINGS_SUPPORTED); } private void setReady(boolean ready) { diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index 2c6e273bb6e23..ba0f04d174f43 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -19,6 +19,8 @@ import org.elasticsearch.cluster.metadata.IndexMetadataStats; import org.elasticsearch.cluster.metadata.IndexWriteLoad; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -110,6 +112,7 @@ public void testCalculateValidations() { ); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", @@ -143,8 +146,9 @@ public Set getFeatures() { // cluster doesn't have feature ClusterState stateNoFeature = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder()).build(); + Settings settings = Settings.builder().put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true).build(); DataStreamAutoShardingService noFeatureService = new DataStreamAutoShardingService( - Settings.builder().put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true).build(), + settings, clusterService, new FeatureService(List.of()), () -> now @@ -155,15 +159,16 @@ public Set getFeatures() { } { + Settings settings = Settings.builder() + .put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true) + .putList( + DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey(), + List.of("foo", dataStreamName + "*") + ) + .build(); // patterns are configured to exclude the current data stream DataStreamAutoShardingService noFeatureService = new DataStreamAutoShardingService( - Settings.builder() - .put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true) - .putList( - DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey(), - List.of("foo", dataStreamName + "*") - ) - .build(), + settings, clusterService, new FeatureService(List.of()), () -> now @@ -199,6 +204,7 @@ public void testCalculateIncreaseShardingRecommendations() { DataStream dataStream = dataStreamSupplier.apply(null); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", @@ -237,6 +243,7 @@ public void testCalculateIncreaseShardingRecommendations() { ); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", @@ -275,6 +282,7 @@ public void testCalculateIncreaseShardingRecommendations() { ); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", @@ -313,6 +321,7 @@ public void testCalculateDecreaseShardingRecommendations() { DataStream dataStream = dataStreamSupplier.apply(null); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", @@ -353,6 +362,7 @@ public void testCalculateDecreaseShardingRecommendations() { DataStream dataStream = dataStreamSupplier.apply(null); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", @@ -401,6 +411,7 @@ public void testCalculateDecreaseShardingRecommendations() { ); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", @@ -447,6 +458,7 @@ public void testCalculateDecreaseShardingRecommendations() { ); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", @@ -487,6 +499,7 @@ public void testCalculateDecreaseShardingRecommendations() { DataStream dataStream = dataStreamSupplier.apply(null); builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2"))) .nodeFeatures( Map.of( "n1", diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java index 27775270a83eb..492a142492e18 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.features.FeatureService; import org.elasticsearch.features.FeatureSpecification; import org.elasticsearch.features.NodeFeature; @@ -46,11 +47,13 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.DesiredNodesTestCase.assertDesiredNodesStatusIsCorrect; @@ -227,6 +230,227 @@ public Set getFeatures() { ); } + @SuppressForbidden(reason = "we need to actually check what is in cluster state") + private static Map> getRecordedNodeFeatures(ClusterState state) { + return state.clusterFeatures().nodeFeatures(); + } + + private static Version nextMajor() { + return Version.fromId((Version.CURRENT.major + 1) * 1_000_000 + 99); + } + + public void testCanJoinClusterWithAssumedFeatures() throws Exception { + AllocationService allocationService = createAllocationService(); + RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + FeatureService featureService = new FeatureService(List.of(new FeatureSpecification() { + @Override + public Set getFeatures() { + return Set.of(new NodeFeature("f1"), new NodeFeature("af1", true), new NodeFeature("af2", true)); + } + })); + + NodeJoinExecutor executor = new NodeJoinExecutor(allocationService, rerouteService, featureService); + + DiscoveryNode masterNode = DiscoveryNodeUtils.create(UUIDs.base64UUID()); + DiscoveryNode otherNode = DiscoveryNodeUtils.create(UUIDs.base64UUID()); + Map> features = new HashMap<>(); + features.put(masterNode.getId(), Set.of("f1", "af1", "af2")); + features.put(otherNode.getId(), Set.of("f1", "af1", "af2")); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).add(otherNode)) + .nodeFeatures(features) + .build(); + + // it is valid for major+1 versions to join clusters assumed features still present + // this can happen in the process of marking, then removing, assumed features + // they should still be recorded appropriately + DiscoveryNode newNode = DiscoveryNodeUtils.builder(UUIDs.base64UUID()) + .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current()) + .build(); + clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + clusterState, + executor, + List.of( + JoinTask.singleNode( + newNode, + CompatibilityVersionsUtils.staticCurrent(), + Set.of("f1", "af2"), + TEST_REASON, + NO_FAILURE_LISTENER, + 0L + ) + ) + ); + features.put(newNode.getId(), Set.of("f1", "af2")); + + // extra final check that the recorded cluster features are as they should be + assertThat(getRecordedNodeFeatures(clusterState), equalTo(features)); + } + + public void testJoinClusterWithAssumedFeaturesDoesntAllowNonAssumed() throws Exception { + AllocationService allocationService = createAllocationService(); + RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + FeatureService featureService = new FeatureService(List.of(new FeatureSpecification() { + @Override + public Set getFeatures() { + return Set.of(new NodeFeature("f1"), new NodeFeature("af1", true)); + } + })); + + NodeJoinExecutor executor = new NodeJoinExecutor(allocationService, rerouteService, featureService); + + DiscoveryNode masterNode = DiscoveryNodeUtils.create(UUIDs.base64UUID()); + DiscoveryNode otherNode = DiscoveryNodeUtils.create(UUIDs.base64UUID()); + Map> features = new HashMap<>(); + features.put(masterNode.getId(), Set.of("f1", "af1")); + features.put(otherNode.getId(), Set.of("f1", "af1")); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).add(otherNode)) + .nodeFeatures(features) + .build(); + + DiscoveryNode newNodeNextMajor = DiscoveryNodeUtils.builder(UUIDs.base64UUID()) + .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current()) + .build(); + clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + clusterState, + executor, + List.of( + JoinTask.singleNode( + newNodeNextMajor, + CompatibilityVersionsUtils.staticCurrent(), + Set.of("f1"), + TEST_REASON, + NO_FAILURE_LISTENER, + 0L + ) + ) + ); + features.put(newNodeNextMajor.getId(), Set.of("f1")); + + // even though a next major has joined without af1, this doesnt allow the current major to join with af1 missing features + DiscoveryNode newNodeCurMajor = DiscoveryNodeUtils.create(UUIDs.base64UUID()); + AtomicReference ex = new AtomicReference<>(); + clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + clusterState, + executor, + List.of( + JoinTask.singleNode( + newNodeCurMajor, + CompatibilityVersionsUtils.staticCurrent(), + Set.of("f1"), + TEST_REASON, + ActionTestUtils.assertNoSuccessListener(ex::set), + 0L + ) + ) + ); + assertThat(ex.get().getMessage(), containsString("missing required features [af1]")); + + // a next major can't join missing non-assumed features + DiscoveryNode newNodeNextMajorMissing = DiscoveryNodeUtils.builder(UUIDs.base64UUID()) + .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current()) + .build(); + ex.set(null); + clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + clusterState, + executor, + List.of( + JoinTask.singleNode( + newNodeNextMajorMissing, + CompatibilityVersionsUtils.staticCurrent(), + Set.of(), + TEST_REASON, + ActionTestUtils.assertNoSuccessListener(ex::set), + 0L + ) + ) + ); + assertThat(ex.get().getMessage(), containsString("missing required features [f1]")); + + // extra final check that the recorded cluster features are as they should be, and newNodeNextMajor hasn't gained af1 + assertThat(getRecordedNodeFeatures(clusterState), equalTo(features)); + } + + /* + * Same as above but the current major missing features is processed in the same execution + */ + public void testJoinClusterWithAssumedFeaturesDoesntAllowNonAssumedSameExecute() throws Exception { + AllocationService allocationService = createAllocationService(); + RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + FeatureService featureService = new FeatureService(List.of(new FeatureSpecification() { + @Override + public Set getFeatures() { + return Set.of(new NodeFeature("f1"), new NodeFeature("af1", true)); + } + })); + + NodeJoinExecutor executor = new NodeJoinExecutor(allocationService, rerouteService, featureService); + + DiscoveryNode masterNode = DiscoveryNodeUtils.create(UUIDs.base64UUID()); + DiscoveryNode otherNode = DiscoveryNodeUtils.create(UUIDs.base64UUID()); + Map> features = new HashMap<>(); + features.put(masterNode.getId(), Set.of("f1", "af1")); + features.put(otherNode.getId(), Set.of("f1", "af1")); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).add(otherNode)) + .nodeFeatures(features) + .build(); + + DiscoveryNode newNodeNextMajor = DiscoveryNodeUtils.builder(UUIDs.base64UUID()) + .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current()) + .build(); + DiscoveryNode newNodeCurMajor = DiscoveryNodeUtils.create(UUIDs.base64UUID()); + DiscoveryNode newNodeNextMajorMissing = DiscoveryNodeUtils.builder(UUIDs.base64UUID()) + .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current()) + .build(); + // even though a next major could join, this doesnt allow the current major to join with missing features + // nor a next major missing non-assumed features + AtomicReference thisMajorEx = new AtomicReference<>(); + AtomicReference nextMajorEx = new AtomicReference<>(); + List tasks = List.of( + JoinTask.singleNode( + newNodeNextMajor, + CompatibilityVersionsUtils.staticCurrent(), + Set.of("f1"), + TEST_REASON, + NO_FAILURE_LISTENER, + 0L + ), + JoinTask.singleNode( + newNodeCurMajor, + CompatibilityVersionsUtils.staticCurrent(), + Set.of("f1"), + TEST_REASON, + ActionTestUtils.assertNoSuccessListener(thisMajorEx::set), + 0L + ), + JoinTask.singleNode( + newNodeNextMajorMissing, + CompatibilityVersionsUtils.staticCurrent(), + Set.of(), + TEST_REASON, + ActionTestUtils.assertNoSuccessListener(nextMajorEx::set), + 0L + ) + ); + if (randomBoolean()) { + // sometimes combine them together into a single task for completeness + tasks = List.of(new JoinTask(tasks.stream().flatMap(t -> t.nodeJoinTasks().stream()).toList(), false, 0L, null)); + } + + clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(clusterState, executor, tasks); + features.put(newNodeNextMajor.getId(), Set.of("f1")); + + assertThat(thisMajorEx.get().getMessage(), containsString("missing required features [af1]")); + assertThat(nextMajorEx.get().getMessage(), containsString("missing required features [f1]")); + + // extra check that the recorded cluster features are as they should be, and newNodeNextMajor hasn't gained af1 + assertThat(getRecordedNodeFeatures(clusterState), equalTo(features)); + } + public void testSuccess() { Settings.builder().build(); Metadata.Builder metaBuilder = Metadata.builder(); @@ -921,8 +1145,8 @@ public void testSetsNodeFeaturesWhenRejoining() throws Exception { .nodeFeatures(Map.of(masterNode.getId(), Set.of("f1", "f2"), rejoinNode.getId(), Set.of())) .build(); - assertThat(clusterState.clusterFeatures().clusterHasFeature(new NodeFeature("f1")), is(false)); - assertThat(clusterState.clusterFeatures().clusterHasFeature(new NodeFeature("f2")), is(false)); + assertThat(clusterState.clusterFeatures().clusterHasFeature(clusterState.nodes(), new NodeFeature("f1")), is(false)); + assertThat(clusterState.clusterFeatures().clusterHasFeature(clusterState.nodes(), new NodeFeature("f2")), is(false)); final var resultingState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( clusterState, @@ -939,8 +1163,8 @@ public void testSetsNodeFeaturesWhenRejoining() throws Exception { ) ); - assertThat(resultingState.clusterFeatures().clusterHasFeature(new NodeFeature("f1")), is(true)); - assertThat(resultingState.clusterFeatures().clusterHasFeature(new NodeFeature("f2")), is(true)); + assertThat(resultingState.clusterFeatures().clusterHasFeature(resultingState.nodes(), new NodeFeature("f1")), is(true)); + assertThat(resultingState.clusterFeatures().clusterHasFeature(resultingState.nodes(), new NodeFeature("f2")), is(true)); } private DesiredNodeWithStatus createActualizedDesiredNode() { diff --git a/server/src/test/java/org/elasticsearch/features/FeatureServiceTests.java b/server/src/test/java/org/elasticsearch/features/FeatureServiceTests.java index 874a6a96313e4..a64303f376b20 100644 --- a/server/src/test/java/org/elasticsearch/features/FeatureServiceTests.java +++ b/server/src/test/java/org/elasticsearch/features/FeatureServiceTests.java @@ -9,8 +9,14 @@ package org.elasticsearch.features; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.IndexVersions; import org.elasticsearch.test.ESTestCase; import java.util.List; @@ -69,6 +75,12 @@ public void testStateHasFeatures() { ); ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(DiscoveryNodeUtils.create("node1")) + .add(DiscoveryNodeUtils.create("node2")) + .add(DiscoveryNodeUtils.create("node3")) + ) .nodeFeatures( Map.of("node1", Set.of("f1", "f2", "nf1"), "node2", Set.of("f1", "f2", "nf2"), "node3", Set.of("f1", "f2", "nf1")) ) @@ -81,4 +93,33 @@ public void testStateHasFeatures() { assertFalse(service.clusterHasFeature(state, new NodeFeature("nf2"))); assertFalse(service.clusterHasFeature(state, new NodeFeature("nf3"))); } + + private static Version nextMajor() { + return Version.fromId((Version.CURRENT.major + 1) * 1_000_000 + 99); + } + + public void testStateHasAssumedFeatures() { + List specs = List.of( + new TestFeatureSpecification(Set.of(new NodeFeature("f1"), new NodeFeature("f2"), new NodeFeature("af1", true))) + ); + + ClusterState state = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(DiscoveryNodeUtils.create("node1")) + .add(DiscoveryNodeUtils.create("node2")) + .add( + DiscoveryNodeUtils.builder("node3") + .version(new VersionInformation(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current())) + .build() + ) + ) + .nodeFeatures(Map.of("node1", Set.of("f1", "af1"), "node2", Set.of("f1", "f2", "af1"), "node3", Set.of("f1", "f2"))) + .build(); + + FeatureService service = new FeatureService(specs); + assertTrue(service.clusterHasFeature(state, new NodeFeature("f1"))); + assertFalse(service.clusterHasFeature(state, new NodeFeature("f2"))); + assertTrue(service.clusterHasFeature(state, new NodeFeature("af1", true))); + } } diff --git a/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java index 97f44f7480a72..92bfabf6f1972 100644 --- a/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java @@ -77,8 +77,8 @@ public void setUp() throws Exception { clusterService = createClusterService(threadPool); localNodeId = clusterService.localNode().getId(); persistentTasksService = mock(PersistentTasksService.class); - featureService = new FeatureService(List.of(new HealthFeatures())); settings = Settings.builder().build(); + featureService = new FeatureService(List.of(new HealthFeatures())); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); } diff --git a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java index 36887681f5575..9955fe4cf0f95 100644 --- a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java @@ -529,6 +529,7 @@ public void testValidateIntervalScheduleSupport() { var featureService = new FeatureService(List.of(new SnapshotLifecycleFeatures())); { ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("a")).add(DiscoveryNodeUtils.create("b"))) .nodeFeatures(Map.of("a", Set.of(), "b", Set.of(SnapshotLifecycleService.INTERVAL_SCHEDULE.id()))) .build(); @@ -540,6 +541,7 @@ public void testValidateIntervalScheduleSupport() { } { ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("a"))) .nodeFeatures(Map.of("a", Set.of(SnapshotLifecycleService.INTERVAL_SCHEDULE.id()))) .build(); try { @@ -550,6 +552,7 @@ public void testValidateIntervalScheduleSupport() { } { ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("a")).add(DiscoveryNodeUtils.create("b"))) .nodeFeatures(Map.of("a", Set.of(), "b", Set.of(SnapshotLifecycleService.INTERVAL_SCHEDULE.id()))) .build(); try {