diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index a6c725ed47f2c..2fa8226169777 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -9,9 +9,15 @@ package org.opensearch.gateway.remote; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.junit.Before; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Map; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; @@ -22,6 +28,11 @@ public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { private static String INDEX_NAME = "test-index"; + @Before + public void setup() { + setupRepo(); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -55,8 +66,7 @@ public void testFullClusterRestoreStaleDelete() throws Exception { int dataNodeCount = shardCount * (replicaCount + 1); int clusterManagerNodeCount = 1; - // Step - 1 index some data to generate files in remote directory - Map indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1); + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); setReplicaCount(0); setReplicaCount(1); setReplicaCount(0); @@ -70,21 +80,34 @@ public void testFullClusterRestoreStaleDelete() throws Exception { RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( RemoteClusterStateService.class ); + + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + BlobPath baseMetadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()); + + assertEquals(repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size(), 4); + Map indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata( cluster().getClusterName(), getClusterState().metadata().clusterUUID() ); assertEquals(indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas(), 0); + assertEquals(indexMetadataMap.values().stream().findFirst().get().getNumberOfShards(), shardCount); } private void setReplicaCount(int replicaCount) { - // Step 3 - Reduce shard limits to hit shard limit with less no of shards client().admin() .indices() .prepareUpdateSettings(INDEX_NAME) .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) .get(); } - - } diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java b/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java index 9b4e83c5103af..c54536e9c46e2 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java @@ -107,8 +107,4 @@ public String toString() { } return sb.toString(); } - - public int size() { - return paths.size(); - } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 563adb13c1afa..697bb2a264ce7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -387,7 +387,9 @@ private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { * @param clusterName name of the cluster * @return {@code Map} latest IndexUUID to IndexMetadata map */ - public Map getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException { + public Map getLatestIndexMetadata(String clusterName, String clusterUUID) { + ensureRepositorySet(); + Map remoteIndexMetadata = new HashMap<>(); ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID()) @@ -484,29 +486,34 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste * @param clusterName name of the cluster * @param clusterUUIDs clusterUUID's for which remote cluster metadata needs to be purged. */ - public void deleteMetadataAsync(String clusterName, List clusterUUIDs) { - (new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool)).deleteAsync( - ThreadPool.Names.REMOTE_PURGE, - clusterUUIDs.stream() - .map(clusterUUID -> getCusterMetadataBasePath(clusterName, clusterUUID).toString()) - .collect(Collectors.toList()), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info("Deleted Remote Cluster Metadata for clusterUUIDs {}", clusterUUIDs); - } + private void deleteMetadataAsync(String clusterName, List clusterUUIDs) { + // TODO: add tests and make public + ensureRepositorySet(); - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUIDs - ) - ); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); + clusterUUIDs.forEach(clusterUUID -> { + blobStoreTransferService.deleteAsync( + ThreadPool.Names.REMOTE_PURGE, + getCusterMetadataBasePath(clusterName, clusterUUID), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted Remote Cluster Metadata for clusterUUIDs {}", clusterUUIDs); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUIDs + ), + e + ); + } } - } - ); + ); + }); } /** @@ -518,85 +525,86 @@ public void onFailure(Exception e) { public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) { BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); - // TODO: do we need a synchronized block to avoid concurrent executions - transferService.listAllInSortedOrderAsync( - ThreadPool.Names.REMOTE_PURGE, - getManifestFolderPath(clusterName, clusterUUID), - "manifest", - Integer.MAX_VALUE, - new ActionListener<>() { - int evaluatedManifestCount = 1; - - @Override - public void onResponse(List blobMetadata) { - Set filesToKeep = new HashSet<>(); - List stalePaths = new ArrayList<>(); - blobMetadata.forEach(manifestBlobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - manifestBlobMetadata.name() - ); - if (evaluatedManifestCount <= manifestsToRetain) { - clusterMetadataManifest.getIndices() - .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); - } else { - stalePaths.add(new BlobPath().add("manifest").buildAsString() + manifestBlobMetadata.name()); - clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { - if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { - stalePaths.add( - new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString() - + uploadedIndexMetadata.getUploadedFilename() - + ".dat" - ); - } - }); - } - evaluatedManifestCount += 1; - }); + synchronized (this) { + transferService.listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + int evaluatedManifestCount = 1; + + @Override + public void onResponse(List blobMetadata) { + Set filesToKeep = new HashSet<>(); + List stalePaths = new ArrayList<>(); + blobMetadata.forEach(manifestBlobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + manifestBlobMetadata.name() + ); + if (evaluatedManifestCount <= manifestsToRetain) { + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + } else { + stalePaths.add(new BlobPath().add("manifest").buildAsString() + manifestBlobMetadata.name()); + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + stalePaths.add( + new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + + uploadedIndexMetadata.getUploadedFilename() + + ".dat" + ); + } + }); + } + evaluatedManifestCount += 1; + }); - logger.info(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); + logger.info(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); - if (stalePaths.toArray().length == 0) { - logger.trace("No stale Remote Cluster Metadata files found"); - return; - } + if (stalePaths.toArray().length == 0) { + logger.trace("No stale Remote Cluster Metadata files found"); + return; + } - transferService.deleteBlobsAsync( - ThreadPool.Names.REMOTE_PURGE, - getCusterMetadataBasePath(clusterName, clusterUUID), - stalePaths, - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info( - String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size()) - ); - } + transferService.deleteBlobsAsync( + ThreadPool.Names.REMOTE_PURGE, + getCusterMetadataBasePath(clusterName, clusterUUID), + stalePaths, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info( + String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size()) + ); + } - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting stale Remote Cluster Metadata files - {}", - stalePaths - ) - ); + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting stale Remote Cluster Metadata files - {}", + stalePaths + ) + ); + } } - } - ); - } + ); + } - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUID - ) - ); + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + } } - } - ); + ); + } } }