Skip to content

Commit

Permalink
Snapshot Status API changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ltaragi authored and Sachin Kale committed Aug 21, 2024
1 parent 55f1963 commit 939854e
Show file tree
Hide file tree
Showing 30 changed files with 722 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -26,6 +28,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
Expand Down Expand Up @@ -98,7 +101,7 @@ public void testMigrationDirections() {
assertThrows(IllegalArgumentException.class, () -> client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

public void testNoShallowSnapshotInMixedMode() throws Exception {
public void testSnapshotStatusAPIForRemoteStoreEnabledCluster() throws Exception {
logger.info("Initialize remote cluster");
addRemote = true;
internalCluster().setBootstrapClusterManagerNodeIndex(0);
Expand All @@ -110,8 +113,18 @@ public void testNoShallowSnapshotInMixedMode() throws Exception {
internalCluster().validateClusterFormed();

logger.info("Create remote backed index");
RemoteStoreMigrationShardAllocationBaseTestCase.createIndex("test", 0);
RemoteStoreMigrationShardAllocationBaseTestCase.assertRemoteStoreBackedIndex("test");
String index1 = "index1";
String index2 = "index2";
String index3 = "index3";
RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index1, 0);
RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index2, 0);
// RemoteStoreMigrationShardAllocationBaseTestCase.createIndex(index3, 0);

logger.info("--> indexing some data");
for (int i = 0; i < 10; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

logger.info("Create shallow snapshot setting enabled repo");
String shallowSnapshotRepoName = "shallow-snapshot-repo-name";
Expand All @@ -131,19 +144,23 @@ public void testNoShallowSnapshotInMixedMode() throws Exception {
SnapshotInfo snapshotInfo1 = RemoteStoreMigrationShardAllocationBaseTestCase.createSnapshot(
shallowSnapshotRepoName,
snapshot1,
"test"
index1,
index2
);
assertEquals(snapshotInfo1.isRemoteStoreIndexShallowCopyEnabled(), true);

logger.info("Set MIXED compatibility mode");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
assertBusy(() -> {
SnapshotsStatusResponse snapshotsStatusResponse = client().admin()
.cluster()
.prepareSnapshotStatus(shallowSnapshotRepoName)
.setSnapshots(snapshot1)
.setIndices(index1, index2, index3)
.setIgnoreUnavailable(false)
.get();
SnapshotStatus snapshotStatus1 = snapshotsStatusResponse.getSnapshots().get(0);
logger.info("*** current snapshot status - totalSize [{}]", snapshotStatus1.getStats().getTotalSize());
assertNotEquals(0L, snapshotStatus1.getStats().getTotalSize());
}, 1, TimeUnit.MINUTES);

logger.info("Verify that new snapshot is not shallow");
final String snapshot2 = "snapshot2";
SnapshotInfo snapshotInfo2 = RemoteStoreMigrationShardAllocationBaseTestCase.createSnapshot(shallowSnapshotRepoName, snapshot2);
assertEquals(snapshotInfo2.isRemoteStoreIndexShallowCopyEnabled(), false);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@

package org.opensearch.snapshots;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.opensearch.action.admin.cluster.stats.ClusterStatsIndices;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
Expand All @@ -46,6 +48,9 @@
import java.nio.file.Path;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API;
import static org.opensearch.snapshots.SnapshotsService.SHALLOW_SNAPSHOT_V2;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -78,31 +83,37 @@ public void testStatusAPICallForShallowCopySnapshot() throws Exception {
final String snapshotRepoName = "snapshot-repo-name";
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy());

final String remoteStoreEnabledIndexName = "remote-index-1";
final String remoteStoreEnabledIndexName = "remote-index";
final String remoteStoreEnabledIndexName1 = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
createIndex(remoteStoreEnabledIndexName1, remoteStoreEnabledIndexSettings);
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index(remoteStoreEnabledIndexName, "_doc", Integer.toString(i), "foo", "bar" + i);
index(remoteStoreEnabledIndexName1, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

final String snapshot = "snapshot";
createFullSnapshot(snapshotRepoName, snapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1);

final SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, snapshot);
assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS));

// Validating that the incremental file count and incremental file size is zero for shallow copy
final SnapshotIndexShardStatus shallowSnapshotShardState = stateFirstShard(snapshotStatus, remoteStoreEnabledIndexName);
assertThat(shallowSnapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(shallowSnapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
assertThat(shallowSnapshotShardState.getStats().getTotalSize(), greaterThan(0L));
assertThat(shallowSnapshotShardState.getStats().getIncrementalFileCount(), is(0));
assertThat(shallowSnapshotShardState.getStats().getIncrementalSize(), is(0L));
assertBusy(() -> {
// although no. of shards in snapshot (3) is greater than the max value allowed in a status api call, the request does not fail
// TODO: currently this gives snapshot not found at repository issue --> discuss what to do here
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(snapshotRepoName)
.setSnapshots(snapshot)
.setIndices(remoteStoreEnabledIndexName1)
.get()
.getSnapshots()
.get(0);
logger.info("*** snapshotsStatus = {}", snapshotsStatus);
});
}

public void testStatusAPIStatsForBackToBackShallowSnapshot() throws Exception {
Expand Down Expand Up @@ -192,6 +203,77 @@ public void testStatusAPICallInProgressShallowSnapshot() throws Exception {
createSnapshotResponseActionFuture.actionGet();
}

public void testStatusAPICallForShallowV2Snapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used for the test");
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);

final String snapshotRepoName = "snapshot-repo-name";
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy());

final String index1 = "remote-index-1";
final String index2 = "remote-index-2";
final String index3 = "remote-index-3";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(index1, remoteStoreEnabledIndexSettings);
createIndex(index2, remoteStoreEnabledIndexSettings);
createIndex(index3, remoteStoreEnabledIndexSettings);
ensureGreen();

logger.info("Indexing some data");
for (int i = 0; i < 50; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index3, "_doc", Integer.toString(i), "foo", "bar" + i);
}
logger.info("*** triggering refresh from ");
refresh();

logger.info("Set SNAPSHOT_V2 as true and MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2).put(SHALLOW_SNAPSHOT_V2.getKey(), true)
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

final String snapshot = "snapshot";
SnapshotInfo snapshotInfo = createFullSnapshot(snapshotRepoName, snapshot);
assertTrue(snapshotInfo.getPinnedTimestamp() > 0); // to assert creation of a shallow v2 snapshot
logger.info(
"*** primarySize = {}",
client().admin().indices().prepareStats().execute().actionGet().getPrimaries().getStore().size()
);
ClusterStatsIndices clusterStatsIndices = client().admin().cluster().prepareClusterStats().get().getIndicesStats();
logger.info("*** clusterStatsIndices = {}", clusterStatsIndices.toString());
logger.info("*** clusterStatsIndices.getStore().sizeInBytes() = {}", clusterStatsIndices.getStore().sizeInBytes());

logger.info("Indexing some data");
for (int i = 0; i < 50; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index3, "_doc", Integer.toString(i), "foo", "bar" + i);
}
logger.info("*** triggering refresh from ");
refresh();

final String snapshot1 = "snapshot1";
SnapshotInfo snapshotInfo1 = createFullSnapshot(snapshotRepoName, snapshot1);
assertTrue(snapshotInfo.getPinnedTimestamp() > 0); // to assert creation of a shallow v2 snapshot
logger.info(
"*** primarySize1 = {}",
client().admin().indices().prepareStats().execute().actionGet().getPrimaries().getStore().size()
);
ClusterStatsIndices clusterStatsIndices1 = client().admin().cluster().prepareClusterStats().get().getIndicesStats();
logger.info("*** clusterStatsIndices1 = {}", clusterStatsIndices.toString());
logger.info("*** clusterStatsIndices.getStore().sizeInBytes()1 = {}", clusterStatsIndices.getStore().sizeInBytes());

logger.info("Reset SNAPSHOT_V2 and MAX_SHARDS_ALLOWED_IN_STATUS_API to default values");
updateSettingsRequest.persistentSettings(
Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()).putNull(SHALLOW_SNAPSHOT_V2.getKey())
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
return snapshotStatus.getIndices().get(indexName).getShards().get(0);
}
Expand Down
Loading

0 comments on commit 939854e

Please sign in to comment.