Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Cluster State] Remove stale remote cluster state #9719

Merged
Prev Previous commit
Next Next commit
refactoring
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Sep 7, 2023
commit 80fb79807831f55833bcb9fd72b9fd930dff97a6
Original file line number Diff line number Diff line change
@@ -114,6 +114,7 @@ public class RemoteClusterStateService implements Closeable {
// Source of relative time in nanoseconds; used to compute how long a metadata write took.
private final LongSupplier relativeTimeNanosSupplier;
// Thread pool handed to the transfer service and used to run stale-metadata purges on the REMOTE_PURGE executor.
private final ThreadPool threadpool;
// Repository that provides the blob store and the base path under which cluster metadata is stored.
private BlobStoreRepository blobStoreRepository;
// Created on first use by getBlobStoreTransferService(); null until then.
private BlobStoreTransferService blobStoreTransferService;
// Writes that take at least this long are logged at WARN; updated through the SLOW_WRITE_LOGGING_THRESHOLD settings consumer.
private volatile TimeValue slowWriteLoggingThreshold;

public RemoteClusterStateService(
@@ -134,6 +135,13 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
}

/**
 * Returns the transfer service used for remote blob operations, creating it from the
 * repository's blob store and the thread pool the first time it is requested.
 *
 * @return the cached {@link BlobStoreTransferService} instance
 */
private BlobStoreTransferService getBlobStoreTransferService() {
    // Fast path: reuse the instance built by an earlier call.
    if (blobStoreTransferService != null) {
        return blobStoreTransferService;
    }
    blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);
    return blobStoreTransferService;
}

/**
* This method uploads the entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be
* invoked by the elected cluster manager when the remote cluster state is enabled.
@@ -243,7 +251,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
previousManifest.getPreviousClusterUUID(),
false
);
deleteClusterMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
@@ -459,10 +467,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID)
}

/**
 * Builds the blob path under which all metadata of one cluster UUID is stored:
 * repository base path / encoded cluster name / {@code CLUSTER_STATE_PATH_TOKEN} / cluster UUID.
 *
 * @param clusterName name of the cluster; encoded with {@code encodeString} before being added to the path
 * @param clusterUUID uuid of the cluster state; added to the path as-is
 * @return the base path for the given cluster name and UUID
 */
private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
    // A single return: a second, identical return statement after this one would be unreachable.
    return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
}

private BlobContainer clusterUUIDContainer(String clusterName) {
@@ -724,89 +729,103 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) {
* @param clusterUUID uuid of cluster state to refer to in remote
* @param manifestsToRetain number of latest manifest files to keep in remote
*/
public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) {
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);

synchronized (this) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
getManifestFolderPath(clusterName, clusterUUID),
MANIFEST_PATH_TOKEN,
Integer.MAX_VALUE,
new ActionListener<>() {
int evaluatedManifestCount = 1;

@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
Set<String> filesToKeep = new HashSet<>();
List<String> stalePaths = new ArrayList<>();
blobMetadata.forEach(manifestBlobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
manifestBlobMetadata.name()
);
if (evaluatedManifestCount <= manifestsToRetain) {
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
} else {
stalePaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + manifestBlobMetadata.name());
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
stalePaths.add(
new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+ uploadedIndexMetadata.getUploadedFilename()
+ ".dat"
);
}
});
public void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
synchronized (this) {
getBlobStoreTransferService().listAllInSortedOrder(
getManifestFolderPath(clusterName, clusterUUID),
"manifest",
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
if (blobMetadata.size() > manifestsToRetain) {
deleteClusterMetadata(
clusterName,
clusterUUID,
blobMetadata.subList(0, manifestsToRetain - 1),
blobMetadata.subList(manifestsToRetain, blobMetadata.size() - 1)
);
}
evaluatedManifestCount += 1;
});

logger.info(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));

if (stalePaths.toArray().length == 0) {
logger.trace("No stale Remote Cluster Metadata files found");
return;
}

transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
getCusterMetadataBasePath(clusterName, clusterUUID),
stalePaths,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info(
String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size())
);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting stale Remote Cluster Metadata files - {}",
stalePaths
)
);
}
}
);
@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUID
)
);
}
}
);
}
});
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUID
)
);
}
}
/**
 * Collects the remote files that are no longer needed and passes them to {@code deleteStalePaths}.
 * Every stale manifest file is queued for deletion, together with each index metadata file it
 * references that is not also referenced by one of the active manifests.
 *
 * @param clusterName name of the cluster
 * @param clusterUUID uuid of the cluster state the manifests belong to
 * @param activeManifestBlobMetadata manifests that are retained; files they reference are never deleted
 * @param staleManifestBlobMetadata manifests to delete
 */
private void deleteClusterMetadata(
    String clusterName,
    String clusterUUID,
    List<BlobMetadata> activeManifestBlobMetadata,
    List<BlobMetadata> staleManifestBlobMetadata
) {
    // Index metadata files referenced by any retained manifest must survive the purge.
    Set<String> filesToKeep = new HashSet<>();
    for (BlobMetadata manifestBlobMetadata : activeManifestBlobMetadata) {
        ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
            clusterName,
            clusterUUID,
            manifestBlobMetadata.name()
        );
        clusterMetadataManifest.getIndices()
            .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
    }

    List<String> stalePaths = new ArrayList<>();
    // Several stale manifests may reference the same index metadata file; queue each path only once
    // so the delete request and the "Deleted [n]" count are not inflated by duplicates.
    Set<String> queuedPaths = new HashSet<>();
    for (BlobMetadata manifestBlobMetadata : staleManifestBlobMetadata) {
        ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
            clusterName,
            clusterUUID,
            manifestBlobMetadata.name()
        );
        String manifestPath = new BlobPath().add("manifest").buildAsString() + manifestBlobMetadata.name();
        if (queuedPaths.add(manifestPath)) {
            stalePaths.add(manifestPath);
        }
        clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
            if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
                String indexPath = new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
                    + uploadedIndexMetadata.getUploadedFilename()
                    + ".dat";
                if (queuedPaths.add(indexPath)) {
                    stalePaths.add(indexPath);
                }
            }
        });
    }

    if (stalePaths.isEmpty()) {
        logger.trace("No stale Remote Cluster Metadata files found");
        return;
    }

    deleteStalePaths(clusterName, clusterUUID, stalePaths);
}

/**
 * Asynchronously deletes the given blobs, resolved relative to the cluster metadata base path,
 * on the REMOTE_PURGE thread pool and logs the outcome.
 *
 * @param clusterName name of the cluster
 * @param clusterUUID uuid of the cluster state whose files are deleted
 * @param stalePaths paths of the files to delete, relative to the cluster metadata base path
 */
private void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) {
    // The path list can be long; only build the message when debug logging is actually on.
    if (logger.isDebugEnabled()) {
        logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
    }
    getBlobStoreTransferService().deleteBlobsAsync(
        ThreadPool.Names.REMOTE_PURGE,
        getCusterMetadataBasePath(clusterName, clusterUUID),
        stalePaths,
        new ActionListener<>() {
            @Override
            public void onResponse(Void unused) {
                logger.info(String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size()));
            }

            @Override
            public void onFailure(Exception e) {
                // Include the exception so the failure cause and stack trace reach the log.
                logger.error(
                    new ParameterizedMessage("Exception occurred while deleting stale Remote Cluster Metadata files - {}", stalePaths),
                    e
                );
            }
        }
    );
}
}
Original file line number Diff line number Diff line change
@@ -36,7 +36,6 @@
import org.opensearch.snapshots.RestoreInfo;
import org.opensearch.snapshots.RestoreService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;