Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for restoring from snapshot with search replicas #16111

Merged
merged 12 commits into from
Oct 23, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for renaming aliases during snapshot restore ([#16292](https://github.com/opensearch-project/OpenSearch/pull/16292))
- Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387))
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))
- [Star Tree - Search] Add support for metric aggregations with/without term query ([#15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotRestoreException;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

/**
 * Integration tests for restoring a snapshot while requesting (or inheriting) search-only replicas.
 * <p>
 * Every test starts its own cluster (test scope, no pre-started data nodes), creates and populates
 * {@link #INDEX_NAME}, snapshots it into a filesystem repository, deletes the index and then restores it
 * as {@link #RESTORED_INDEX_NAME} with different combinations of replication type and search replica count.
 * Restores that end up with document replication and a non-zero search replica count are expected to be
 * rejected with a {@link SnapshotRestoreException}.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaRestoreIT extends AbstractSnapshotIntegTestCase {

    private static final String INDEX_NAME = "test-idx-1";
    private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored";
    private static final String REPOSITORY_NAME = "test-repo";
    private static final String SNAPSHOT_NAME = "test-snapshot";
    private static final String FS_REPOSITORY_TYPE = "fs";
    private static final int DOC_COUNT = 10;

    /**
     * Enables the experimental reader/writer split feature flag on top of the inherited feature flags.
     */
    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
    }

    /** Document-replication snapshot restored as document replication with a search replica must be rejected. */
    public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnDocRepWithSearchReplica() throws Exception {
        bootstrapIndexWithoutSearchReplicas(ReplicationType.DOCUMENT);
        createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

        assertRestoreRejected(ReplicationType.DOCUMENT, restoreSettings(ReplicationType.DOCUMENT, 1));
    }

    /** Document-replication snapshot restored as segment replication with a search replica must succeed. */
    public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnSegRepWithSearchReplica() throws Exception {
        bootstrapIndexWithoutSearchReplicas(ReplicationType.DOCUMENT);
        createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

        restoreWithSearchReplicaAndVerify(restoreSettings(ReplicationType.SEGMENT, 1));
    }

    /** Segment-replication snapshot restored as document replication with a search replica must be rejected. */
    public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnDocRepWithSearchReplica() throws Exception {
        bootstrapIndexWithoutSearchReplicas(ReplicationType.SEGMENT);
        createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

        assertRestoreRejected(ReplicationType.SEGMENT, restoreSettings(ReplicationType.DOCUMENT, 1));
    }

    /** Segment-replication snapshot restored with only the search replica count overridden must succeed. */
    public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnSegRepWithSearchReplica() throws Exception {
        bootstrapIndexWithoutSearchReplicas(ReplicationType.SEGMENT);
        createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

        // The replication type is intentionally not overridden here; only the search replica count is.
        restoreWithSearchReplicaAndVerify(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1).build());
    }

    /**
     * Snapshot of a segment-replication index that already has a search replica, restored as document
     * replication without resetting the search replica count, must be rejected.
     */
    public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRep() throws Exception {
        bootstrapIndexWithSearchReplicas();
        createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

        assertRestoreRejected(
            ReplicationType.SEGMENT,
            Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build()
        );
    }

    /**
     * Snapshot of a segment-replication index that already has a search replica, restored as document
     * replication with the search replica count explicitly reset to zero, must succeed.
     */
    public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRepWithNoSearchReplica() throws Exception {
        bootstrapIndexWithSearchReplicas();
        createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

        restoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, INDEX_NAME, RESTORED_INDEX_NAME, restoreSettings(ReplicationType.DOCUMENT, 0));
        assertRestoredIndexGreenAndSearchable(0);
    }

    /**
     * Builds restore-time override settings for the replication type and the search replica count.
     *
     * @param replicationType replication type to restore the index with
     * @param searchReplicas  number of search-only replicas to request on restore
     * @return the override settings passed to the restore request
     */
    private static Settings restoreSettings(ReplicationType replicationType, int searchReplicas) {
        return Settings.builder()
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
            .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, searchReplicas)
            .build();
    }

    /**
     * Attempts a restore that must fail and checks the failure message.
     * All rejected scenarios in this class restore with document replication, so that is the
     * restore replication type the expected message is built with.
     *
     * @param snapshotReplicationType replication type the snapshotted index was created with
     * @param overrideSettings        settings applied to the restore request
     */
    private void assertRestoreRejected(ReplicationType snapshotReplicationType, Settings overrideSettings) {
        SnapshotRestoreException exception = expectThrows(
            SnapshotRestoreException.class,
            () -> restoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, INDEX_NAME, RESTORED_INDEX_NAME, overrideSettings)
        );
        String expectedMessage = getSnapshotExceptionMessage(snapshotReplicationType, ReplicationType.DOCUMENT);
        // Include the actual message so a mismatch is diagnosable from the test report alone.
        assertTrue(
            "expected message containing [" + expectedMessage + "] but was [" + exception.getMessage() + "]",
            exception.getMessage().contains(expectedMessage)
        );
    }

    /**
     * Restores the snapshot with the given settings (which must request one search replica), adds a
     * data node and verifies the restored index.
     *
     * @param overrideSettings settings applied to the restore request
     */
    private void restoreWithSearchReplicaAndVerify(Settings overrideSettings) throws Exception {
        restoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, INDEX_NAME, RESTORED_INDEX_NAME, overrideSettings);
        ensureYellowAndNoInitializingShards(RESTORED_INDEX_NAME);
        // NOTE(review): the cluster was bootstrapped with two data nodes; presumably the extra search
        // replica needs a third node to be allocated, which is why green is only expected after this.
        internalCluster().startDataOnlyNode();
        assertRestoredIndexGreenAndSearchable(1);
    }

    /**
     * Waits for the restored index to become green, then checks its search replica count and that
     * a match-all search returns every indexed document.
     *
     * @param expectedSearchReplicas search replica count the restored index is expected to report
     */
    private void assertRestoredIndexGreenAndSearchable(int expectedSearchReplicas) {
        ensureGreen(RESTORED_INDEX_NAME);
        assertEquals(expectedSearchReplicas, getNumberOfSearchReplicas(RESTORED_INDEX_NAME));

        SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
        assertHitCount(resp, DOC_COUNT);
    }

    /**
     * Starts a cluster with two data nodes and creates {@link #INDEX_NAME} with one shard, one replica,
     * no search replicas and the given replication type, then indexes {@link #DOC_COUNT} random documents.
     *
     * @param replicationType replication type of the index to be snapshotted
     */
    private void bootstrapIndexWithoutSearchReplicas(ReplicationType replicationType) throws InterruptedException {
        startCluster(2);

        Settings settings = Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
            .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
            .build();

        createIndex(INDEX_NAME, settings);
        indexRandomDocs(INDEX_NAME, DOC_COUNT);
        refresh(INDEX_NAME);
        ensureGreen(INDEX_NAME);
    }

    /**
     * Starts a cluster with three data nodes and creates {@link #INDEX_NAME} as a segment-replication
     * index with one shard, one replica and one search replica, then indexes {@link #DOC_COUNT} documents.
     */
    private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
        startCluster(3);

        Settings settings = Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
            .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .build();

        createIndex(INDEX_NAME, settings);
        ensureGreen(INDEX_NAME);
        for (int i = 0; i < DOC_COUNT; i++) {
            client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
        }
        flushAndRefresh(INDEX_NAME);
    }

    /**
     * Starts one dedicated cluster-manager node plus the requested number of data-only nodes.
     *
     * @param numOfNodes number of data-only nodes to start
     */
    private void startCluster(int numOfNodes) {
        internalCluster().startClusterManagerOnlyNode();
        internalCluster().startDataOnlyNodes(numOfNodes);
    }

    /**
     * Registers a repository, snapshots the given index into it and deletes the index so that it can
     * be restored afterwards.
     *
     * @param repositoryName name of the repository to create
     * @param repositoryType repository type, e.g. {@link #FS_REPOSITORY_TYPE}
     * @param snapshotName   name of the snapshot to create
     * @param indexName      index to snapshot and then delete
     */
    private void createRepoAndSnapshot(String repositoryName, String repositoryType, String snapshotName, String indexName) {
        createRepository(repositoryName, repositoryType, randomRepoPath().toAbsolutePath());
        createSnapshot(repositoryName, snapshotName, List.of(indexName));
        // Operate on the index that was actually snapshotted rather than on the class constant.
        assertAcked(client().admin().indices().prepareDelete(indexName));
        assertFalse("index [" + indexName + "] should have been deleted", indexExists(indexName));
    }

    /**
     * Builds the message fragment expected in the restore failure. The setting names are spelled out
     * literally on purpose so the test pins the user-facing text.
     *
     * @param snapshotReplicationType replication type recorded in the snapshot
     * @param restoreReplicationType  replication type requested for the restore
     * @return the expected message fragment
     */
    private String getSnapshotExceptionMessage(ReplicationType snapshotReplicationType, ReplicationType restoreReplicationType) {
        return "snapshot was created with [index.replication.type] as ["
            + snapshotReplicationType
            + "]. "
            + "To restore with [index.replication.type] as ["
            + restoreReplicationType
            + "], "
            + "[index.number_of_search_only_replicas] must be set to [0]";
    }

    /**
     * Reads the search replica count of an index from the current cluster state.
     *
     * @param index name of the index to inspect
     * @return the configured number of search replicas, or {@code 0} when the setting is not present
     */
    private int getNumberOfSearchReplicas(String index) {
        Metadata metadata = client().admin().cluster().prepareState().get().getState().metadata();
        // getAsInt avoids the boxing round-trip and the NumberFormatException on a missing setting.
        return metadata.index(index).getSettings().getAsInt(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,17 @@ private Builder initializeAsRestore(
);
}
}
for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) {
indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled
unassignedInfo
)
);
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -120,10 +121,12 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_HISTORY_UUID;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_UPGRADED;
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
Expand Down Expand Up @@ -401,6 +404,13 @@ public ClusterState execute(ClusterState currentState) {
overrideSettingsInternal,
ignoreSettingsInternal
);

