Skip to content

Commit

Permalink
Snapshot _status API: Include in-progress snapshots in total shard co…
Browse files Browse the repository at this point in the history
…unt and index filter (opensearch-project#16394)

Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi authored Oct 21, 2024
1 parent 2dfd519 commit 6c7581e
Show file tree
Hide file tree
Showing 4 changed files with 477 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,9 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
exception.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
}, 1, TimeUnit.MINUTES);
});

// across multiple snapshots
assertBusy(() -> {
Expand All @@ -636,13 +636,13 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
exception.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
}, 1, TimeUnit.MINUTES);
});

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

public void testSnapshotStatusForIndexFilter() throws Exception {
Expand All @@ -666,6 +666,7 @@ public void testSnapshotStatusForIndexFilter() throws Exception {
String snapshot = "test-snap-1";
createSnapshot(repositoryName, snapshot, List.of(index1, index2, index3));

// for a completed snapshot
assertBusy(() -> {
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
Expand All @@ -682,6 +683,96 @@ public void testSnapshotStatusForIndexFilter() throws Exception {
}, 1, TimeUnit.MINUTES);
}

public void testSnapshotStatusForIndexFilterForInProgressSnapshot() throws Exception {
String repositoryName = "test-repo";
createRepository(repositoryName, "mock", Settings.builder().put("location", randomRepoPath()).put("block_on_data", true));

logger.info("Create indices");
String index1 = "test-idx-1";
String index2 = "test-idx-2";
String index3 = "test-idx-3";
createIndex(index1, index2, index3);
ensureGreen();

logger.info("Indexing some data");
for (int i = 0; i < 10; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "baz" + i);
index(index3, "_doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
String inProgressSnapshot = "test-in-progress-snapshot";

logger.info("Create snapshot");
ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture = startFullSnapshot(repositoryName, inProgressSnapshot);

logger.info("Block data node");
waitForBlockOnAnyDataNode(repositoryName, TimeValue.timeValueMinutes(1));
awaitNumberOfSnapshotsInProgress(1);

// test normal functioning of index filter for in progress snapshot
assertBusy(() -> {
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.setIndices(index1, index2)
.get()
.getSnapshots()
.get(0);
Map<String, SnapshotIndexStatus> snapshotIndexStatusMap = snapshotsStatus.getIndices();
// Although the snapshot contains 3 indices, the response of status api call only contains results for 2
assertEquals(snapshotIndexStatusMap.size(), 2);
assertEquals(snapshotIndexStatusMap.keySet(), Set.of(index1, index2));
});

// when a non-existent index is requested in the index-filter
assertBusy(() -> {
// failure due to index not found in snapshot
final String nonExistentIndex1 = "non-existent-index-1";
final String nonExistentIndex2 = "non-existent-index-2";
Exception ex = expectThrows(
Exception.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.setIndices(index1, index2, nonExistentIndex1, nonExistentIndex2)
.execute()
.actionGet()
);
String cause = String.format(
Locale.ROOT,
"indices [%s] missing in snapshot [%s] of repository [%s]",
String.join(", ", List.of(nonExistentIndex2, nonExistentIndex1)),
inProgressSnapshot,
repositoryName
);
assertEquals(cause, ex.getCause().getMessage());

// no error for ignore_unavailable = true and status response contains only the found indices
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.setIndices(index1, index2, nonExistentIndex1, nonExistentIndex2)
.setIgnoreUnavailable(true)
.get()
.getSnapshots()
.get(0);

Map<String, SnapshotIndexStatus> snapshotIndexStatusMap = snapshotsStatus.getIndices();
assertEquals(snapshotIndexStatusMap.size(), 2);
assertEquals(snapshotIndexStatusMap.keySet(), Set.of(index1, index2));
});

logger.info("Unblock data node");
unblockAllDataNodes(repositoryName);

logger.info("Wait for snapshot to finish");
waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60));
}

public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
String repositoryName = "test-repo";
String index1 = "test-idx-1";
Expand All @@ -705,6 +796,39 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
createSnapshot(repositoryName, snapshot1, List.of(index1, index2, index3));
createSnapshot(repositoryName, snapshot2, List.of(index1));

assertBusy(() -> {
// failure due to passing index filter for _all value of repository param
Exception ex = expectThrows(
Exception.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus("_all")
.setSnapshots(snapshot1)
.setIndices(index1, index2, index3)
.execute()
.actionGet()
);
String cause =
"index list filter is supported only when a single 'repository' is passed, but found 'repository' param = [_all]";
assertTrue(ex.getMessage().contains(cause));
});

assertBusy(() -> {
// failure due to passing index filter for _all value of snapshot param --> gets translated as a blank array
Exception ex = expectThrows(
Exception.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots()
.setIndices(index1, index2, index3)
.execute()
.actionGet()
);
String cause = "index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = [_all]";
assertTrue(ex.getMessage().contains(cause));
});

assertBusy(() -> {
// failure due to passing index filter for multiple snapshots
ActionRequestValidationException ex = expectThrows(
Expand All @@ -717,9 +841,10 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
.execute()
.actionGet()
);
String cause = "index list filter is supported only for a single snapshot";
String cause =
"index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = [[test-snap-1, test-snap-2]]";
assertTrue(ex.getMessage().contains(cause));
}, 1, TimeUnit.MINUTES);
});

