diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f439e48ecab7..23ab886bc5420 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763)) - Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606)) - Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335)) +- Fix Shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index ebb911c739eb3..27893986a411b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -39,6 +39,9 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.snapshots.SnapshotState; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -1078,4 +1081,67 @@ public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws Interrup Thread.sleep(10000); ensureGreen(INDEX_NAME); } + + public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms")); + + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + logger.info("Create shallow snapshot setting enabled repo"); + String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; + Path shallowSnapshotRepoPath = randomRepoPath(); + Settings.Builder settings = Settings.builder() + .put("location", shallowSnapshotRepoPath) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE); + createRepository(shallowSnapshotRepoName, "fs", settings); + + for (int i = 0; i < 10; i++) { + indexBulk(INDEX_NAME, 1); + } + flushAndRefresh(INDEX_NAME); + + logger.info("Verify shallow snapshot created before close"); + final String snapshot1 = "snapshot1"; + SnapshotInfo snapshotInfo1 = internalCluster().client() + .admin() + .cluster() + .prepareCreateSnapshot(shallowSnapshotRepoName, snapshot1) + .setIndices(INDEX_NAME) + .setWaitForCompletion(true) + .get() + .getSnapshotInfo(); + + assertEquals(SnapshotState.SUCCESS, snapshotInfo1.state()); + assertTrue(snapshotInfo1.successfulShards() > 0); + assertEquals(0, snapshotInfo1.failedShards()); + + for (int i = 0; i < 10; i++) { + indexBulk(INDEX_NAME, 1); + } + + // close index + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(1000); + logger.info("Verify shallow snapshot created after close"); + final String snapshot2 = "snapshot2"; + + SnapshotInfo snapshotInfo2 = internalCluster().client() + .admin() + .cluster() + .prepareCreateSnapshot(shallowSnapshotRepoName, snapshot2) + .setIndices(INDEX_NAME) + .setWaitForCompletion(true) + .get() + .getSnapshotInfo(); + + assertEquals(SnapshotState.SUCCESS, snapshotInfo2.state()); + assertTrue(snapshotInfo2.successfulShards() > 0); + assertEquals(0, snapshotInfo2.failedShards()); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index eb3999718ca5b..e747ad38dba88 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1624,6 +1624,26 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() { return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion; } + /** + * reads the last metadata file from remote store and fetches files present in commit and their sizes. + * @return Tuple(Tuple(primaryTerm, commitGeneration), indexFilesToFileLengthMap) + * @throws IOException + */ + + public Tuple, Map> acquireLastRemoteUploadedIndexCommit() throws IOException { + if (!indexSettings.isAssignedOnRemoteNode()) { + throw new IllegalStateException("Index is not assigned on Remote Node"); + } + RemoteSegmentMetadata lastUploadedMetadata = getRemoteDirectory().readLatestMetadataFile(); + final Map indexFilesToFileLengthMap = lastUploadedMetadata.getMetadata() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength())); + long primaryTerm = lastUploadedMetadata.getPrimaryTerm(); + long commitGeneration = lastUploadedMetadata.getGeneration(); + return new Tuple<>(new Tuple<>(primaryTerm, commitGeneration), indexFilesToFileLengthMap); + } + /** * Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this * commit won't be freed until the commit / snapshot is closed. diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 188d61e00f206..d5a844dbb9055 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -237,7 +237,9 @@ public void snapshotRemoteStoreIndexShard( String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, long primaryTerm, + long commitGeneration, long startTime, + Map indexFilesToFileLengthMap, ActionListener listener ) { in.snapshotRemoteStoreIndexShard( @@ -248,7 +250,9 @@ public void snapshotRemoteStoreIndexShard( shardStateIdentifier, snapshotStatus, primaryTerm, + commitGeneration, startTime, + indexFilesToFileLengthMap, listener ); } diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 138bc13140aea..5b91baff2c43c 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -406,11 +406,13 @@ default void snapshotRemoteStoreIndexShard( Store store, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, + @Nullable IndexCommit snapshotIndexCommit, @Nullable String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, long primaryTerm, + long commitGeneration, long startTime, + @Nullable Map indexFilesToFileLengthMap, ActionListener listener ) { throw new UnsupportedOperationException(); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 998ae5e4791b7..e6c6fabd848a4 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3753,13 +3753,20 @@ public void snapshotRemoteStoreIndexShard( String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, long primaryTerm, + long commitGeneration, long startTime, + Map indexFilesToFileLengthMap, ActionListener listener ) { if (isReadOnly()) { listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository")); return; } + if (snapshotIndexCommit == null && indexFilesToFileLengthMap == null) { + listener.onFailure(new RepositoryException(metadata.name(), "both snapshot index commit and index files map cannot be null")); + return; + } + final ShardId shardId = store.shardId(); try { final String generation = snapshotStatus.generation(); @@ -3767,13 +3774,21 @@ public void snapshotRemoteStoreIndexShard( final BlobContainer shardContainer = shardContainer(indexId, shardId); long indexTotalFileSize = 0; - // local store is being used here to fetch the files metadata instead of remote store as currently - // remote store is mirroring the local store. - List fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames()); - Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit); - for (String fileName : fileNames) { - indexTotalFileSize += commitSnapshotMetadata.get(fileName).length(); + List fileNames; + + if (snapshotIndexCommit != null) { + // local store is being used here to fetch the files metadata instead of remote store as currently + // remote store is mirroring the local store. + fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames()); + Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit); + for (String fileName : fileNames) { + indexTotalFileSize += commitSnapshotMetadata.get(fileName).length(); + } + } else { + fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet()); + indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum(); } + int indexTotalNumberOfFiles = fileNames.size(); snapshotStatus.moveToStarted( @@ -3784,7 +3799,7 @@ public void snapshotRemoteStoreIndexShard( indexTotalFileSize ); - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(commitGeneration); // now create and write the commit point logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); @@ -3795,7 +3810,7 @@ public void snapshotRemoteStoreIndexShard( snapshotId.getName(), lastSnapshotStatus.getIndexVersion(), primaryTerm, - snapshotIndexCommit.getGeneration(), + commitGeneration, lastSnapshotStatus.getStartTime(), threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(), indexTotalNumberOfFiles, diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 8da36bbb8d4bd..ffcee94cd6f95 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -44,9 +44,11 @@ import org.opensearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.opensearch.cluster.SnapshotsInProgress.ShardState; import org.opensearch.cluster.SnapshotsInProgress.State; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; @@ -74,7 +76,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.nio.file.NoSuchFileException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -371,7 +372,9 @@ private void snapshot( ActionListener listener ) { try { - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShardOrNull(shardId.id()); + final boolean closedIndex = indexService.getMetadata().getState() == IndexMetadata.State.CLOSE; if (indexShard.routingEntry().primary() == false) { throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); } @@ -398,24 +401,39 @@ private void snapshot( if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) { long startTime = threadPool.relativeTimeInMillis(); long primaryTerm = indexShard.getOperationPrimaryTerm(); - // we flush first to make sure we get the latest writes snapshotted - wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true); - IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); - long commitGeneration = snapshotIndexCommit.getGeneration(); + long commitGeneration = 0L; + Map indexFilesToFileLengthMap = null; + IndexCommit snapshotIndexCommit = null; + try { + if (closedIndex) { + final Tuple, Map> tuple = indexShard.acquireLastRemoteUploadedIndexCommit(); + primaryTerm = tuple.v1().v1(); + commitGeneration = tuple.v1().v2(); + indexFilesToFileLengthMap = tuple.v2(); + } else { + wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true); + snapshotIndexCommit = wrappedSnapshot.get(); + commitGeneration = snapshotIndexCommit.getGeneration(); + } indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); - } catch (NoSuchFileException e) { - wrappedSnapshot.close(); - logger.warn( - "Exception while acquiring lock on primaryTerm = {} and generation = {}", - primaryTerm, - commitGeneration - ); - indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true)); - wrappedSnapshot = indexShard.acquireLastIndexCommit(false); - snapshotIndexCommit = wrappedSnapshot.get(); - commitGeneration = snapshotIndexCommit.getGeneration(); - indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + } catch (IOException e) { + if (closedIndex) { + logger.warn("Exception while reading latest metadata file from remote store"); + throw e; + } else { + wrappedSnapshot.close(); + logger.warn( + "Exception while acquiring lock on primaryTerm = {} and generation = {}", + primaryTerm, + commitGeneration + ); + indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true)); + wrappedSnapshot = indexShard.acquireLastIndexCommit(false); + snapshotIndexCommit = wrappedSnapshot.get(); + commitGeneration = snapshotIndexCommit.getGeneration(); + indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + } } try { repository.snapshotRemoteStoreIndexShard( @@ -423,11 +441,13 @@ private void snapshot( snapshot.getSnapshotId(), indexId, snapshotIndexCommit, - getShardStateId(indexShard, snapshotIndexCommit), + null, snapshotStatus, primaryTerm, + commitGeneration, startTime, - ActionListener.runBefore(listener, wrappedSnapshot::close) + indexFilesToFileLengthMap, + closedIndex ? listener : ActionListener.runBefore(listener, wrappedSnapshot::close) ); } catch (IndexShardSnapshotFailedException e) { logger.error( diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 57a561bc8f2a3..4d85a3c491af8 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -12,6 +12,9 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.opensearch.action.StepListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.SnapshotsInProgress; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; @@ -20,6 +23,7 @@ import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -32,6 +36,11 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.snapshots.Snapshot; +import org.opensearch.snapshots.SnapshotId; +import org.opensearch.snapshots.SnapshotShardsService; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.junit.annotations.TestLogging; import org.hamcrest.MatcherAssert; @@ -41,6 +50,7 @@ import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -55,6 +65,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -541,6 +553,81 @@ public void onReplicationFailure( } } + public void testShallowCopySnapshotForClosedIndexSuccessful() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings)) { + final IndexShard primaryShard = shards.getPrimary(); + shards.startAll(); + shards.indexDocs(10); + shards.refresh("test"); + shards.flush(); + shards.assertAllEqual(10); + + RepositoriesService repositoriesService = createRepositoriesService(); + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository("random"); + + doAnswer(invocation -> { + IndexShardSnapshotStatus snapshotStatus = invocation.getArgument(5); + long commitGeneration = invocation.getArgument(7); + long startTime = invocation.getArgument(8); + final Map indexFilesToFileLengthMap = invocation.getArgument(9); + ActionListener listener = invocation.getArgument(10); + if (indexFilesToFileLengthMap != null) { + List fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet()); + long indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum(); + int indexTotalNumberOfFiles = fileNames.size(); + snapshotStatus.moveToStarted(startTime, 0, indexTotalNumberOfFiles, 0, indexTotalFileSize); + // Not performing actual snapshot, just modifying the state + snapshotStatus.moveToFinalize(commitGeneration); + snapshotStatus.moveToDone(System.currentTimeMillis(), snapshotStatus.generation()); + listener.onResponse(snapshotStatus.generation()); + return null; + } + listener.onResponse(snapshotStatus.generation()); + return null; + }).when(repository) + .snapshotRemoteStoreIndexShard(any(), any(), any(), any(), any(), any(), anyLong(), anyLong(), anyLong(), any(), any()); + + final SnapshotShardsService shardsService = getSnapshotShardsService( + primaryShard, + shards.getIndexMetadata(), + true, + repositoriesService + ); + final Snapshot snapshot1 = new Snapshot( + randomAlphaOfLength(10), + new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)) + ); + + // Initialize the shallow copy snapshot + final ClusterState initState = addSnapshotIndex( + clusterService.state(), + snapshot1, + primaryShard, + SnapshotsInProgress.State.INIT, + true + ); + shardsService.clusterChanged(new ClusterChangedEvent("test", initState, clusterService.state())); + + // start the snapshot + shardsService.clusterChanged( + new ClusterChangedEvent( + "test", + addSnapshotIndex(clusterService.state(), snapshot1, primaryShard, SnapshotsInProgress.State.STARTED, true), + initState + ) + ); + + // Check the snapshot got completed successfully + assertBusy(() -> { + final IndexShardSnapshotStatus.Copy copy = shardsService.currentSnapshotShards(snapshot1) + .get(primaryShard.shardId) + .asCopy(); + final IndexShardSnapshotStatus.Stage stage = copy.getStage(); + assertEquals(IndexShardSnapshotStatus.Stage.DONE, stage); + }); + } + } + private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) { return new RemoteStoreReplicationSource(shard) { @Override diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 2311fc582616f..f4f94baabd7b0 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -68,6 +68,7 @@ import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfoTests; @@ -892,10 +893,21 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception { replicateSegments(primaryShard, shards.getReplicas()); shards.assertAllEqual(10); - final SnapshotShardsService shardsService = getSnapshotShardsService(replicaShard); + final SnapshotShardsService shardsService = getSnapshotShardsService( + replicaShard, + shards.getIndexMetadata(), + false, + createRepositoriesService() + ); final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))); - final ClusterState initState = addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.INIT); + final ClusterState initState = addSnapshotIndex( + clusterService.state(), + snapshot, + replicaShard, + SnapshotsInProgress.State.INIT, + false + ); shardsService.clusterChanged(new ClusterChangedEvent("test", initState, clusterService.state())); CountDownLatch latch = new CountDownLatch(1); @@ -907,7 +919,7 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception { shardsService.clusterChanged( new ClusterChangedEvent( "test", - addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.STARTED), + addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.STARTED, false), initState ) ); @@ -956,21 +968,30 @@ public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() th } } - private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard) { + protected SnapshotShardsService getSnapshotShardsService( + IndexShard indexShard, + IndexMetadata indexMetadata, + boolean closedIdx, + RepositoriesService repositoriesService + ) { final TransportService transportService = mock(TransportService.class); when(transportService.getThreadPool()).thenReturn(threadPool); final IndicesService indicesService = mock(IndicesService.class); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(any())).thenReturn(indexService); - when(indexService.getShardOrNull(anyInt())).thenReturn(replicaShard); - return new SnapshotShardsService(settings, clusterService, createRepositoriesService(), transportService, indicesService); + when(indexService.getShardOrNull(anyInt())).thenReturn(indexShard); + when(indexService.getMetadata()).thenReturn( + new IndexMetadata.Builder(indexMetadata).state(closedIdx ? IndexMetadata.State.CLOSE : IndexMetadata.State.OPEN).build() + ); + return new SnapshotShardsService(settings, clusterService, repositoriesService, transportService, indicesService); } - private ClusterState addSnapshotIndex( + protected ClusterState addSnapshotIndex( ClusterState state, Snapshot snapshot, IndexShard shard, - SnapshotsInProgress.State snapshotState + SnapshotsInProgress.State snapshotState, + boolean shallowCopySnapshot ) { final Map shardsBuilder = new HashMap<>(); ShardRouting shardRouting = shard.shardRouting; @@ -991,7 +1012,7 @@ private ClusterState addSnapshotIndex( null, SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()), - false + shallowCopySnapshot ); return ClusterState.builder(state) .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Collections.singletonList(entry))) diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 4cd822c7d583b..1ec6d320762f2 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -774,7 +774,9 @@ public void snapshotRemoteStoreIndexShard( String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, long primaryTerm, + long commitGeneration, long startTime, + Map indexFilesToFileLengthMap, ActionListener listener ) { diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index a5dc13c334513..062ebd2051f6e 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -289,6 +289,10 @@ protected EngineConfigFactory getEngineConfigFactory(IndexSettings indexSettings return new EngineConfigFactory(indexSettings); } + public IndexMetadata getIndexMetadata() { + return indexMetadata; + } + public int indexDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName()).id(Integer.toString(docId.incrementAndGet()))