Skip to content

Commit

Permalink
Merge branch '8.x' into backport/8.x/pr-117989
Browse files Browse the repository at this point in the history
  • Loading branch information
idegtiarenko authored Dec 18, 2024
2 parents 8b9a833 + 1c85bdb commit 56f0d44
Show file tree
Hide file tree
Showing 41 changed files with 888 additions and 161 deletions.
1 change: 1 addition & 0 deletions .buildkite/pipelines/periodic-platform-support.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ steps:
image:
- almalinux-8-aarch64
- ubuntu-2004-aarch64
- ubuntu-2404-aarch64
GRADLE_TASK:
- checkPart1
- checkPart2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ org.elasticsearch.cluster.ClusterFeatures#nodeFeatures()
@defaultMessage ClusterFeatures#allNodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#allNodeFeatures()
@defaultMessage ClusterFeatures#clusterHasFeature is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.features.NodeFeature)
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cluster.node.DiscoveryNodes, org.elasticsearch.features.NodeFeature)

@defaultMessage Do not construct these records outside the source files they are declared in
org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult)
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/118143.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118143
summary: Infrastructure for assuming cluster features in the next major version
area: "Infra/Core"
type: feature
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/118544.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118544
summary: ESQL - Remove restrictions for disjunctions in full text functions
area: ES|QL
type: enhancement
issues: []
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,6 @@ tests:
- class: org.elasticsearch.xpack.apmdata.APMYamlTestSuiteIT
method: test {yaml=/20_metrics_ingest/Test metrics-apm.app-* setting event.ingested via ingest pipeline}
issue: https://github.com/elastic/elasticsearch/issues/118875
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=synonyms/90_synonyms_reloading_for_synset/Reload analyzers for specific synonym set}
issue: https://github.com/elastic/elasticsearch/issues/116777
- class: org.elasticsearch.xpack.ml.integration.ForecastIT
method: testOverflowToDisk
issue: https://github.com/elastic/elasticsearch/issues/117740
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
Expand Down Expand Up @@ -92,15 +94,55 @@ public Set<String> allNodeFeatures() {
return allNodeFeatures;
}

/**
 * Whether {@code node} is permitted to treat assumed features as present,
 * based on its build version.
 * @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
 */
public static boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
    var buildVersion = node.getBuildVersion();
    return buildVersion.canRemoveAssumedFeatures();
}

/**
 * Whether at least one node in {@code nodes} is permitted to treat assumed
 * features as present.
 * @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
 */
public static boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
    for (DiscoveryNode node : nodes.getAllNodes()) {
        if (node.getBuildVersion().canRemoveAssumedFeatures()) {
            return true;
        }
    }
    return false;
}

/**
* {@code true} if {@code feature} is present on all nodes in the cluster.
* <p>
* NOTE: This should not be used directly, as it does not read historical features.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
@SuppressForbidden(reason = "directly reading cluster features")
public boolean clusterHasFeature(NodeFeature feature) {
return allNodeFeatures().contains(feature.id());
public boolean clusterHasFeature(DiscoveryNodes nodes, NodeFeature feature) {
    assert nodes.getNodes().keySet().equals(nodeFeatures.keySet())
        : "Cluster features nodes " + nodeFeatures.keySet() + " is different to discovery nodes " + nodes.getNodes().keySet();

    // fast path: every node explicitly reports this feature
    if (allNodeFeatures().contains(feature.id())) {
        return true;
    }

    // a non-assumed feature that isn't on every node is simply absent
    if (feature.assumedAfterNextCompatibilityBoundary() == false) {
        return false;
    }

    // the feature is assumed: it still counts as present cluster-wide if every
    // node that doesn't report it is allowed to assume it
    // TODO: do we need some kind of transient cache of this calculation?
    for (var entry : nodeFeatures.entrySet()) {
        boolean reported = entry.getValue().contains(feature.id());
        if (reported == false && featuresCanBeAssumedForNode(nodes.getNodes().get(entry.getKey())) == false) {
            return false;
        }
    }

    // all nodes missing the feature can assume it - so that's alright then
    return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand All @@ -39,6 +40,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -137,8 +139,8 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
Map<String, CompatibilityVersions> compatibilityVersionsMap = new HashMap<>(newState.compatibilityVersions());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures());
Set<String> allNodesFeatures = ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures()); // as present in cluster state
Set<String> effectiveClusterFeatures = calculateEffectiveClusterFeatures(newState.nodes(), nodeFeatures);

assert nodesBuilder.isLocalNodeElectedMaster();

Expand Down Expand Up @@ -174,14 +176,17 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex
}
blockForbiddenVersions(compatibilityVersions.transportVersion());
ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
enforceNodeFeatureBarrier(node.getId(), allNodesFeatures, features);
Set<String> newNodeEffectiveFeatures = enforceNodeFeatureBarrier(node, effectiveClusterFeatures, features);
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
ensureIndexCompatibility(node.getMinIndexVersion(), node.getMaxIndexVersion(), initialState.getMetadata());

nodesBuilder.add(node);
compatibilityVersionsMap.put(node.getId(), compatibilityVersions);
// store the actual node features here, not including assumed features, as this is persisted in cluster state
nodeFeatures.put(node.getId(), features);
allNodesFeatures.retainAll(features);
effectiveClusterFeatures.retainAll(newNodeEffectiveFeatures);

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand Down Expand Up @@ -355,6 +360,35 @@ private static void blockForbiddenVersions(TransportVersion joiningTransportVers
}
}

/**
 * Calculate the cluster's effective features: the features held in common by all
 * nodes, after crediting assumed features to every node that may assume them.
 */
