Skip to content

Commit

Permalink
Delete stale index routing files
Browse files Browse the repository at this point in the history
Fix stale file blob path for indices routing
  • Loading branch information
Shailendra Singh committed May 27, 2024
1 parent a96304d commit f6f7639
Showing 1 changed file with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;

import java.io.Closeable;
import java.io.IOException;
import java.sql.Blob;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -171,6 +173,7 @@ void deleteClusterMetadata(
Set<String> staleManifestPaths = new HashSet<>();
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
Set<String> staleIndexRoutingPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getRemoteManifestManager().fetchRemoteClusterMetadataManifest(
clusterName,
Expand All @@ -187,6 +190,10 @@ void deleteClusterMetadata(
filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
clusterMetadataManifest.getCustomMetadataMap().values().forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
}
if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) {
clusterMetadataManifest.getIndicesRouting()
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getRemoteManifestManager().fetchRemoteClusterMetadataManifest(
Expand All @@ -205,6 +212,14 @@ void deleteClusterMetadata(
addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths);
});
}
if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) {
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
if (filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename()) == false){
staleIndexRoutingPaths.add(new BlobPath().buildAsString() + uploadedIndicesRouting.getUploadedFilename());
logger.debug("Indices routing paths in stale manifest: {}", uploadedIndicesRouting.getUploadedFilename());
}
});
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
Expand All @@ -224,6 +239,7 @@ void deleteClusterMetadata(
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down Expand Up @@ -328,6 +344,14 @@ void deleteStalePaths(String clusterName, String clusterUUID, List<String> stale
);
}

void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
logger.debug(String.format(Locale.ROOT, "Deleting stale index routing files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(
BlobPath.cleanPath(),
stalePaths
);
}

/**
* Purges all remote cluster state against provided cluster UUIDs
* @param clusterState current state of the cluster
Expand Down

0 comments on commit f6f7639

Please sign in to comment.