validateReplicationTypeRestoreSettings(
snapshot,
metadata.index(index).getSettings().get(SETTING_REPLICATION_TYPE),
snapshotIndexMetadata
);

if (isRemoteSnapshot) {
snapshotIndexMetadata = addSnapshotToIndexSettings(snapshotIndexMetadata, snapshot, snapshotIndexId);
}
Expand Down Expand Up @@ -1303,6 +1313,32 @@ private static void validateSnapshotRestorable(final String repository, final Sn
}
}

/**
 * Rejects a restore that would combine document replication with search-only replicas.
 * <p>
 * The check passes silently when the restored index requests no search replicas, or when its
 * replication type is anything other than {@link ReplicationType#DOCUMENT}.
 * Visible for testing.
 *
 * @param snapshot                snapshot being restored; attached to the thrown exception
 * @param snapshotReplicationType replication type value reported in the error message as the one
 *                                the snapshot was created with
 * @param updatedMetadata         index metadata whose settings are inspected for the search replica
 *                                count and the replication type
 * @throws SnapshotRestoreException if the search replica count is greater than zero and the
 *                                  replication type is document replication
 */
static void validateReplicationTypeRestoreSettings(Snapshot snapshot, String snapshotReplicationType, IndexMetadata updatedMetadata) {
    final int searchReplicaCount = updatedMetadata.getSettings().getAsInt(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0);
    if (searchReplicaCount <= 0) {
        // No search replicas requested: nothing to validate.
        return;
    }

    final String restoreReplicationType = updatedMetadata.getSettings().get(SETTING_REPLICATION_TYPE);
    if (ReplicationType.DOCUMENT.toString().equals(restoreReplicationType) == false) {
        return;
    }

    final String reason = "snapshot was created with ["
        + SETTING_REPLICATION_TYPE
        + "] as ["
        + snapshotReplicationType
        + "]. To restore with ["
        + SETTING_REPLICATION_TYPE
        + "] as ["
        + ReplicationType.DOCUMENT
        + "], ["
        + SETTING_NUMBER_OF_SEARCH_REPLICAS
        + "] must be set to [0]";
    throw new SnapshotRestoreException(snapshot, reason);
}

public static boolean failed(SnapshotInfo snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashSet;

public class SearchOnlyReplicaRestoreTests extends OpenSearchTestCase {

public void testSearchOnlyReplicasRestored() {
Metadata metadata = Metadata.builder()
.put(
IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.numberOfSearchReplicas(1)
)
.build();

IndexMetadata indexMetadata = metadata.index("test");
RecoverySource.SnapshotRecoverySource snapshotRecoverySource = new RecoverySource.SnapshotRecoverySource(
UUIDs.randomBase64UUID(),
new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())),
Version.CURRENT,
new IndexId("test", UUIDs.randomBase64UUID(random()))
);

RoutingTable routingTable = RoutingTable.builder().addAsNewRestore(indexMetadata, snapshotRecoverySource, new HashSet<>()).build();

ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.build();

IndexShardRoutingTable indexShardRoutingTable = clusterState.routingTable().index("test").shard(0);

assertEquals(1, clusterState.routingTable().index("test").shards().size());
assertEquals(3, indexShardRoutingTable.getShards().size());
assertEquals(1, indexShardRoutingTable.searchOnlyReplicas().size());
}
}
Loading
Loading