diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java b/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java index c54536e9c46e2..9b4e83c5103af 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java @@ -107,4 +107,8 @@ public String toString() { } return sb.toString(); } + + public int size() { + return paths.size(); + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 41bc73bdf6363..c0d4a626ff83b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -40,10 +40,12 @@ import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -62,6 +64,8 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final int RETAINED_MANIFESTS = 3; + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", METADATA_NAME_FORMAT, @@ -241,6 +245,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), false ); + deleteClusterMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -343,7 +348,7 @@ private BlobContainer indexMetadataContainer(String clusterName, String clusterU private BlobContainer manifestContainer(String clusterName, String clusterUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest - return blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add("manifest")); + return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); } private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { @@ -359,19 +364,22 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { private static String getManifestFileName(long term, long version) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637_456536447 - return String.join( - DELIMITER, - "manifest", - RemoteStoreUtils.invertLong(term), - RemoteStoreUtils.invertLong(version), - RemoteStoreUtils.invertLong(System.currentTimeMillis()) - ); + return String.join(DELIMITER, getManifestFileNamePrefix(term, version), RemoteStoreUtils.invertLong(System.currentTimeMillis())); + } + + private static String getManifestFileNamePrefix(long term, long version) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637 + return String.join(DELIMITER, "manifest", RemoteStoreUtils.invertLong(term), RemoteStoreUtils.invertLong(version)); } private static String indexMetadataFileName(IndexMetadata indexMetadata) { return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis())); } + private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { + return getCusterMetadataBasePath(clusterName, clusterUUID).add("manifest"); + } + /** * Fetch latest index metadata from remote cluster state * @param clusterUUID uuid of cluster state to refer to in remote @@ -499,4 +507,92 @@ public void onFailure(Exception e) { } ); } + + /** + * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests + * @param clusterName name of the cluster + * @param clusterUUID uuid of cluster state to refer to in remote + * @param manifestsToRetain no of latest manifest files to keep in remote + */ + public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) { + BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); + + synchronized (this) { + transferService.listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + int checkedManifestCount = 1; + + @Override + public void onResponse(List blobMetadata) { + Set filesToKeep = new HashSet<>(); + BlobPath staleBlobPath = new BlobPath(); + blobMetadata.forEach(manifestBlobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + manifestBlobMetadata.name() + ); + if (checkedManifestCount <= manifestsToRetain) { + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> { filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()); }); + } else { + staleBlobPath.add( + getManifestFolderPath(clusterName, clusterUUID).add( + getManifestFileNamePrefix( + clusterMetadataManifest.getClusterTerm(), + clusterMetadataManifest.getStateVersion() + ) + ).toString() + ); + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + staleBlobPath.add(uploadedIndexMetadata.getUploadedFilename()); + } + }); + } + logger.trace(String.format(Locale.ROOT, "Deleting stale files from remote - %s", staleBlobPath)); + checkedManifestCount += 1; + }); + + if (staleBlobPath.toArray().length == 0) { + return; + } + + transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, staleBlobPath, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info( + String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", staleBlobPath.size()) + ); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting stale Remote Cluster Metadata files - {}", + staleBlobPath + ) + ); + } + }); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + } + } + ); + } + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 8991b94731247..4d108cc5ef9dd 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -33,6 +33,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; +import org.opensearch.threadpool.TestThreadPool; import org.junit.Assert; import org.junit.Before; @@ -48,7 +49,6 @@ import java.util.function.Supplier; import org.mockito.ArgumentMatchers; -import org.opensearch.threadpool.TestThreadPool; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue;