Skip to content

Commit

Permalink
Merge branch '2.17' into backport_lates_2.17
Browse files Browse the repository at this point in the history
Signed-off-by: shwetathareja <[email protected]>
  • Loading branch information
shwetathareja authored Sep 4, 2024
2 parents 3894564 + bd5041f commit 2ba1bac
Show file tree
Hide file tree
Showing 40 changed files with 1,580 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))[SnapshotV2] Snapshot Status API changes (#15409))
- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508))
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ protected Settings nodeSettings(int nodeOrdinal) {
)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO)
.put(REMOTE_PUBLICATION_EXPERIMENTAL, true)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
.put(
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
.put(
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.build();
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@
import org.opensearch.index.engine.Segment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
* Transport response for retrieving indices segment information
Expand Down Expand Up @@ -86,21 +85,24 @@ public Map<String, IndexSegments> getIndices() {
return indicesSegments;
}
Map<String, IndexSegments> indicesSegments = new HashMap<>();

Set<String> indices = new HashSet<>();
for (ShardSegments shard : shards) {
indices.add(shard.getShardRouting().getIndexName());
if (shards.length == 0) {
this.indicesSegments = indicesSegments;
return indicesSegments;
}

for (String indexName : indices) {
List<ShardSegments> shards = new ArrayList<>();
for (ShardSegments shard : this.shards) {
if (shard.getShardRouting().getIndexName().equals(indexName)) {
shards.add(shard);
}
Arrays.sort(shards, Comparator.comparing(shardSegment -> shardSegment.getShardRouting().getIndexName()));
int startIndexPos = 0;
String startIndexName = shards[startIndexPos].getShardRouting().getIndexName();
for (int i = 0; i < shards.length; i++) {
if (!shards[i].getShardRouting().getIndexName().equals(startIndexName)) {
indicesSegments.put(startIndexName, new IndexSegments(startIndexName, Arrays.copyOfRange(shards, startIndexPos, i)));
startIndexPos = i;
startIndexName = shards[startIndexPos].getShardRouting().getIndexName();
}
indicesSegments.put(indexName, new IndexSegments(indexName, shards.toArray(new ShardSegments[shards.size()])));
}
// Add the last shardSegment from shards list which would have got missed in the loop above
indicesSegments.put(startIndexName, new IndexSegments(startIndexName, Arrays.copyOfRange(shards, startIndexPos, shards.length)));

this.indicesSegments = indicesSegments;
return indicesSegments;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
Expand Down Expand Up @@ -519,7 +520,7 @@ public static void ensureNodesCompatibility(
);
}

ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
ensureRemoteRepositoryCompatibility(joiningNode, currentNodes, metadata);
}

/**
Expand Down Expand Up @@ -552,6 +553,30 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
}
}

public static void ensureRemoteRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

boolean isClusterRemoteStoreEnabled = existingNodes.stream().anyMatch(DiscoveryNode::isRemoteStoreNode);
if (isClusterRemoteStoreEnabled || joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
} else {
ensureRemoteClusterStateNodesCompatibility(joiningNode, currentNodes);
}
}

private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes) {
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

assert existingNodes.isEmpty() == false;
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
}
}

/**
* The method ensures homogeneity -
* 1. The joining node has to be a remote store backed if it's joining a remote store backed cluster. Validates
Expand All @@ -567,6 +592,7 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
* needs to be modified.
*/
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {

List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());

assert existingNodes.isEmpty() == false;
Expand Down Expand Up @@ -648,6 +674,23 @@ private static void ensureRemoteStoreNodesCompatibility(
}
}

private static void ensureRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode, List<String> reposToValidate) {

RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);

if (existingRemoteStoreNodeAttribute.equalsForRepositories(joiningRemoteStoreNodeAttribute, reposToValidate) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
}

public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,17 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.compositeindex.CompositeIndexSettings;
import org.opensearch.index.compositeindex.CompositeIndexValidator;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeIndexSettings;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MapperService.MergeReason;
Expand Down Expand Up @@ -154,6 +157,7 @@
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findContextTemplateName;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
Expand Down Expand Up @@ -1079,6 +1083,7 @@ static Settings aggregateIndexSettings(
validateTranslogRetentionSettings(indexSettings);
validateStoreTypeSettings(indexSettings);
validateRefreshIntervalSettings(request.settings(), clusterSettings);
validateTranslogFlushIntervalSettingsForCompositeIndex(request.settings(), clusterSettings);
validateTranslogDurabilitySettings(request.settings(), clusterSettings, settings);
return indexSettings;
}
Expand Down Expand Up @@ -1766,6 +1771,71 @@ public static void validateTranslogRetentionSettings(Settings indexSettings) {
}
}

