Skip to content

Commit

Permalink
Adding UT
Browse files Browse the repository at this point in the history
Signed-off-by: Shubh Sahu <[email protected]>
  • Loading branch information
Shubh Sahu committed Dec 19, 2024
1 parent cbb68f6 commit b0ea945
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Settings;
Expand All @@ -20,6 +23,7 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.CheckpointInfoResponse;
Expand All @@ -32,6 +36,11 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotShardsService;
import org.opensearch.test.CorruptionUtils;
import org.opensearch.test.junit.annotations.TestLogging;
import org.hamcrest.MatcherAssert;
Expand All @@ -41,6 +50,7 @@
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand All @@ -55,6 +65,8 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -541,6 +553,81 @@ public void onReplicationFailure(
}
}

public void testShallowCopySnapshotForClosedIndexSuccessful() throws Exception {
try (ReplicationGroup shards = createGroup(0, settings)) {
final IndexShard primaryShard = shards.getPrimary();
shards.startAll();
shards.indexDocs(10);
shards.refresh("test");
shards.flush();
shards.assertAllEqual(10);

RepositoriesService repositoriesService = createRepositoriesService();
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository("random");

doAnswer(invocation -> {
IndexShardSnapshotStatus snapshotStatus = invocation.getArgument(5);
long commitGeneration = invocation.getArgument(7);
long startTime = invocation.getArgument(8);
final Map<String, Long> indexFilesToFileLengthMap = invocation.getArgument(9);
ActionListener<String> listener = invocation.getArgument(10);
if (indexFilesToFileLengthMap != null) {
List<String> fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet());
long indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum();
int indexTotalNumberOfFiles = fileNames.size();
snapshotStatus.moveToStarted(startTime, 0, indexTotalNumberOfFiles, 0, indexTotalFileSize);
// Not performing actual snapshot, just modifying the state
snapshotStatus.moveToFinalize(commitGeneration);
snapshotStatus.moveToDone(System.currentTimeMillis(), snapshotStatus.generation());
listener.onResponse(snapshotStatus.generation());
return null;
}
listener.onResponse(snapshotStatus.generation());
return null;
}).when(repository)
.snapshotRemoteStoreIndexShard(any(), any(), any(), any(), any(), any(), anyLong(), anyLong(), anyLong(), any(), any());

final SnapshotShardsService shardsService = getSnapshotShardsService(
primaryShard,
shards.getIndexMetadata(),
true,
repositoriesService
);
final Snapshot snapshot1 = new Snapshot(
randomAlphaOfLength(10),
new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))
);

// Initialize the shallow copy snapshot
final ClusterState initState = addSnapshotIndex(
clusterService.state(),
snapshot1,
primaryShard,
SnapshotsInProgress.State.INIT,
true
);
shardsService.clusterChanged(new ClusterChangedEvent("test", initState, clusterService.state()));

// start the snapshot
shardsService.clusterChanged(
new ClusterChangedEvent(
"test",
addSnapshotIndex(clusterService.state(), snapshot1, primaryShard, SnapshotsInProgress.State.STARTED, true),
initState
)
);

// Check the snapshot got completed successfully
assertBusy(() -> {
final IndexShardSnapshotStatus.Copy copy = shardsService.currentSnapshotShards(snapshot1)
.get(primaryShard.shardId)
.asCopy();
final IndexShardSnapshotStatus.Stage stage = copy.getStage();
assertEquals(IndexShardSnapshotStatus.Stage.DONE, stage);
});
}
}

private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) {
return new RemoteStoreReplicationSource(shard) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfoTests;
Expand Down Expand Up @@ -892,10 +893,21 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception {
replicateSegments(primaryShard, shards.getReplicas());
shards.assertAllEqual(10);

final SnapshotShardsService shardsService = getSnapshotShardsService(replicaShard, shards.getIndexMetadata());
final SnapshotShardsService shardsService = getSnapshotShardsService(
replicaShard,
shards.getIndexMetadata(),
false,
createRepositoriesService()
);
final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));

final ClusterState initState = addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.INIT);
final ClusterState initState = addSnapshotIndex(
clusterService.state(),
snapshot,
replicaShard,
SnapshotsInProgress.State.INIT,
false
);
shardsService.clusterChanged(new ClusterChangedEvent("test", initState, clusterService.state()));

CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -907,7 +919,7 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception {
shardsService.clusterChanged(
new ClusterChangedEvent(
"test",
addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.STARTED),
addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.STARTED, false),
initState
)
);
Expand Down Expand Up @@ -956,22 +968,30 @@ public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() th
}
}

private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard, IndexMetadata indexMetadata) {
protected SnapshotShardsService getSnapshotShardsService(
IndexShard indexShard,
IndexMetadata indexMetadata,
boolean closedIdx,
RepositoriesService repositoriesService
) {
final TransportService transportService = mock(TransportService.class);
when(transportService.getThreadPool()).thenReturn(threadPool);
final IndicesService indicesService = mock(IndicesService.class);
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(any())).thenReturn(indexService);
when(indexService.getShardOrNull(anyInt())).thenReturn(replicaShard);
when(indexService.getMetadata()).thenReturn(indexMetadata);
return new SnapshotShardsService(settings, clusterService, createRepositoriesService(), transportService, indicesService);
when(indexService.getShardOrNull(anyInt())).thenReturn(indexShard);
when(indexService.getMetadata()).thenReturn(
new IndexMetadata.Builder(indexMetadata).state(closedIdx ? IndexMetadata.State.CLOSE : IndexMetadata.State.OPEN).build()
);
return new SnapshotShardsService(settings, clusterService, repositoriesService, transportService, indicesService);
}

private ClusterState addSnapshotIndex(
protected ClusterState addSnapshotIndex(
ClusterState state,
Snapshot snapshot,
IndexShard shard,
SnapshotsInProgress.State snapshotState
SnapshotsInProgress.State snapshotState,
boolean shallowCopySnapshot
) {
final Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardsBuilder = new HashMap<>();
ShardRouting shardRouting = shard.shardRouting;
Expand All @@ -992,7 +1012,7 @@ private ClusterState addSnapshotIndex(
null,
SnapshotInfoTests.randomUserMetadata(),
VersionUtils.randomVersion(random()),
false
shallowCopySnapshot
);
return ClusterState.builder(state)
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Collections.singletonList(entry)))
Expand Down

0 comments on commit b0ea945

Please sign in to comment.