From f995b129a658cadbc279b0aa72b807178591cc19 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Mon, 4 Sep 2023 11:51:06 +0530 Subject: [PATCH] Minor refactoring Signed-off-by: bansvaru --- .../remote/RemoteClusterStateService.java | 151 +++++++++--------- 1 file changed, 76 insertions(+), 75 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 62ad94bb90bfa..37df8b401359a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -66,6 +66,10 @@ public class RemoteClusterStateService implements Closeable { public static final int RETAINED_MANIFESTS = 3; + public static final String DELIMITER = "__"; + + private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", METADATA_NAME_FORMAT, @@ -97,10 +101,6 @@ public class RemoteClusterStateService implements Closeable { Property.NodeScope, Property.Final ); - private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - - public static final String DELIMITER = "__"; - private final String nodeId; private final Supplier repositoriesService; private final Settings settings; @@ -245,6 +245,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), false ); + // TODO: Do we need to trigger delete less frequently? deleteClusterMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { @@ -516,82 +517,82 @@ public void onFailure(Exception e) { public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) { BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); - synchronized (this) { - transferService.listAllInSortedOrderAsync( - ThreadPool.Names.REMOTE_PURGE, - getManifestFolderPath(clusterName, clusterUUID), - "manifest", - Integer.MAX_VALUE, - new ActionListener<>() { - int checkedManifestCount = 1; - - @Override - public void onResponse(List blobMetadata) { - Set filesToKeep = new HashSet<>(); - BlobPath staleBlobPath = new BlobPath(); - blobMetadata.forEach(manifestBlobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - manifestBlobMetadata.name() + // TODO: do we need a synchronized block to avoid concurrent executions + transferService.listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + int checkedManifestCount = 1; + + @Override + public void onResponse(List blobMetadata) { + Set filesToKeep = new HashSet<>(); + BlobPath staleBlobPath = new BlobPath(); + blobMetadata.forEach(manifestBlobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + manifestBlobMetadata.name() + ); + if (checkedManifestCount <= manifestsToRetain) { + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + } else { + staleBlobPath.add( + getManifestFolderPath(clusterName, clusterUUID).add( + getManifestFileNamePrefix( + clusterMetadataManifest.getClusterTerm(), + clusterMetadataManifest.getStateVersion() + ) + ).toString() ); - if (checkedManifestCount <= manifestsToRetain) { - clusterMetadataManifest.getIndices() - .forEach(uploadedIndexMetadata -> { filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()); }); - } else { - staleBlobPath.add( - getManifestFolderPath(clusterName, clusterUUID).add( - getManifestFileNamePrefix( - clusterMetadataManifest.getClusterTerm(), - clusterMetadataManifest.getStateVersion() - ) - ).toString() - ); - clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { - if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { - staleBlobPath.add(uploadedIndexMetadata.getUploadedFilename()); - } - }); - } - logger.trace(String.format(Locale.ROOT, "Deleting stale files from remote - %s", staleBlobPath)); - checkedManifestCount += 1; - }); - - if (staleBlobPath.toArray().length == 0) { - return; + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + staleBlobPath.add(uploadedIndexMetadata.getUploadedFilename()); + } + }); } + checkedManifestCount += 1; + }); - transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, staleBlobPath, new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info( - String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", staleBlobPath.size()) - ); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting stale Remote Cluster Metadata files - {}", - staleBlobPath - ) - ); - } - }); + logger.trace(String.format(Locale.ROOT, "Deleting stale files from remote - %s", staleBlobPath)); + if (staleBlobPath.toArray().length == 0) { + logger.trace("No stale Remote Cluster Metadata files found"); + return; } - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUID - ) - ); - } + transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, staleBlobPath, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info( + String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", staleBlobPath.size()) + ); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting stale Remote Cluster Metadata files - {}", + staleBlobPath + ) + ); + } + }); } - ); - } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + } + } + ); } }