assertBusy(() -> {
// failure due to index not found in snapshot
Expand All @@ -743,7 +868,18 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
);
assertEquals(cause, ex.getCause().getMessage());

}, 1, TimeUnit.MINUTES);
// no error for ignore_unavailable = true and status response contains only the found indices
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot2)
.setIndices(index1, index2, index3)
.setIgnoreUnavailable(true)
.get()
.getSnapshots()
.get(0);
assertEquals(1, snapshotsStatus.getIndices().size());
});

assertBusy(() -> {
// failure due to too many shards requested
Expand All @@ -763,12 +899,148 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
.actionGet()
);
assertEquals(ex.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(ex.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request"));
assertTrue(ex.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request"));

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}, 2, TimeUnit.MINUTES);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
});
}

public void testSnapshotStatusShardLimitOfResponseForInProgressSnapshot() throws Exception {
logger.info("Create repository");
String repositoryName = "test-repo";
createRepository(
repositoryName,
"mock",
Settings.builder()
.put("location", randomRepoPath())
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("wait_after_unblock", 200)
);

logger.info("Create indices");
String index1 = "test-idx-1";
String index2 = "test-idx-2";
String index3 = "test-idx-3";
assertAcked(prepareCreate(index1, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
assertAcked(prepareCreate(index3, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
ensureGreen();

logger.info("Index some data");
indexRandomDocs(index1, 10);
indexRandomDocs(index2, 10);
indexRandomDocs(index3, 10);

logger.info("Create completed snapshot");
String completedSnapshot = "test-completed-snapshot";
String blockedNode = blockNodeWithIndex(repositoryName, index1);
client().admin().cluster().prepareCreateSnapshot(repositoryName, completedSnapshot).setWaitForCompletion(false).get();
waitForBlock(blockedNode, repositoryName, TimeValue.timeValueSeconds(60));
unblockNode(repositoryName, blockedNode);
waitForCompletion(repositoryName, completedSnapshot, TimeValue.timeValueSeconds(60));

logger.info("Index some more data");
indexRandomDocs(index1, 10);
indexRandomDocs(index2, 10);
indexRandomDocs(index3, 10);
refresh();

logger.info("Create in-progress snapshot");
String inProgressSnapshot = "test-in-progress-snapshot";
blockedNode = blockNodeWithIndex(repositoryName, index1);
client().admin().cluster().prepareCreateSnapshot(repositoryName, inProgressSnapshot).setWaitForCompletion(false).get();
waitForBlock(blockedNode, repositoryName, TimeValue.timeValueSeconds(60));
List<SnapshotStatus> snapshotStatuses = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot, completedSnapshot)
.get()
.getSnapshots();

assertEquals(2, snapshotStatuses.size());
assertEquals(SnapshotsInProgress.State.STARTED, snapshotStatuses.get(0).getState());
assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatuses.get(1).getState());

logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 1));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// shard limit exceeded due to inProgress snapshot alone @ without index-filter
assertBusy(() -> {
CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.execute()
.actionGet()
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().contains(" is more than the maximum allowed value of shard count [1] for snapshot status request")
);
});

// shard limit exceeded due to inProgress snapshot alone @ with index-filter
assertBusy(() -> {
CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.setIndices(index1, index2)
.execute()
.actionGet()
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().contains(" is more than the maximum allowed value of shard count [1] for snapshot status request")
);
});

logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a slightly higher value");
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 5));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// shard limit exceeded due to passing for inProgress but failing for current + completed

assertBusy(() -> {
SnapshotStatus inProgressSnapshotStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.get()
.getSnapshots()
.get(0);
assertEquals(3, inProgressSnapshotStatus.getShards().size());

CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot, completedSnapshot)
.execute()
.actionGet()
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().contains(" is more than the maximum allowed value of shard count [5] for snapshot status request")
);
});

unblockNode(repositoryName, blockedNode);
waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60));

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;

import static org.opensearch.action.ValidateActions.addValidationError;

Expand Down Expand Up @@ -124,8 +125,20 @@ public ActionRequestValidationException validate() {
if (snapshots == null) {
validationException = addValidationError("snapshots is null", validationException);
}
if (indices.length != 0 && snapshots.length != 1) {
validationException = addValidationError("index list filter is supported only for a single snapshot", validationException);
if (indices.length != 0) {
if (repository.equals("_all")) {
String error =
"index list filter is supported only when a single 'repository' is passed, but found 'repository' param = [_all]";
validationException = addValidationError(error, validationException);
}
if (snapshots.length != 1) {
// snapshot param was '_all' (length = 0) or a list of snapshots (length > 1)
String snapshotParamValue = snapshots.length == 0 ? "_all" : Arrays.toString(snapshots);
String error = "index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = ["
+ snapshotParamValue
+ "]";
validationException = addValidationError(error, validationException);
}
}
return validationException;
}
Expand Down
Loading

0 comments on commit 6c7581e

Please sign in to comment.