Skip to content

Commit

Permalink
Integrate local recovery with remote store seeding during migration
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Apr 9, 2024
1 parent 9b0f578 commit 5833afd
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteSegmentStats;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

import java.util.Map;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase {
public void testLocalRecoveryRollingRestart() throws Exception {
String docRepNode = internalCluster().startNode();
Client client = internalCluster().client(docRepNode);

// create index
client().admin().indices().prepareCreate("idx1").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("idx1");

indexBulk("idx1", randomIntBetween(10, 100));
refresh("idx1");

// Index some more docs
indexBulk("idx1", randomIntBetween(10, 100));

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// add remote node in mixed mode cluster
addRemote = true;
String remoteNode = internalCluster().startNode();
internalCluster().validateClusterFormed();

updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// rolling restart
final Settings remoteNodeAttributes = remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
REPOSITORY_2_NAME,
translogRepoPath
);
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
// Update remote attributes
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
return remoteNodeAttributes;
}
});

ensureStableCluster(2);
ensureGreen("idx1");
assertEquals(internalCluster().size(), 2);

// Assert on remote uploads
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats("idx1").get().asMap();
DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
shardStatsMap.forEach((shardRouting, shardStats) -> {
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()) {
RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}
});
}
}
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -3702,7 +3702,7 @@ public void startRecovery(
RepositoriesService repositoriesService,
Consumer<MappingMetadata> mappingUpdateConsumer,
IndicesService indicesService
) {
) throws IOException {
// TODO: Create a proper object to encapsulate the recovery context
// all of the current methods here follow a pattern of:
// resolve context which isn't really dependent on the local shards and then async
Expand All @@ -3724,7 +3724,17 @@ public void startRecovery(
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE:
case EXISTING_STORE:
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
if (shouldSeedRemoteStore() && routingEntry().primary()) {
deleteRemoteStoreContents();
// This cleans up remote translog's 0 generation, as we don't want to get that uploaded
sync();
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(() -> { refresh("local recovery during remote store migration"); });
waitForRemoteStoreSync();
logger.info("Remote Store is now seeded for {} after local recovery", shardId());
} else {
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
}
break;
case REMOTE_STORE:
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l));
Expand Down

0 comments on commit 5833afd

Please sign in to comment.