Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit RW separation to remote store enabled clusters and update recovery flow #16760

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
Expand All @@ -23,7 +24,7 @@
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase {
public class SearchReplicaFilteringAllocationIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Settings featureFlagSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,32 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return super.nodeSettings(nodeOrdinal);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();

}

@Override
Expand Down Expand Up @@ -82,4 +72,39 @@ public void testReplication() throws Exception {
waitForSearchableDocs(docCount, primary, replica);
}

public void testRecoveryAfterDocsIndexed() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);

final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0))
.get();

ensureGreen(INDEX_NAME);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);

internalCluster().restartNode(replica);
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.remotestore.RemoteSnapshotIT;
import org.opensearch.snapshots.SnapshotRestoreException;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -26,7 +26,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaRestoreIT extends AbstractSnapshotIntegTestCase {
public class SearchReplicaRestoreIT extends RemoteSnapshotIT {

private static final String INDEX_NAME = "test-idx-1";
private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored";
Expand All @@ -40,49 +40,6 @@ protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnDocRepWithSearchReplica() throws Exception {
bootstrapIndexWithOutSearchReplicas(ReplicationType.DOCUMENT);
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

SnapshotRestoreException exception = expectThrows(
SnapshotRestoreException.class,
() -> restoreSnapshot(
REPOSITORY_NAME,
SNAPSHOT_NAME,
INDEX_NAME,
RESTORED_INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build()
)
);
assertTrue(exception.getMessage().contains(getSnapshotExceptionMessage(ReplicationType.DOCUMENT, ReplicationType.DOCUMENT)));
}

public void testSearchReplicaRestore_WhenSnapshotOnDocRep_RestoreOnSegRepWithSearchReplica() throws Exception {
bootstrapIndexWithOutSearchReplicas(ReplicationType.DOCUMENT);
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

restoreSnapshot(
REPOSITORY_NAME,
SNAPSHOT_NAME,
INDEX_NAME,
RESTORED_INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build()
);
ensureYellowAndNoInitializingShards(RESTORED_INDEX_NAME);
internalCluster().startDataOnlyNode();
ensureGreen(RESTORED_INDEX_NAME);
assertEquals(1, getNumberOfSearchReplicas(RESTORED_INDEX_NAME));

SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
}

public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnDocRepWithSearchReplica() throws Exception {
bootstrapIndexWithOutSearchReplicas(ReplicationType.SEGMENT);
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);
Expand Down Expand Up @@ -140,27 +97,6 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_Resto
assertTrue(exception.getMessage().contains(getSnapshotExceptionMessage(ReplicationType.SEGMENT, ReplicationType.DOCUMENT)));
}

public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRepWithNoSearchReplica() throws Exception {
bootstrapIndexWithSearchReplicas();
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

restoreSnapshot(
REPOSITORY_NAME,
SNAPSHOT_NAME,
INDEX_NAME,
RESTORED_INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
.build()
);
ensureGreen(RESTORED_INDEX_NAME);
assertEquals(0, getNumberOfSearchReplicas(RESTORED_INDEX_NAME));

SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
}

private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType) throws InterruptedException {
startCluster(2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -31,7 +32,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchOnlyReplicaIT extends OpenSearchIntegTestCase {
public class SearchOnlyReplicaIT extends RemoteStoreBaseIntegTestCase {

private static final String TEST_INDEX = "test_index";

Expand All @@ -55,35 +56,6 @@ public Settings indexSettings() {
.build();
}

public void testCreateDocRepFails() {
Settings settings = Settings.builder().put(indexSettings()).put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();

IllegalArgumentException illegalArgumentException = expectThrows(
IllegalArgumentException.class,
() -> createIndex(TEST_INDEX, settings)
);
assertEquals(expectedFailureMessage, illegalArgumentException.getMessage());
}

public void testUpdateDocRepFails() {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
.build();
// create succeeds
createIndex(TEST_INDEX, settings);

// update fails
IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> {
client().admin()
.indices()
.prepareUpdateSettings(TEST_INDEX)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
});
assertEquals(expectedFailureMessage, illegalArgumentException.getMessage());
}

public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOException {
int numSearchReplicas = 1;
int numWriterReplicas = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,14 +1096,9 @@ static Settings aggregateIndexSettings(
private static void updateSearchOnlyReplicas(Settings requestSettings, Settings.Builder builder) {
if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(builder) && builder.get(SETTING_NUMBER_OF_SEARCH_REPLICAS) != null) {
if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings) > 0
&& ReplicationType.parseString(builder.get(INDEX_REPLICATION_TYPE_SETTING.getKey())).equals(ReplicationType.DOCUMENT)) {
&& Boolean.parseBoolean(builder.get(SETTING_REMOTE_STORE_ENABLED)) == false) {
throw new IllegalArgumentException(
"To set "
+ SETTING_NUMBER_OF_SEARCH_REPLICAS
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " must be set to "
+ ReplicationType.SEGMENT
"To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true"
);
}
builder.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand All @@ -77,8 +76,8 @@
import java.util.Set;

import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateOverlap;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings;
Expand Down Expand Up @@ -538,14 +537,12 @@ public ClusterState execute(ClusterState currentState) {
private void validateSearchReplicaCountSettings(Settings requestSettings, Index[] indices, ClusterState currentState) {
final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(requestSettings);
if (updatedNumberOfSearchReplicas > 0) {
if (Arrays.stream(indices).allMatch(index -> currentState.metadata().isSegmentReplicationEnabled(index.getName())) == false) {
if (Arrays.stream(indices)
.allMatch(
index -> currentState.metadata().index(index.getName()).getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)
) == false) {
throw new IllegalArgumentException(
"To set "
+ SETTING_NUMBER_OF_SEARCH_REPLICAS
+ ", "
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
+ " must be set to "
+ ReplicationType.SEGMENT
"To set " + SETTING_NUMBER_OF_SEARCH_REPLICAS + ", " + SETTING_REMOTE_STORE_ENABLED + " must be set to true"
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,7 @@ private Builder initializeAsRestore(
}
for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) {
indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled
unassignedInfo
)
ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo)
);
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
Expand Down Expand Up @@ -624,13 +618,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas
}
for (int i = 0; i < indexMetadata.getNumberOfSearchOnlyReplicas(); i++) {
indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled
unassignedInfo
)
ShardRouting.newUnassigned(shardId, false, true, EmptyStoreRecoverySource.INSTANCE, unassignedInfo)
);
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
Expand Down Expand Up @@ -665,7 +653,7 @@ public Builder addSearchReplica() {
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Change to remote store if enabled
EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.REPLICA_ADDED, null)
);
shards.put(shardNumber, new IndexShardRoutingTable.Builder(shards.get(shard.id())).addShard(shard).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@
if (shard.initializing()) {
allInitializingShards.add(shard);
}
if (shard.isSearchOnly()) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
// mark search only shards as initializing or assigned, but do not add to
// the allAllocationId set. Cluster Manager will filter out search replica aIds in
// the in-sync set that is sent to primaries, but they are still included in the routing table.
// This ensures the primaries do not validate these ids exist in tracking nor are included
// in the unavailableInSyncShards set.
if (shard.relocating()) {
allInitializingShards.add(shard.getTargetRelocatingShard());
assignedShards.add(shard.getTargetRelocatingShard());

Check warning on line 145 in server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java#L144-L145

Added lines #L144 - L145 were not covered by tests
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
}
continue;
}
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(shard.getTargetRelocatingShard());
Expand Down
Loading
Loading