diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 980c2f9cf3ce4..41bc73bdf6363 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -10,24 +10,29 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; +import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -96,6 +101,7 @@ public class RemoteClusterStateService implements Closeable { private final Supplier repositoriesService; private final Settings settings; private final LongSupplier relativeTimeNanosSupplier; + private final ThreadPool threadpool; private BlobStoreRepository blobStoreRepository; private volatile TimeValue slowWriteLoggingThreshold; @@ -104,13 +110,15 @@ public RemoteClusterStateService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, - LongSupplier relativeTimeNanosSupplier + LongSupplier relativeTimeNanosSupplier, + ThreadPool threadPool ) { assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled"; this.nodeId = nodeId; this.repositoriesService = repositoriesService; this.settings = settings; this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; + this.threadpool = threadPool; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); } @@ -330,26 +338,19 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX return blobStoreRepository.blobStore() - .blobContainer( - blobStoreRepository.basePath() - .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) - .add("cluster-state") - .add(clusterUUID) - .add("index") - .add(indexUUID) - ); + .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add("index").add(indexUUID)); } private BlobContainer manifestContainer(String clusterName, String clusterUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest - return blobStoreRepository.blobStore() - .blobContainer( - blobStoreRepository.basePath() - .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) - .add("cluster-state") - .add(clusterUUID) - .add("manifest") - ); + return blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add("manifest")); + } + + private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { + return blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add("cluster-state") + .add(clusterUUID); } private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { @@ -468,4 +469,34 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e); } } + + /** + * Deletes entire cluster metadata content for given cluster UUIDs in remote + * @param clusterName name of the cluster + * @param clusterUUIDs clusterUUID's for which remote cluster metadata needs to be purged. + */ + public void deleteMetadataAsync(String clusterName, List clusterUUIDs) { + (new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool)).deleteAsync( + ThreadPool.Names.REMOTE_PURGE, + clusterUUIDs.stream() + .map(clusterUUID -> getCusterMetadataBasePath(clusterName, clusterUUID).toString()) + .collect(Collectors.toList()), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted Remote Cluster Metadata for clusterUUIDs {}", clusterUUIDs); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUIDs + ) + ); + } + } + ); + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index d4e090b046760..8991b94731247 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -48,6 +48,7 @@ import java.util.function.Supplier; import org.mockito.ArgumentMatchers; +import org.opensearch.threadpool.TestThreadPool; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -84,6 +85,7 @@ public void setup() { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new TestThreadPool(getClass().getName()), () -> 0L ); } @@ -103,6 +105,7 @@ public void testFailInitializationWhenRemoteStateDisabled() throws IOException { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new TestThreadPool(getClass().getName()), () -> 0L ) );