Skip to content

Commit

Permalink
Add support to delete stale manifest files for provided cluster UUID
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Sep 5, 2023
1 parent 6ff233e commit e0f68c8
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ public String toString() {
}
return sb.toString();
}

public int size() {
return paths.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand All @@ -62,6 +64,8 @@ public class RemoteClusterStateService implements Closeable {

public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";

public static final int RETAINED_MANIFESTS = 3;

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
Expand Down Expand Up @@ -241,6 +245,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
allUploadedIndexMetadata.values().stream().collect(Collectors.toList()),
false
);
deleteClusterMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down Expand Up @@ -343,7 +348,7 @@ private BlobContainer indexMetadataContainer(String clusterName, String clusterU

private BlobContainer manifestContainer(String clusterName, String clusterUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest
return blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add("manifest"));
return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID));
}

private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
Expand All @@ -359,19 +364,22 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {

private static String getManifestFileName(long term, long version) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637_456536447
return String.join(
DELIMITER,
"manifest",
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
return String.join(DELIMITER, getManifestFileNamePrefix(term, version), RemoteStoreUtils.invertLong(System.currentTimeMillis()));
}

private static String getManifestFileNamePrefix(long term, long version) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637
return String.join(DELIMITER, "manifest", RemoteStoreUtils.invertLong(term), RemoteStoreUtils.invertLong(version));
}

private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}

private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
return getCusterMetadataBasePath(clusterName, clusterUUID).add("manifest");
}

/**
* Fetch latest index metadata from remote cluster state
* @param clusterUUID uuid of cluster state to refer to in remote
Expand Down Expand Up @@ -499,4 +507,92 @@ public void onFailure(Exception e) {
}
);
}

/**
* Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests
* @param clusterName name of the cluster
* @param clusterUUID uuid of cluster state to refer to in remote
* @param manifestsToRetain no of latest manifest files to keep in remote
*/
public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) {
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);

synchronized (this) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
getManifestFolderPath(clusterName, clusterUUID),
"manifest",
Integer.MAX_VALUE,
new ActionListener<>() {
int checkedManifestCount = 1;

@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
Set<String> filesToKeep = new HashSet<>();
BlobPath staleBlobPath = new BlobPath();
blobMetadata.forEach(manifestBlobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
manifestBlobMetadata.name()
);
if (checkedManifestCount <= manifestsToRetain) {
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> { filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()); });
} else {
staleBlobPath.add(
getManifestFolderPath(clusterName, clusterUUID).add(
getManifestFileNamePrefix(
clusterMetadataManifest.getClusterTerm(),
clusterMetadataManifest.getStateVersion()
)
).toString()
);
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
staleBlobPath.add(uploadedIndexMetadata.getUploadedFilename());
}
});
}
logger.trace(String.format(Locale.ROOT, "Deleting stale files from remote - %s", staleBlobPath));
checkedManifestCount += 1;
});

if (staleBlobPath.toArray().length == 0) {
return;
}

transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, staleBlobPath, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info(
String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", staleBlobPath.size())
);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting stale Remote Cluster Metadata files - {}",
staleBlobPath
)
);
}
});
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUID
)
);
}
}
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
import org.opensearch.threadpool.TestThreadPool;
import org.junit.Assert;
import org.junit.Before;

Expand All @@ -48,7 +49,6 @@
import java.util.function.Supplier;

import org.mockito.ArgumentMatchers;
import org.opensearch.threadpool.TestThreadPool;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down

0 comments on commit e0f68c8

Please sign in to comment.