private Set<String> calculateEffectiveClusterFeatures(DiscoveryNodes nodes, Map<String, Set<String>> nodeFeatures) {
    if (featureService.featuresCanBeAssumedForNodes(nodes)) {
        // ids of all registered features marked as assumed at the next compatibility boundary
        Set<String> assumedFeatureIds = new HashSet<>();
        for (NodeFeature nf : featureService.getNodeFeatures().values()) {
            if (nf.assumedAfterNextCompatibilityBoundary()) {
                assumedFeatureIds.add(nf.id());
            }
        }

        // work on a copy so the caller's map is untouched; credit assumed features
        // to every node that is permitted to assume them
        Map<String, Set<String>> augmented = new HashMap<>(nodeFeatures);
        nodes.getNodes().forEach((nodeId, node) -> {
            if (featureService.featuresCanBeAssumedForNode(node)) {
                assert augmented.containsKey(nodeId) : "Node " + nodeId + " does not have any features";
                augmented.computeIfPresent(nodeId, (id, existing) -> {
                    Set<String> withAssumed = new HashSet<>(existing);
                    return withAssumed.addAll(assumedFeatureIds) ? withAssumed : existing;
                });
            }
        });
        nodeFeatures = augmented;
    }

    return ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
}

/**
* Ensures that all indices are compatible with the given index version. This will ensure that all indices in the given metadata
* will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index
Expand Down Expand Up @@ -461,13 +495,44 @@ public static void ensureVersionBarrier(Version joiningNodeVersion, Version minC
}
}

private void enforceNodeFeatureBarrier(String nodeId, Set<String> existingNodesFeatures, Set<String> newNodeFeatures) {
/**
* Enforces the feature join barrier - a joining node should have all features already present in all existing nodes in the cluster
*
* @return The set of features that this node has (including assumed features)
*/
private Set<String> enforceNodeFeatureBarrier(DiscoveryNode node, Set<String> effectiveClusterFeatures, Set<String> newNodeFeatures) {
// prevent join if it does not have one or more features that all other nodes have
Set<String> missingFeatures = new HashSet<>(existingNodesFeatures);
Set<String> missingFeatures = new HashSet<>(effectiveClusterFeatures);
missingFeatures.removeAll(newNodeFeatures);

if (missingFeatures.isEmpty() == false) {
throw new IllegalStateException("Node " + nodeId + " is missing required features " + missingFeatures);
if (missingFeatures.isEmpty()) {
// nothing missing - all ok
return newNodeFeatures;
}

if (featureService.featuresCanBeAssumedForNode(node)) {
// it might still be ok for this node to join if this node can have assumed features,
// and all the missing features are assumed
// we can get the NodeFeature object direct from this node's registered features
// as all existing nodes in the cluster have the features present in existingNodesFeatures, including this one
newNodeFeatures = new HashSet<>(newNodeFeatures);
for (Iterator<String> it = missingFeatures.iterator(); it.hasNext();) {
String feature = it.next();
NodeFeature nf = featureService.getNodeFeatures().get(feature);
if (nf.assumedAfterNextCompatibilityBoundary()) {
// its ok for this feature to be missing from this node
it.remove();
// and it should be assumed to still be in the cluster
newNodeFeatures.add(feature);
}
// even if we don't remove it, still continue, so the exception message below is accurate
}
}

if (missingFeatures.isEmpty()) {
return newNodeFeatures;
} else {
throw new IllegalStateException("Node " + node.getId() + " is missing required features " + missingFeatures);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.StringLiteralDeduplicator;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.env.BuildVersion;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.node.Node;
Expand Down Expand Up @@ -503,6 +504,10 @@ public Version getVersion() {
return this.versionInfo.nodeVersion();
}

/**
 * Returns the {@link BuildVersion} derived from this node's version id.
 */
public BuildVersion getBuildVersion() {
    int versionId = getVersion().id;
    return BuildVersion.fromVersionId(versionId);
}

public OptionalInt getPre811VersionId() {
// Even if Version is removed from this class completely it will need to read the version ID
// off the wire for old node versions, so the value of this variable can be obtained from that
Expand Down
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/env/BuildVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
*/
public abstract class BuildVersion {

/**
 * Checks if this version can operate properly in a cluster without features
 * that are assumed in the currently running Elasticsearch.
 * <p>
 * NOTE(review): the boundary is implementation-defined; the default
 * implementation returns {@code true} only for the next major version.
 */
public abstract boolean canRemoveAssumedFeatures();

/**
* Check whether this version is on or after a minimum threshold.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ final class DefaultBuildVersion extends BuildVersion {
this.version = Version.fromId(versionId);
}

@Override
public boolean canRemoveAssumedFeatures() {
    /*
     * Only the next major version may drop assumed features. The next major can
     * only form a cluster with the latest minor of the current major, so any
     * feature this codebase marks as assumed is automatically satisfied there.
     */
    final int nextMajor = Version.CURRENT.major + 1;
    return version.major == nextMajor;
}

@Override
public boolean onOrAfterMinimumCompatible() {
return Version.CURRENT.minimumCompatibilityVersion().onOrBefore(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
package org.elasticsearch.features;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterFeatures;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
Expand Down Expand Up @@ -44,7 +47,6 @@ public class FeatureService {
* as the local node's supported feature set
*/
public FeatureService(List<? extends FeatureSpecification> specs) {

var featureData = FeatureData.createFromSpecifications(specs);
nodeFeatures = featureData.getNodeFeatures();
historicalFeatures = featureData.getHistoricalFeatures();
Expand All @@ -60,12 +62,26 @@ public Map<String, NodeFeature> getNodeFeatures() {
return nodeFeatures;
}

/**
 * Returns {@code true} if {@code node} can have assumed features.
 * Delegates to {@link ClusterFeatures#featuresCanBeAssumedForNode}.
 */
public boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
    return ClusterFeatures.featuresCanBeAssumedForNode(node);
}

/**
 * Returns {@code true} if one or more nodes in {@code nodes} can have assumed features.
 * Delegates to {@link ClusterFeatures#featuresCanBeAssumedForNodes}.
 */
public boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
    return ClusterFeatures.featuresCanBeAssumedForNodes(nodes);
}

/**
* Returns {@code true} if all nodes in {@code state} support feature {@code feature}.
*/
@SuppressForbidden(reason = "We need basic feature information from cluster state")
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
if (state.clusterFeatures().clusterHasFeature(feature)) {
if (state.clusterFeatures().clusterHasFeature(state.getNodes(), feature)) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
* A feature published by a node.
*
* @param id The feature id. Must be unique in the node.
* @param assumedAfterNextCompatibilityBoundary
* {@code true} if this feature is removed at the next compatibility boundary (ie next major version),
* and so should be assumed to be true for all nodes after that boundary.
*/
public record NodeFeature(String id) {
public record NodeFeature(String id, boolean assumedAfterNextCompatibilityBoundary) {

    // Canonical constructor: the feature id must always be non-null.
    public NodeFeature(String id, boolean assumedAfterNextCompatibilityBoundary) {
        this.id = Objects.requireNonNull(id);
        this.assumedAfterNextCompatibilityBoundary = assumedAfterNextCompatibilityBoundary;
    }

    // Convenience constructor for features that are never assumed.
    public NodeFeature(String id) {
        this(id, false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ protected boolean areFileSettingsApplied(ClusterState clusterState) {
}

@SuppressForbidden(reason = "need to check file settings support on exact cluster state")
private static boolean supportsFileSettings(ClusterState clusterState) {
return clusterState.clusterFeatures().clusterHasFeature(FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
private boolean supportsFileSettings(ClusterState clusterState) {
    var clusterFeatures = clusterState.clusterFeatures();
    return clusterFeatures.clusterHasFeature(clusterState.nodes(), FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
}

private void setReady(boolean ready) {
Expand Down
Loading

0 comments on commit 56f0d44

Please sign in to comment.