From 08ba15d59e13c8d48b2336d684a35ceb1ef0cf33 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Mon, 4 Sep 2023 12:53:08 +0530 Subject: [PATCH] Add support download latest index metadata from remote (#9477) (#9713) --------- (cherry picked from commit 422dd3c835f7d1a8471dbc7acdb3ddfcbc09e6cc) Signed-off-by: bansvaru Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../repositories/s3/S3BlobContainer.java | 14 +- .../common/blobstore/BlobContainer.java | 14 +- .../remote/RemoteClusterStateService.java | 102 ++++++- .../blobstore/BlobStoreRepository.java | 4 + .../RemoteClusterStateServiceTests.java | 258 +++++++++++++++++- 5 files changed, 376 insertions(+), 16 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 183b5f8fe7ac1..4b3c1bfbd53ea 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -349,17 +349,13 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List blobs } @Override - public void listBlobsByPrefixInSortedOrder( - String blobNamePrefix, - int limit, - BlobNameSortOrder blobNameSortOrder, - ActionListener> listener - ) { + public List listBlobsByPrefixInSortedOrder(String blobNamePrefix, int limit, BlobNameSortOrder blobNameSortOrder) + throws IOException { // As AWS S3 returns list of keys in Lexicographic order, we don't have to fetch all the keys in order to sort them // We fetch only keys as per the given limit to optimize the fetch. If provided sort order is not Lexicographic, // we fall-back to default implementation of fetching all the keys and sorting them. if (blobNameSortOrder != BlobNameSortOrder.LEXICOGRAPHIC) { - super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder, listener); + return super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder); } else { if (limit < 0) { throw new IllegalArgumentException("limit should not be a negative value"); @@ -370,9 +366,9 @@ public void listBlobsByPrefixInSortedOrder( .flatMap(listing -> listing.contents().stream()) .map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size())) .collect(Collectors.toList()); - listener.onResponse(blobs.subList(0, Math.min(limit, blobs.size()))); + return blobs.subList(0, Math.min(limit, blobs.size())); } catch (final Exception e) { - listener.onFailure(new IOException("Exception when listing blobs by prefix [" + prefix + "]", e)); + throw new IOException("Exception when listing blobs by prefix [" + prefix + "]", e); } } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index cba6579190c26..3cdb1ce30b68d 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -231,11 +231,19 @@ default void listBlobsByPrefixInSortedOrder( throw new IllegalArgumentException("limit should not be a negative value"); } try { - List blobNames = new ArrayList<>(listBlobsByPrefix(blobNamePrefix).values()); - blobNames.sort(blobNameSortOrder.comparator()); - listener.onResponse(blobNames.subList(0, Math.min(blobNames.size(), limit))); + listener.onResponse(listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder)); } catch (Exception e) { listener.onFailure(e); } } + + default List listBlobsByPrefixInSortedOrder(String blobNamePrefix, int limit, BlobNameSortOrder blobNameSortOrder) + throws IOException { + if (limit < 0) { + throw new IllegalArgumentException("limit should not be a negative value"); + } + List blobNames = new ArrayList<>(listBlobsByPrefix(blobNamePrefix).values()); + blobNames.sort(blobNameSortOrder.comparator()); + return blobNames.subList(0, Math.min(blobNames.size(), limit)); + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 491c04bab3adb..1008e889f510a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -35,7 +36,9 @@ import java.util.Base64; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -87,7 +90,7 @@ public class RemoteClusterStateService implements Closeable { ); private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - private static final String DELIMITER = "__"; + public static final String DELIMITER = "__"; private final String nodeId; private final Supplier repositoriesService; @@ -367,4 +370,101 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) { return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis())); } + /** + * Fetch latest index metadata from remote cluster state + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @return {@code Map} latest IndexUUID to IndexMetadata map + */ + public Map getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException { + Map remoteIndexMetadata = new HashMap<>(); + ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); + assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID()) + : "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch."; + for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) { + IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata); + remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata); + } + return remoteIndexMetadata; + } + + /** + * Fetch index metadata from remote cluster state + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @param uploadedIndexMetadata {@link UploadedIndexMetadata} contains details about remote location of index metadata + * @return {@link IndexMetadata} + */ + private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, UploadedIndexMetadata uploadedIndexMetadata) { + try { + return INDEX_METADATA_FORMAT.read( + indexMetadataContainer(clusterName, clusterUUID, uploadedIndexMetadata.getIndexUUID()), + uploadedIndexMetadata.getUploadedFilename(), + blobStoreRepository.getNamedXContentRegistry() + ); + } catch (IOException e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()), + e + ); + } + } + + /** + * Fetch latest ClusterMetadataManifest from remote state store + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @return ClusterMetadataManifest + */ + public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterName, String clusterUUID) { + String latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID); + return fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName); + } + + /** + * Fetch latest ClusterMetadataManifest file from remote state store + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @return latest ClusterMetadataManifest filename + */ + private String getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException { + try { + /** + * {@link BlobContainer#listBlobsByPrefixInSortedOrder} will get the latest manifest file + * as the manifest file name generated via {@link RemoteClusterStateService#getManifestFileName} ensures + * when sorted in LEXICOGRAPHIC order the latest uploaded manifest file comes on top. + */ + List manifestFilesMetadata = manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder( + "manifest" + DELIMITER, + 1, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ); + if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) { + return manifestFilesMetadata.get(0).name(); + } + } catch (IOException e) { + throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e); + } + + throw new IllegalStateException(String.format(Locale.ROOT, "Remote Cluster State not found - %s", clusterUUID)); + } + + /** + * Fetch ClusterMetadataManifest from remote state store + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @return ClusterMetadataManifest + */ + private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) + throws IllegalStateException { + try { + return RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.read( + manifestContainer(clusterName, clusterUUID), + filename, + blobStoreRepository.getNamedXContentRegistry() + ); + } catch (IOException e) { + throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e); + } + } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index d382c9e9b1fee..8116002baa34a 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -805,6 +805,10 @@ public RepositoryMetadata getMetadata() { return metadata; } + public NamedXContentRegistry getNamedXContentRegistry() { + return namedXContentRegistry; + } + public Compressor getCompressor() { return compressor; } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 215673642cce5..d4e090b046760 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -16,24 +16,35 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.index.Index; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.VersionUtils; import org.junit.Assert; import org.junit.Before; +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Supplier; import org.mockito.ArgumentMatchers; @@ -52,6 +63,7 @@ public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; + private BlobStore blobStore; @Before public void setup() { @@ -63,7 +75,10 @@ public void setup() { .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), "remote_store_repository") .build(); blobStoreRepository = mock(BlobStoreRepository.class); + blobStore = mock(BlobStore.class); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); + when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(new NamedXContentRegistry(new ArrayList<>())); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, @@ -186,6 +201,201 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } + public void testReadLatestMetadataManifestFailedIOException() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + + BlobContainer blobContainer = mockBlobStoreObjects(); + when( + blobContainer.listBlobsByPrefixInSortedOrder( + "manifest" + RemoteClusterStateService.DELIMITER, + 1, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenThrow(IOException.class); + + remoteClusterStateService.ensureRepositorySet(); + Exception e = assertThrows( + IllegalStateException.class, + () -> remoteClusterStateService.getLatestClusterMetadataManifest( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ) + ); + assertEquals(e.getMessage(), "Error while fetching latest manifest file for remote cluster state"); + } + + public void testReadLatestMetadataManifestFailedNoManifestFileInRemote() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + + BlobContainer blobContainer = mockBlobStoreObjects(); + when( + blobContainer.listBlobsByPrefixInSortedOrder( + "manifest" + RemoteClusterStateService.DELIMITER, + 1, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(List.of()); + + remoteClusterStateService.ensureRepositorySet(); + Exception e = assertThrows( + IllegalStateException.class, + () -> remoteClusterStateService.getLatestClusterMetadataManifest( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ) + ); + assertEquals(e.getMessage(), "Remote Cluster State not found - " + clusterState.metadata().clusterUUID()); + } + + public void testReadLatestMetadataManifestFailedManifestFileRemoveAfterFetchInRemote() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + + BlobContainer blobContainer = mockBlobStoreObjects(); + BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1); + when( + blobContainer.listBlobsByPrefixInSortedOrder( + "manifest" + RemoteClusterStateService.DELIMITER, + 1, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(Arrays.asList(blobMetadata)); + when(blobContainer.readBlob("manifestFileName")).thenThrow(FileNotFoundException.class); + + remoteClusterStateService.ensureRepositorySet(); + Exception e = assertThrows( + IllegalStateException.class, + () -> remoteClusterStateService.getLatestClusterMetadataManifest( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ) + ); + assertEquals(e.getMessage(), "Error while downloading cluster metadata - manifestFileName"); + } + + public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(List.of()) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .build(); + + BlobContainer blobContainer = mockBlobStoreObjects(); + mockBlobContainer(blobContainer, expectedManifest, Map.of()); + + remoteClusterStateService.ensureRepositorySet(); + assertEquals( + remoteClusterStateService.getLatestIndexMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID()) + .size(), + 0 + ); + } + + public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOException() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final List indices = List.of(uploadedIndexMetadata); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .build(); + + BlobContainer blobContainer = mockBlobStoreObjects(); + mockBlobContainer(blobContainer, expectedManifest, Map.of()); + when(blobContainer.readBlob(uploadedIndexMetadata.getUploadedFilename() + ".dat")).thenThrow(FileNotFoundException.class); + + remoteClusterStateService.ensureRepositorySet(); + Exception e = assertThrows( + IllegalStateException.class, + () -> remoteClusterStateService.getLatestIndexMetadata( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ) + ); + assertEquals(e.getMessage(), "Error while downloading IndexMetadata - " + uploadedIndexMetadata.getUploadedFilename()); + } + + public void testReadLatestMetadataManifestSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .build(); + + mockBlobContainer(mockBlobStoreObjects(), expectedManifest, new HashMap<>()); + remoteClusterStateService.ensureRepositorySet(); + final ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + } + + public void testReadLatestIndexMetadataSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + remoteClusterStateService.ensureRepositorySet(); + + final Index index = new Index("test-index", "index-uuid"); + String fileName = "metadata-" + index.getUUID(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(index.getName(), index.getUUID(), fileName); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(11) + .numberOfReplicas(10) + .build(); + + final List indices = List.of(uploadedIndexMetadata); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .build(); + + mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata)); + + Map indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ); + + assertEquals(indexMetadataMap.size(), 1); + assertEquals(indexMetadataMap.get(index.getUUID()).getIndex().getName(), index.getName()); + assertEquals(indexMetadataMap.get(index.getUUID()).getNumberOfShards(), indexMetadata.getNumberOfShards()); + assertEquals(indexMetadataMap.get(index.getUUID()).getNumberOfReplicas(), indexMetadata.getNumberOfReplicas()); + } + public void testMarkLastStateAsCommittedSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); @@ -202,6 +412,7 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { .stateVersion(1L) .stateUUID("state-uuid") .clusterUUID("cluster-uuid") + .nodeId("nodeA") .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -214,9 +425,7 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } - private void mockBlobStoreObjects() { - final BlobStore blobStore = mock(BlobStore.class); - when(blobStoreRepository.blobStore()).thenReturn(blobStore); + private BlobContainer mockBlobStoreObjects() { final BlobPath blobPath = mock(BlobPath.class); when((blobStoreRepository.basePath())).thenReturn(blobPath); when(blobPath.add(anyString())).thenReturn(blobPath); @@ -225,6 +434,49 @@ private void mockBlobStoreObjects() { when(blobContainer.path()).thenReturn(blobPath); when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + return blobContainer; + } + + private void mockBlobContainer( + BlobContainer blobContainer, + ClusterMetadataManifest clusterMetadataManifest, + Map indexMetadataMap + ) throws IOException { + BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1); + when( + blobContainer.listBlobsByPrefixInSortedOrder( + "manifest" + RemoteClusterStateService.DELIMITER, + 1, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(Arrays.asList(blobMetadata)); + + BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize( + clusterMetadataManifest, + "manifestFileName", + blobStoreRepository.getCompressor() + ); + when(blobContainer.readBlob("manifestFileName")).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes())); + + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + try { + IndexMetadata indexMetadata = indexMetadataMap.get(uploadedIndexMetadata.getIndexUUID()); + if (indexMetadata == null) { + return; + } + String fileName = uploadedIndexMetadata.getUploadedFilename(); + BytesReference bytesIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.serialize( + indexMetadata, + fileName, + blobStoreRepository.getCompressor() + ); + when(blobContainer.readBlob(fileName + ".dat")).thenReturn( + new ByteArrayInputStream(bytesIndexMetadata.streamInput().readAllBytes()) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } private static ClusterState.Builder generateClusterStateWithOneIndex() {