/**
* Validates {@code index.translog.flush_threshold_size} is equal or below the {@code indices.composite_index.translog.max_flush_threshold_size}
* for composite indices based on {{@code index.composite_index}}
*
* @param requestSettings settings passed in during index create/update request
* @param clusterSettings cluster setting
*/
public static void validateTranslogFlushIntervalSettingsForCompositeIndex(Settings requestSettings, ClusterSettings clusterSettings) {
if (StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.exists(requestSettings) == false
|| requestSettings.get(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.getKey()) == null) {
return;
}
ByteSizeValue translogFlushSize = INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.get(requestSettings);
ByteSizeValue compositeIndexMaxFlushSize = clusterSettings.get(
CompositeIndexSettings.COMPOSITE_INDEX_MAX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING
);
if (translogFlushSize.compareTo(compositeIndexMaxFlushSize) > 0) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"You can configure '%s' with upto '%s' for composite index",
INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
compositeIndexMaxFlushSize
)
);
}
}

/**
* Validates {@code index.translog.flush_threshold_size} is equal or below the {@code indices.composite_index.translog.max_flush_threshold_size}
* for composite indices based on {{@code index.composite_index}}
* This is used during update index settings flow
*
* @param requestSettings settings passed in during index update request
* @param clusterSettings cluster setting
* @param indexSettings index settings
*/
public static Optional<String> validateTranslogFlushIntervalSettingsForCompositeIndex(
Settings requestSettings,
ClusterSettings clusterSettings,
Settings indexSettings
) {
if (INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.exists(requestSettings) == false
|| requestSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey()) == null
|| StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.exists(indexSettings) == false
|| indexSettings.get(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.getKey()) == null) {
return Optional.empty();
}
ByteSizeValue translogFlushSize = INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.get(requestSettings);
ByteSizeValue compositeIndexMaxFlushSize = clusterSettings.get(
CompositeIndexSettings.COMPOSITE_INDEX_MAX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING
);
if (translogFlushSize.compareTo(compositeIndexMaxFlushSize) > 0) {
return Optional.of(
String.format(
Locale.ROOT,
"You can configure '%s' with upto '%s' for composite index",
INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
compositeIndexMaxFlushSize
)
);
}
return Optional.empty();
}

/**
* Validates {@code index.refresh_interval} is equal or below the {@code cluster.minimum.index.refresh_interval}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@

import static org.opensearch.cluster.metadata.MetadataCreateDataStreamService.validateTimestampFieldMapping;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogFlushIntervalSettingsForCompositeIndex;
import static org.opensearch.common.util.concurrent.ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME;
import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;

Expand Down Expand Up @@ -1639,6 +1640,7 @@ private void validate(String name, @Nullable Settings settings, List<String> ind

// validate index refresh interval and translog durability settings
validateRefreshIntervalSettings(settings, clusterService.getClusterSettings());
validateTranslogFlushIntervalSettingsForCompositeIndex(settings, clusterService.getClusterSettings());
validateTranslogDurabilitySettingsInTemplate(settings, clusterService.getClusterSettings());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateOverlap;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogFlushIntervalSettingsForCompositeIndex;
import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findComponentTemplate;
import static org.opensearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;
import static org.opensearch.index.IndexSettings.same;
Expand Down Expand Up @@ -221,6 +222,12 @@ public ClusterState execute(ClusterState currentState) {
index.getName()
).ifPresent(validationErrors::add);
}
validateTranslogFlushIntervalSettingsForCompositeIndex(
normalizedSettings,
clusterService.getClusterSettings(),
metadata.getSettings()
).ifPresent(validationErrors::add);

}

if (validationErrors.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadat
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());

return equalsRepository(currentRepositories, otherRepositories);
}

public boolean equalsIgnoreGenerationsForRepo(@Nullable RepositoriesMetadata other, List<String> reposToValidate) {
if (other == null) {
return false;
}
List<RepositoryMetadata> currentRepositories = repositories.stream()
.filter(repo -> reposToValidate.contains(repo.name()))
.collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
.filter(repo -> reposToValidate.contains(repo.name()))
.collect(Collectors.toList());

return equalsRepository(currentRepositories, otherRepositories);
}

public static boolean equalsRepository(List<RepositoryMetadata> currentRepositories, List<RepositoryMetadata> otherRepositories) {
if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;

/**
* A discovery node represents a node that is part of the cluster.
Expand Down Expand Up @@ -554,7 +555,8 @@ public boolean isSearchNode() {
* @return true if the node contains remote store node attributes, false otherwise
*/
public boolean isRemoteStoreNode() {
return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX));
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
}

/**
Expand Down
Loading

0 comments on commit 2ba1bac

Please sign in to comment.