diff --git a/CHANGELOG.md b/CHANGELOG.md
index a0b5dd289ec85..a48c4635d8dec 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679))
 - [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637))
 - Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
+- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
 
 ### Dependencies
 - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java
new file mode 100644
index 0000000000000..0f441fe01a368
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java
@@ -0,0 +1,145 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.junit.Before;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
+import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {
+
+    private static final String INDEX_NAME = "test-index";
+
+    @Before
+    public void setup() {
+        asyncUploadMockFsRepo = false;
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
+    }
+
+    private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
+        prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
+        Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
+        assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards);
+        ensureGreen(INDEX_NAME);
+        return indexStats;
+    }
+
+    public void testRemoteCleanupTaskUpdated() {
+        int shardCount = randomIntBetween(1, 2);
+        int replicaCount = 1;
+        int dataNodeCount = shardCount * (replicaCount + 1);
+        int clusterManagerNodeCount = 1;
+
+        initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
+        RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
+            RemoteClusterStateCleanupManager.class
+        );
+
+        assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval());
+        assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
+
+        // now disable
+        client().admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
+            .get();
+
+        assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis());
+        assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
+
+        // now set Clean up interval to 1 min
+        client().admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
+            .get();
+        assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
+    }
+
+    public void testRemoteCleanupDeleteStale() throws Exception {
+        int shardCount = randomIntBetween(1, 2);
+        int replicaCount = 1;
+        int dataNodeCount = shardCount * (replicaCount + 1);
+        int clusterManagerNodeCount = 1;
+
+        initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
+
+        // set cleanup interval to 100 ms to make the test faster
+        ClusterUpdateSettingsResponse response = client().admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
+            .get();
+
+        assertTrue(response.isAcknowledged());
+
+        // update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
+        // to repository, if manifest files are less than that it means clean up has run
+        updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);
+
+        RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
+        BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
+        BlobPath baseMetadataPath = repository.basePath()
+            .add(
+                Base64.getUrlEncoder()
+                    .withoutPadding()
+                    .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
+            )
+            .add("cluster-state")
+            .add(getClusterState().metadata().clusterUUID());
+        BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
+
+        assertBusy(() -> {
+            int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
+            logger.info("number of current manifest file: {}", manifestFiles);
+            // we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
+            // other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
+            // Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests)
+            assertTrue(
+                "Current number of manifest files: " + manifestFiles,
+                manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
+            );
+        }, 500, TimeUnit.MILLISECONDS);
+    }
+
+    private void updateClusterStateNTimes(int n) {
+        int newReplicaCount = randomIntBetween(0, 3);
+        for (int i = n; i > 0; i--) {
+            ClusterUpdateSettingsResponse response = client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS))
+                .get();
+            assertTrue(response.isAcknowledged());
+        }
+    }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
index 42120aa32eb47..ab2f0f0080566 100644
--- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java
@@ -10,7 +10,6 @@
 
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
 import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
-import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.discovery.DiscoveryStats;
@@ -27,7 +26,6 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
@@ -51,16 +49,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
         return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
     }
 
-    private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
-        internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
-        internalCluster().startDataOnlyNodes(numDataOnlyNodes);
-        for (String index : indices.split(",")) {
-            createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
-            ensureYellowAndNoInitializingShards(index);
-            ensureGreen(index);
-        }
-    }
-
     private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
         prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
         Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
@@ -69,49 +57,6 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
         return indexStats;
     }
 
-    public void testFullClusterRestoreStaleDelete() throws Exception {
-        int shardCount = randomIntBetween(1, 2);
-        int replicaCount = 1;
-        int dataNodeCount = shardCount * (replicaCount + 1);
-        int clusterManagerNodeCount = 1;
-
-        initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
-        setReplicaCount(0);
-        setReplicaCount(2);
-        setReplicaCount(0);
-        setReplicaCount(1);
-        setReplicaCount(0);
-        setReplicaCount(1);
-        setReplicaCount(0);
-        setReplicaCount(2);
-        setReplicaCount(0);
-
-        RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
-            RemoteClusterStateService.class
-        );
-
-        RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
-
-        BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
-        BlobPath baseMetadataPath = repository.basePath()
-            .add(
-                Base64.getUrlEncoder()
-                    .withoutPadding()
-                    .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
-            )
-            .add("cluster-state")
-            .add(getClusterState().metadata().clusterUUID());
-
-        assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());
-
-        Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
-            cluster().getClusterName(),
-            getClusterState().metadata().clusterUUID()
-        ).getMetadata().getIndices();
-        assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
-        assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
-    }
-
     public void testRemoteStateStats() {
         int shardCount = randomIntBetween(1, 2);
         int replicaCount = 1;
@@ -241,12 +186,4 @@ private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) {
         assertNotNull(nodesStatsResponse.getNodes().get(0));
         assertNotNull(nodesStatsResponse.getNodes().get(0).getDiscoveryStats());
     }
-
-    private void setReplicaCount(int replicaCount) {
-        client().admin()
-            .indices()
-            .prepareUpdateSettings(INDEX_NAME)
-            .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
-            .get();
-    }
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
index 740aee69f7d80..64efcee6ef1b5 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java
@@ -350,4 +350,14 @@ protected void restore(boolean restoreAllShards, String... indices) {
                 PlainActionFuture.newFuture()
             );
     }
+
+    protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
+        internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
+        internalCluster().startDataOnlyNodes(numDataOnlyNodes);
+        for (String index : indices.split(",")) {
+            createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
+            ensureYellowAndNoInitializingShards(index);
+            ensureGreen(index);
+        }
+    }
 }
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index a2be2ea4510e0..d57ef7e780602 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -104,6 +104,7 @@
 import org.opensearch.gateway.GatewayService;
 import org.opensearch.gateway.PersistedClusterStateService;
 import org.opensearch.gateway.ShardsBatchGatewayAllocator;
+import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
 import org.opensearch.gateway.remote.RemoteClusterStateService;
 import org.opensearch.http.HttpTransportSettings;
 import org.opensearch.index.IndexModule;
@@ -711,6 +712,7 @@ public void apply(Settings value, Settings current, Settings previous) {
                 SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL,
 
                 // Remote cluster state settings
+                RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
                 RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
                 RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
                 RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java
new file mode 100644
index 0000000000000..8a106a25e5630
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java
@@ -0,0 +1,408 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.logging.log4j.util.Strings;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.service.ClusterApplierService;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.blobstore.BlobMetadata;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.concurrent.AbstractAsyncTask;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.index.translog.transfer.BlobStoreTransferService;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
+
+/**
+ * A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
+ *
+ * @opensearch.internal
+ */
+public class RemoteClusterStateCleanupManager implements Closeable {
+
+    public static final int RETAINED_MANIFESTS = 10;
+    public static final int SKIP_CLEANUP_STATE_CHANGES = 10;
+    public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5);
+    public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM = TimeValue.MINUS_ONE;
+
+    /**
+     * Setting to specify the interval to do run stale file cleanup job
+     * Min value -1 indicates that the stale file cleanup job should be disabled
+     */
+    public static final Setting<TimeValue> REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING = Setting.timeSetting(
+        "cluster.remote_store.state.cleanup_interval",
+        CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT,
+        CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+    private static final Logger logger = LogManager.getLogger(RemoteClusterStateCleanupManager.class);
+    private final RemoteClusterStateService remoteClusterStateService;
+    private final RemotePersistenceStats remoteStateStats;
+    private BlobStoreTransferService blobStoreTransferService;
+    private TimeValue staleFileCleanupInterval;
+    private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
+    private AsyncStaleFileDeletion staleFileDeletionTask;
+    private long lastCleanupAttemptStateVersion;
+    private final ThreadPool threadpool;
+    private final ClusterApplierService clusterApplierService;
+
+    public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
+        this.remoteClusterStateService = remoteClusterStateService;
+        this.remoteStateStats = remoteClusterStateService.getStats();
+        ClusterSettings clusterSettings = clusterService.getClusterSettings();
+        this.clusterApplierService = clusterService.getClusterApplierService();
+        this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);
+        this.threadpool = remoteClusterStateService.getThreadpool();
+        // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
+        this.lastCleanupAttemptStateVersion = 0;
+        clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
+    }
+
+    void start() {
+        staleFileDeletionTask = new AsyncStaleFileDeletion(this);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (staleFileDeletionTask != null) {
+            staleFileDeletionTask.close();
+        }
+    }
+
+    private BlobStoreTransferService getBlobStoreTransferService() {
+        if (blobStoreTransferService == null) {
+            blobStoreTransferService = new BlobStoreTransferService(remoteClusterStateService.getBlobStore(), threadpool);
+        }
+        return blobStoreTransferService;
+    }
+
+    private void updateCleanupInterval(TimeValue updatedInterval) {
+        this.staleFileCleanupInterval = updatedInterval;
+        logger.info("updated remote state cleanup interval to {}", updatedInterval);
+        // After updating the interval, we need to close the current task and create a new one which will run with updated interval
+        if (staleFileDeletionTask != null && !staleFileDeletionTask.getInterval().equals(updatedInterval)) {
+            staleFileDeletionTask.setInterval(updatedInterval);
+        }
+    }
+
+    // visible for testing
+    void cleanUpStaleFiles() {
+        ClusterState currentAppliedState = clusterApplierService.state();
+        if (currentAppliedState.nodes().isLocalNodeElectedClusterManager()) {
+            long cleanUpAttemptStateVersion = currentAppliedState.version();
+            assert Strings.isNotEmpty(currentAppliedState.getClusterName().value()) : "cluster name is not set";
+            assert Strings.isNotEmpty(currentAppliedState.metadata().clusterUUID()) : "cluster uuid is not set";
+            if (cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion > SKIP_CLEANUP_STATE_CHANGES) {
+                logger.info(
+                    "Cleaning up stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates",
+                    currentAppliedState.getClusterName().value(),
+                    currentAppliedState.metadata().clusterUUID(),
+                    cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion
+                );
+                this.deleteStaleClusterMetadata(
+                    currentAppliedState.getClusterName().value(),
+                    currentAppliedState.metadata().clusterUUID(),
+                    RETAINED_MANIFESTS
+                );
+                lastCleanupAttemptStateVersion = cleanUpAttemptStateVersion;
+            } else {
+                logger.debug(
+                    "Skipping cleanup of stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates, which is less than threshold {}",
+                    currentAppliedState.getClusterName().value(),
+                    currentAppliedState.metadata().clusterUUID(),
+                    cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion,
+                    SKIP_CLEANUP_STATE_CHANGES
+                );
+            }
+        } else {
+            logger.debug("Skipping cleanup task as local node is not elected Cluster Manager");
+        }
+    }
+
+    private void addStaleGlobalMetadataPath(String fileName, Set<String> filesToKeep, Set<String> staleGlobalMetadataPaths) {
+        if (!filesToKeep.contains(fileName)) {
+            String[] splitPath = fileName.split("/");
+            staleGlobalMetadataPaths.add(
+                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
+                    splitPath[splitPath.length - 1]
+                )
+            );
+        }
+    }
+
+    // visible for testing
+    void deleteClusterMetadata(
+        String clusterName,
+        String clusterUUID,
+        List<BlobMetadata> activeManifestBlobMetadata,
+        List<BlobMetadata> staleManifestBlobMetadata
+    ) {
+        try {
+            Set<String> filesToKeep = new HashSet<>();
+            Set<String> staleManifestPaths = new HashSet<>();
+            Set<String> staleIndexMetadataPaths = new HashSet<>();
+            Set<String> staleGlobalMetadataPaths = new HashSet<>();
+            activeManifestBlobMetadata.forEach(blobMetadata -> {
+                ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
+                    clusterName,
+                    clusterUUID,
+                    blobMetadata.name()
+                );
+                clusterMetadataManifest.getIndices()
+                    .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
+                if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
+                    filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
+                } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
+                    filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
+                    filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
+                    filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
+                    clusterMetadataManifest.getCustomMetadataMap()
+                        .values()
+                        .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
+                }
+            });
+            staleManifestBlobMetadata.forEach(blobMetadata -> {
+                ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
+                    clusterName,
+                    clusterUUID,
+                    blobMetadata.name()
+                );
+                staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
+                if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
+                    addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
+                } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
+                    addStaleGlobalMetadataPath(
+                        clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
+                        filesToKeep,
+                        staleGlobalMetadataPaths
+                    );
+                    addStaleGlobalMetadataPath(
+                        clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(),
+                        filesToKeep,
+                        staleGlobalMetadataPaths
+                    );
+                    addStaleGlobalMetadataPath(
+                        clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(),
+                        filesToKeep,
+                        staleGlobalMetadataPaths
+                    );
+                    clusterMetadataManifest.getCustomMetadataMap()
+                        .values()
+                        .forEach(
+                            attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
+                        );
+                }
+
+                clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
+                    if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
+                        staleIndexMetadataPaths.add(
+                            new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+                                + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
+                        );
+                    }
+                });
+            });
+
+            if (staleManifestPaths.isEmpty()) {
+                logger.debug("No stale Remote Cluster Metadata files found");
+                return;
+            }
+
+            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
+            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
+            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
+        } catch (IllegalStateException e) {
+            logger.error("Error while fetching Remote Cluster Metadata manifests", e);
+        } catch (IOException e) {
+            logger.error("Error while deleting stale Remote Cluster Metadata files", e);
+            remoteStateStats.cleanUpAttemptFailed();
+        } catch (Exception e) {
+            logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e);
+            remoteStateStats.cleanUpAttemptFailed();
+        }
+    }
+
+    /**
+     * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests
+     *
+     * @param clusterName name of the cluster
+     * @param clusterUUID uuid of cluster state to refer to in remote
+     * @param manifestsToRetain no of latest manifest files to keep in remote
+     */
+    // package private for testing
+    void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
+        if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) {
+            logger.info("Delete stale cluster metadata task is already in progress.");
+            return;
+        }
+        try {
+            getBlobStoreTransferService().listAllInSortedOrderAsync(
+                ThreadPool.Names.REMOTE_PURGE,
+                remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
+                MANIFEST_FILE_PREFIX,
+                Integer.MAX_VALUE,
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(List<BlobMetadata> blobMetadata) {
+                        if (blobMetadata.size() > manifestsToRetain) {
+                            deleteClusterMetadata(
+                                clusterName,
+                                clusterUUID,
+                                blobMetadata.subList(0, manifestsToRetain),
+                                blobMetadata.subList(manifestsToRetain, blobMetadata.size())
+                            );
+                        }
+                        deleteStaleMetadataRunning.set(false);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.error(
+                            new ParameterizedMessage(
+                                "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
+                                clusterUUID
+                            )
+                        );
+                        deleteStaleMetadataRunning.set(false);
+                    }
+                }
+            );
+        } catch (Exception e) {
+            deleteStaleMetadataRunning.set(false);
+            throw e;
+        }
+    }
+
+    /**
+     * Purges all remote cluster state against provided cluster UUIDs
+     *
+     * @param clusterName name of the cluster
+     * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged
+     */
+    void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUIDs) {
+        clusterUUIDs.forEach(
+            clusterUUID -> getBlobStoreTransferService().deleteAsync(
+                ThreadPool.Names.REMOTE_PURGE,
+                remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(Void unused) {
+                        logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.error(
+                            new ParameterizedMessage(
+                                "Exception occurred while deleting all remote cluster metadata for cluster UUID {}",
+                                clusterUUID
+                            ),
+                            e
+                        );
+                        remoteStateStats.cleanUpAttemptFailed();
+                    }
+                }
+            )
+        );
+    }
+
+    // package private for testing
+    void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
+        logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
+        getBlobStoreTransferService().deleteBlobs(
+            remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
+            stalePaths
+        );
+    }
+
+    /**
+     * Purges all remote cluster state against provided cluster UUIDs
+     * @param clusterState current state of the cluster
+     * @param committedManifest last committed ClusterMetadataManifest
+     */
+    public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
+        threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
+            String clusterName = clusterState.getClusterName().value();
+            logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
+            Set<String> allClustersUUIDsInRemote;
+            try {
+                allClustersUUIDsInRemote = new HashSet<>(
+                    remoteClusterStateService.getAllClusterUUIDs(clusterState.getClusterName().value())
+                );
+            } catch (IOException e) {
+                logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
+                return;
+            }
+            // Retain last 2 cluster uuids data
+            allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
+            allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
+            deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
+        });
+    }
+
+    public TimeValue getStaleFileCleanupInterval() {
+        return this.staleFileCleanupInterval;
+    }
+
+    AsyncStaleFileDeletion getStaleFileDeletionTask() { // for testing
+        return this.staleFileDeletionTask;
+    }
+
+    RemotePersistenceStats getStats() {
+        return this.remoteStateStats;
+    }
+
+    static final class AsyncStaleFileDeletion extends AbstractAsyncTask {
+        private final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
+
+        AsyncStaleFileDeletion(RemoteClusterStateCleanupManager remoteClusterStateCleanupManager) {
+            super(
+                logger,
+                remoteClusterStateCleanupManager.threadpool,
+                remoteClusterStateCleanupManager.getStaleFileCleanupInterval(),
+                true
+            );
+            this.remoteClusterStateCleanupManager = remoteClusterStateCleanupManager;
+            rescheduleIfNecessary();
+        }
+
+        @Override
+        protected boolean mustReschedule() {
+            return true;
+        }
+
+        @Override
+        protected void runInternal() {
+            remoteClusterStateCleanupManager.cleanUpStaleFiles();
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
index ac821cd15a5b3..9bf1dff2359ca 100644
--- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
@@ -18,11 +18,13 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.metadata.TemplatesMetadata;
+import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.CheckedRunnable;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.BlobStore;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Setting.Property;
@@ -59,7 +61,6 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
@@ -81,8 +82,6 @@ public class RemoteClusterStateService implements Closeable {
 
     public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";
 
-    public static final int RETAINED_MANIFESTS = 10;
-
     public static final String DELIMITER = "__";
     public static final String CUSTOM_DELIMITER = "--";
 
@@ -207,8 +206,7 @@ public class RemoteClusterStateService implements Closeable {
     private volatile TimeValue indexMetadataUploadTimeout;
     private volatile TimeValue globalMetadataUploadTimeout;
     private volatile TimeValue metadataManifestUploadTimeout;
-
-    private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
+    private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
     private final RemotePersistenceStats remoteStateStats;
     private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]";
     private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
@@ -232,7 +230,7 @@ public RemoteClusterStateService(
         String nodeId,
         Supplier<RepositoriesService> repositoriesService,
         Settings settings,
-        ClusterSettings clusterSettings,
+        ClusterService clusterService,
         LongSupplier relativeTimeNanosSupplier,
         ThreadPool threadPool,
         List<IndexMetadataUploadListener> indexMetadataUploadListeners
@@ -243,6 +241,7 @@ public RemoteClusterStateService(
         this.settings = settings;
         this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
         this.threadpool = threadPool;
+        ClusterSettings clusterSettings = clusterService.getClusterSettings();
         this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
         this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
         this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
@@ -252,16 +251,10 @@ public RemoteClusterStateService(
         clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
         clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
         this.remoteStateStats = new RemotePersistenceStats();
+        this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
         this.indexMetadataUploadListeners = indexMetadataUploadListeners;
     }
 
-    private BlobStoreTransferService getBlobStoreTransferService() {
-        if (blobStoreTransferService == null) {
-            blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);
-        }
-        return blobStoreTransferService;
-    }
-
     /**
      * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be
      * invoked by the elected cluster manager when the remote cluster state is enabled.
@@ -417,7 +410,6 @@ public ClusterMetadataManifest writeIncrementalMetadata(
                 : previousManifest.getCustomMetadataMap(),
             false
         );
-        deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
 
         final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
         remoteStateStats.stateSucceeded();
@@ -721,6 +713,10 @@ private CheckedRunnable<IOException> getAsyncMetadataWriteAction(
         );
     }
 
+    public RemoteClusterStateCleanupManager getCleanupManager() {
+        return remoteClusterStateCleanupManager;
+    }
+
     @Nullable
     public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
         throws IOException {
@@ -740,12 +736,16 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat
             previousManifest.getCustomMetadataMap(),
             true
         );
-        deleteStaleClusterUUIDs(clusterState, committedManifest);
+        if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) {
+            remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest);
+        }
+
         return committedManifest;
     }
 
     @Override
     public void close() throws IOException {
+        remoteClusterStateCleanupManager.close();
         if (blobStoreRepository != null) {
             IOUtils.close(blobStoreRepository);
         }
@@ -760,6 +760,7 @@ public void start() {
         final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
         assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
         blobStoreRepository = (BlobStoreRepository) repository;
+        remoteClusterStateCleanupManager.start();
     }
 
     private ClusterMetadataManifest uploadManifest(
@@ -850,6 +851,14 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust
         );
     }
 
+    ThreadPool getThreadpool() {
+        return threadpool;
+    }
+
+    BlobStore getBlobStore() {
+        return blobStoreRepository.blobStore();
+    }
+
     private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
         // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
         return blobStoreRepository.blobStore()
@@ -867,7 +876,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID)
         return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID));
     }
 
-    private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
+    BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
         return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
     }
 
@@ -982,7 +991,7 @@ private static String metadataAttributeFileName(String componentPrefix, Long met
         );
     }
 
-    private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
+    BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
         return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN);
     }
 
@@ -1235,7 +1244,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) {
         }
     }
 
-    private Set<String> getAllClusterUUIDs(String clusterName) throws IOException {
+    Set<String> getAllClusterUUIDs(String clusterName) throws IOException {
         Map<String, BlobContainer> clusterUUIDMetadata = clusterUUIDContainer(clusterName).children();
         if (clusterUUIDMetadata == null) {
             return Collections.emptySet();
@@ -1426,7 +1435,7 @@ private Optional<String> getLatestManifestFileName(String clusterName, String cl
      * @param clusterName name of the cluster
      * @return ClusterMetadataManifest
      */
-    private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename)
+    ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename)
         throws IllegalStateException {
         try {
             return getClusterMetadataManifestBlobStoreFormat(filename).read(
@@ -1486,234 +1495,6 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) {
         }
     }
 
-    /**
-     * Purges all remote cluster state against provided cluster UUIDs
-     *
-     * @param clusterName  name of the cluster
-     * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged
-     */
-    void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUIDs) {
-        clusterUUIDs.forEach(clusterUUID -> {
-            getBlobStoreTransferService().deleteAsync(
-                ThreadPool.Names.REMOTE_PURGE,
-                getCusterMetadataBasePath(clusterName, clusterUUID),
-                new ActionListener<>() {
-                    @Override
-                    public void onResponse(Void unused) {
-                        logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID);
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.error(
-                            new ParameterizedMessage(
-                                "Exception occurred while deleting all remote cluster metadata for cluster UUID {}",
-                                clusterUUID
-                            ),
-                            e
-                        );
-                        remoteStateStats.cleanUpAttemptFailed();
-                    }
-                }
-            );
-        });
-    }
-
-    /**
-     * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests
-     *
-     * @param clusterName       name of the cluster
-     * @param clusterUUID       uuid of cluster state to refer to in remote
-     * @param manifestsToRetain no of latest manifest files to keep in remote
-     */
-    // package private for testing
-    void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
-        if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) {
-            logger.info("Delete stale cluster metadata task is already in progress.");
-            return;
-        }
-        try {
-            getBlobStoreTransferService().listAllInSortedOrderAsync(
-                ThreadPool.Names.REMOTE_PURGE,
-                getManifestFolderPath(clusterName, clusterUUID),
-                "manifest",
-                Integer.MAX_VALUE,
-                new ActionListener<>() {
-                    @Override
-                    public void onResponse(List<BlobMetadata> blobMetadata) {
-                        if (blobMetadata.size() > manifestsToRetain) {
-                            deleteClusterMetadata(
-                                clusterName,
-                                clusterUUID,
-                                blobMetadata.subList(0, manifestsToRetain - 1),
-                                blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size())
-                            );
-                        }
-                        deleteStaleMetadataRunning.set(false);
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.error(
-                            new ParameterizedMessage(
-                                "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
-                                clusterUUID
-                            )
-                        );
-                        deleteStaleMetadataRunning.set(false);
-                    }
-                }
-            );
-        } catch (Exception e) {
-            deleteStaleMetadataRunning.set(false);
-            throw e;
-        }
-    }
-
-    private void deleteClusterMetadata(
-        String clusterName,
-        String clusterUUID,
-        List<BlobMetadata> activeManifestBlobMetadata,
-        List<BlobMetadata> staleManifestBlobMetadata
-    ) {
-        try {
-            Set<String> filesToKeep = new HashSet<>();
-            Set<String> staleManifestPaths = new HashSet<>();
-            Set<String> staleIndexMetadataPaths = new HashSet<>();
-            Set<String> staleGlobalMetadataPaths = new HashSet<>();
-            activeManifestBlobMetadata.forEach(blobMetadata -> {
-                ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
-                    clusterName,
-                    clusterUUID,
-                    blobMetadata.name()
-                );
-                clusterMetadataManifest.getIndices()
-                    .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
-                if (clusterMetadataManifest.getGlobalMetadataFileName() != null) {
-                    filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
-                } else {
-                    filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
-                    filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
-                    filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
-                    clusterMetadataManifest.getCustomMetadataMap()
-                        .forEach((key, value) -> { filesToKeep.add(value.getUploadedFilename()); });
-                }
-            });
-            staleManifestBlobMetadata.forEach(blobMetadata -> {
-                ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
-                    clusterName,
-                    clusterUUID,
-                    blobMetadata.name()
-                );
-                staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
-                if (clusterMetadataManifest.getGlobalMetadataFileName() != null) {
-                    if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) {
-                        String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/");
-                        staleGlobalMetadataPaths.add(
-                            new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
-                                globalMetadataSplitPath[globalMetadataSplitPath.length - 1]
-                            )
-                        );
-                    }
-                } else {
-                    if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) {
-                        String[] coordinationMetadataSplitPath = clusterMetadataManifest.getCoordinationMetadata()
-                            .getUploadedFilename()
-                            .split("/");
-                        staleGlobalMetadataPaths.add(
-                            new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
-                                coordinationMetadataSplitPath[coordinationMetadataSplitPath.length - 1]
-                            )
-                        );
-                    }
-                    if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) {
-                        String[] templatesMetadataSplitPath = clusterMetadataManifest.getTemplatesMetadata()
-                            .getUploadedFilename()
-                            .split("/");
-                        staleGlobalMetadataPaths.add(
-                            new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
-                                templatesMetadataSplitPath[templatesMetadataSplitPath.length - 1]
-                            )
-                        );
-                    }
-                    if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) {
-                        String[] settingsMetadataSplitPath = clusterMetadataManifest.getSettingsMetadata().getUploadedFilename().split("/");
-                        staleGlobalMetadataPaths.add(
-                            new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
-                                settingsMetadataSplitPath[settingsMetadataSplitPath.length - 1]
-                            )
-                        );
-                    }
-                    clusterMetadataManifest.getCustomMetadataMap().forEach((key, value) -> {
-                        if (filesToKeep.contains(value.getUploadedFilename()) == false) {
-                            String[] customMetadataSplitPath = value.getUploadedFilename().split("/");
-                            staleGlobalMetadataPaths.add(
-                                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
-                                    customMetadataSplitPath[customMetadataSplitPath.length - 1]
-                                )
-                            );
-                        }
-                    });
-                }
-
-                clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
-                    if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
-                        staleIndexMetadataPaths.add(
-                            new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
-                                + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
-                        );
-                    }
-                });
-            });
-
-            if (staleManifestPaths.isEmpty()) {
-                logger.debug("No stale Remote Cluster Metadata files found");
-                return;
-            }
-
-            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
-            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
-            deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
-        } catch (IllegalStateException e) {
-            logger.error("Error while fetching Remote Cluster Metadata manifests", e);
-        } catch (IOException e) {
-            logger.error("Error while deleting stale Remote Cluster Metadata files", e);
-            remoteStateStats.cleanUpAttemptFailed();
-        } catch (Exception e) {
-            logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e);
-            remoteStateStats.cleanUpAttemptFailed();
-        }
-    }
-
-    private void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
-        logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
-        getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths);
-    }
-
-    /**
-     * Purges all remote cluster state against provided cluster UUIDs
-     *
-     * @param clusterState      current state of the cluster
-     * @param committedManifest last committed ClusterMetadataManifest
-     */
-    public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
-        threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
-            String clusterName = clusterState.getClusterName().value();
-            logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
-            Set<String> allClustersUUIDsInRemote;
-            try {
-                allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
-            } catch (IOException e) {
-                logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName));
-                return;
-            }
-            // Retain last 2 cluster uuids data
-            allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
-            allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
-            deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
-        });
-    }
-
     public RemotePersistenceStats getStats() {
         return remoteStateStats;
     }
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 9462aeddbd0e4..04bd31e6a5809 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -138,6 +138,7 @@
 import org.opensearch.gateway.MetaStateService;
 import org.opensearch.gateway.PersistedClusterStateService;
 import org.opensearch.gateway.ShardsBatchGatewayAllocator;
+import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
 import org.opensearch.gateway.remote.RemoteClusterStateService;
 import org.opensearch.http.HttpServerTransport;
 import org.opensearch.identity.IdentityService;
@@ -751,6 +752,7 @@ protected Node(
                 threadPool::relativeTimeInMillis
             );
             final RemoteClusterStateService remoteClusterStateService;
+            final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
             final RemoteIndexPathUploader remoteIndexPathUploader;
             if (isRemoteStoreClusterStateEnabled(settings)) {
                 remoteIndexPathUploader = new RemoteIndexPathUploader(
@@ -763,14 +765,16 @@ protected Node(
                     nodeEnvironment.nodeId(),
                     repositoriesServiceReference::get,
                     settings,
-                    clusterService.getClusterSettings(),
+                    clusterService,
                     threadPool::preciseRelativeTimeInNanos,
                     threadPool,
                     List.of(remoteIndexPathUploader)
                 );
+                remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager();
             } else {
                 remoteClusterStateService = null;
                 remoteIndexPathUploader = null;
+                remoteClusterStateCleanupManager = null;
             }
 
             // collect engine factory providers from plugins
@@ -1374,6 +1378,7 @@ protected Node(
                 b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
                 b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
                 b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
+                b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager);
                 b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
                 b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
                 b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory);
diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
index 3ba98c44f8d3e..418e6d8de6adb 100644
--- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
+++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
@@ -462,9 +462,7 @@ public void testDataOnlyNodePersistence() throws Exception {
             });
             when(transportService.getThreadPool()).thenReturn(threadPool);
             ClusterService clusterService = mock(ClusterService.class);
-            when(clusterService.getClusterSettings()).thenReturn(
-                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
-            );
+            when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
             final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService(
                 nodeEnvironment,
                 xContentRegistry(),
@@ -487,7 +485,7 @@ public void testDataOnlyNodePersistence() throws Exception {
                         nodeEnvironment.nodeId(),
                         repositoriesServiceSupplier,
                         settings,
-                        clusterSettings,
+                        clusterService,
                         () -> 0L,
                         threadPool,
                         List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java
new file mode 100644
index 0000000000000..24fd1b164a4ff
--- /dev/null
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java
@@ -0,0 +1,446 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.service.ClusterApplierService;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.BlobMetadata;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.BlobStore;
+import org.opensearch.common.blobstore.support.PlainBlobMetadata;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.concurrent.AbstractAsyncTask;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.repositories.fs.FsRepository;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.test.VersionUtils;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1;
+import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
+import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
+import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
+import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.CLUSTER_STATE_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
+import static org.opensearch.gateway.remote.RemoteClusterStateService.encodeString;
+import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex;
+import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase {
+    private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
+    private Supplier<RepositoriesService> repositoriesServiceSupplier;
+    private RepositoriesService repositoriesService;
+    private BlobStoreRepository blobStoreRepository;
+    private BlobStore blobStore;
+    private ClusterSettings clusterSettings;
+    private ClusterApplierService clusterApplierService;
+    private ClusterState clusterState;
+    private Metadata metadata;
+    private RemoteClusterStateService remoteClusterStateService;
+    private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
+
+    @Before
+    public void setup() {
+        repositoriesServiceSupplier = mock(Supplier.class);
+        repositoriesService = mock(RepositoriesService.class);
+        when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
+
+        String stateRepoTypeAttributeKey = String.format(
+            Locale.getDefault(),
+            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
+            "remote_store_repository"
+        );
+        String stateRepoSettingsAttributeKeyPrefix = String.format(
+            Locale.getDefault(),
+            "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
+            "remote_store_repository"
+        );
+
+        Settings settings = Settings.builder()
+            .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository")
+            .put(stateRepoTypeAttributeKey, FsRepository.TYPE)
+            .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath")
+            .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
+            .build();
+
+        clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        clusterApplierService = mock(ClusterApplierService.class);
+        clusterState = mock(ClusterState.class);
+        metadata = mock(Metadata.class);
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+        when(clusterState.getClusterName()).thenReturn(new ClusterName("test"));
+        when(metadata.clusterUUID()).thenReturn("testUUID");
+        when(clusterState.metadata()).thenReturn(metadata);
+        when(clusterApplierService.state()).thenReturn(clusterState);
+        when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
+
+        blobStoreRepository = mock(BlobStoreRepository.class);
+        blobStore = mock(BlobStore.class);
+        when(blobStoreRepository.blobStore()).thenReturn(blobStore);
+        when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository);
+
+        remoteClusterStateService = mock(RemoteClusterStateService.class);
+        when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats());
+        when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool);
+        when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
+        remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        super.tearDown();
+        remoteClusterStateCleanupManager.close();
+        threadPool.shutdown();
+    }
+
+    public void testDeleteClusterMetadata() throws IOException {
+        String clusterUUID = "clusterUUID";
+        String clusterName = "test-cluster";
+        List<BlobMetadata> inactiveBlobs = Arrays.asList(
+            new PlainBlobMetadata("manifest1.dat", 1L),
+            new PlainBlobMetadata("manifest2.dat", 1L),
+            new PlainBlobMetadata("manifest3.dat", 1L)
+        );
+        List<BlobMetadata> activeBlobs = Arrays.asList(
+            new PlainBlobMetadata("manifest4.dat", 1L),
+            new PlainBlobMetadata("manifest5.dat", 1L)
+        );
+        UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1");
+        UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2");
+        UploadedIndexMetadata index1UpdatedMetadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1_updated");
+        UploadedMetadataAttribute coordinationMetadata = new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination_metadata");
+        UploadedMetadataAttribute templateMetadata = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata");
+        UploadedMetadataAttribute settingMetadata = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata");
+        UploadedMetadataAttribute coordinationMetadataUpdated = new UploadedMetadataAttribute(
+            COORDINATION_METADATA,
+            "coordination_metadata_updated"
+        );
+        UploadedMetadataAttribute templateMetadataUpdated = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata_updated");
+        UploadedMetadataAttribute settingMetadataUpdated = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata_updated");
+        ClusterMetadataManifest manifest1 = ClusterMetadataManifest.builder()
+            .indices(List.of(index1Metadata))
+            .globalMetadataFileName("global_metadata")
+            .clusterTerm(1L)
+            .stateVersion(1L)
+            .codecVersion(CODEC_V1)
+            .stateUUID(randomAlphaOfLength(10))
+            .clusterUUID(clusterUUID)
+            .nodeId("nodeA")
+            .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
+            .previousClusterUUID(ClusterState.UNKNOWN_UUID)
+            .committed(true)
+            .build();
+        ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder(manifest1)
+            .indices(List.of(index1Metadata, index2Metadata))
+            .codecVersion(CODEC_V2)
+            .globalMetadataFileName(null)
+            .coordinationMetadata(coordinationMetadata)
+            .templatesMetadata(templateMetadata)
+            .settingMetadata(settingMetadata)
+            .build();
+        ClusterMetadataManifest manifest3 = ClusterMetadataManifest.builder(manifest2)
+            .indices(List.of(index1UpdatedMetadata, index2Metadata))
+            .settingMetadata(settingMetadataUpdated)
+            .build();
+
+        // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated
+        ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3)
+            .coordinationMetadata(coordinationMetadataUpdated)
+            .build();
+        ClusterMetadataManifest manifest5 = ClusterMetadataManifest.builder(manifest4).templatesMetadata(templateMetadataUpdated).build();
+
+        when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn(
+            manifest4,
+            manifest5,
+            manifest1,
+            manifest2,
+            manifest3
+        );
+        BlobContainer container = mock(BlobContainer.class);
+        when(blobStore.blobContainer(any())).thenReturn(container);
+        doNothing().when(container).deleteBlobsIgnoringIfNotExists(any());
+
+        remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs);
+        verify(container).deleteBlobsIgnoringIfNotExists(
+            List.of(
+                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + coordinationMetadata.getUploadedFilename() + ".dat",
+                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + settingMetadata.getUploadedFilename() + ".dat",
+                new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + "global_metadata.dat"
+            )
+        );
+        verify(container).deleteBlobsIgnoringIfNotExists(
+            List.of(
+                new BlobPath().add(INDEX_PATH_TOKEN).add(index1Metadata.getIndexUUID()).buildAsString()
+                    + index1Metadata.getUploadedFilePath()
+                    + ".dat"
+            )
+        );
+        Set<String> staleManifest = new HashSet<>();
+        inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name()));
+        verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest));
+    }
+
+    public void testDeleteStaleClusterUUIDs() throws IOException {
+        final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+        ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder()
+            .indices(List.of())
+            .clusterTerm(1L)
+            .stateVersion(1L)
+            .stateUUID(randomAlphaOfLength(10))
+            .clusterUUID("cluster-uuid1")
+            .nodeId("nodeA")
+            .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
+            .previousClusterUUID(ClusterState.UNKNOWN_UUID)
+            .committed(true)
+            .build();
+
+        BlobPath blobPath = new BlobPath().add("random-path");
+        BlobContainer uuidContainerContainer = mock(BlobContainer.class);
+        BlobContainer manifest2Container = mock(BlobContainer.class);
+        BlobContainer manifest3Container = mock(BlobContainer.class);
+        when(blobStore.blobContainer(any())).then(invocation -> {
+            BlobPath blobPath1 = invocation.getArgument(0);
+            if (blobPath1.buildAsString().endsWith("cluster-state/")) {
+                return uuidContainerContainer;
+            } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) {
+                return manifest2Container;
+            } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) {
+                return manifest3Container;
+            } else {
+                throw new IllegalArgumentException("Unexpected blob path " + blobPath1);
+            }
+        });
+        when(
+            manifest2Container.listBlobsByPrefixInSortedOrder(
+                MANIFEST_FILE_PREFIX + DELIMITER,
+                Integer.MAX_VALUE,
+                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
+            )
+        ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L)));
+        when(
+            manifest3Container.listBlobsByPrefixInSortedOrder(
+                MANIFEST_FILE_PREFIX + DELIMITER,
+                Integer.MAX_VALUE,
+                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
+            )
+        ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L)));
+        Set<String> uuids = new HashSet<>(Arrays.asList("cluster-uuid1", "cluster-uuid2", "cluster-uuid3"));
+        when(remoteClusterStateService.getAllClusterUUIDs(any())).thenReturn(uuids);
+        when(remoteClusterStateService.getCusterMetadataBasePath(any(), any())).then(
+            invocationOnMock -> blobPath.add(encodeString(invocationOnMock.getArgument(0)))
+                .add(CLUSTER_STATE_PATH_TOKEN)
+                .add((String) invocationOnMock.getArgument(1))
+        );
+        remoteClusterStateCleanupManager.start();
+        remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest);
+        try {
+            assertBusy(() -> {
+                verify(manifest2Container, times(1)).delete();
+                verify(manifest3Container, times(1)).delete();
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void testRemoteStateCleanupFailureStats() throws IOException {
+        BlobContainer blobContainer = mock(BlobContainer.class);
+        doThrow(IOException.class).when(blobContainer).delete();
+        when(blobStore.blobContainer(any())).thenReturn(blobContainer);
+        BlobPath blobPath = new BlobPath().add("random-path");
+        when((blobStoreRepository.basePath())).thenReturn(blobPath);
+        remoteClusterStateCleanupManager.start();
+        remoteClusterStateCleanupManager.deleteStaleUUIDsClusterMetadata("cluster1", List.of("cluster-uuid1"));
+        try {
+            assertBusy(() -> {
+                // wait for stats to get updated
+                assertNotNull(remoteClusterStateCleanupManager.getStats());
+                assertEquals(0, remoteClusterStateCleanupManager.getStats().getSuccessCount());
+                assertEquals(1, remoteClusterStateCleanupManager.getStats().getCleanupAttemptFailedCount());
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
+        BlobContainer blobContainer = mock(BlobContainer.class);
+        when(blobStore.blobContainer(any())).thenReturn(blobContainer);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger callCount = new AtomicInteger(0);
+        doAnswer(invocation -> {
+            callCount.incrementAndGet();
+            if (latch.await(5000, TimeUnit.SECONDS) == false) {
+                throw new Exception("Timed out waiting for delete task queuing to complete");
+            }
+            return null;
+        }).when(blobContainer)
+            .listBlobsByPrefixInSortedOrder(
+                any(String.class),
+                any(int.class),
+                any(BlobContainer.BlobNameSortOrder.class),
+                any(ActionListener.class)
+            );
+
+        remoteClusterStateCleanupManager.start();
+        remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
+        remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
+
+        latch.countDown();
+        assertBusy(() -> assertEquals(1, callCount.get()));
+    }
+
+    public void testRemoteClusterStateCleanupSetting() {
+        remoteClusterStateCleanupManager.start();
+        // verify default value
+        assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileCleanupInterval());
+
+        // verify update interval
+        int cleanupInterval = randomIntBetween(1, 10);
+        Settings newSettings = Settings.builder().put("cluster.remote_store.state.cleanup_interval", cleanupInterval + "s").build();
+        clusterSettings.applySettings(newSettings);
+        assertEquals(cleanupInterval, remoteClusterStateCleanupManager.getStaleFileCleanupInterval().seconds());
+    }
+
+    public void testRemoteCleanupTaskScheduled() {
+        AbstractAsyncTask cleanupTask = remoteClusterStateCleanupManager.getStaleFileDeletionTask();
+        assertNull(cleanupTask);
+        // now the task should be initialized
+        remoteClusterStateCleanupManager.start();
+        assertNotNull(remoteClusterStateCleanupManager.getStaleFileDeletionTask());
+        assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().mustReschedule());
+        assertEquals(
+            clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING),
+            remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval()
+        );
+        assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
+        assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isClosed());
+    }
+
+    public void testRemoteCleanupSkipsOnOnlyElectedClusterManager() {
+        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+        when(nodes.isLocalNodeElectedClusterManager()).thenReturn(false);
+        when(clusterState.nodes()).thenReturn(nodes);
+        RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
+        AtomicInteger callCount = new AtomicInteger(0);
+        doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
+        spyManager.cleanUpStaleFiles();
+        assertEquals(0, callCount.get());
+
+        when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
+        when(clusterState.version()).thenReturn(randomLongBetween(11, 20));
+        spyManager.cleanUpStaleFiles();
+        assertEquals(1, callCount.get());
+    }
+
+    public void testRemoteCleanupSkipsIfVersionIncrementLessThanThreshold() {
+        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+        long version = randomLongBetween(1, SKIP_CLEANUP_STATE_CHANGES);
+        when(clusterApplierService.state()).thenReturn(clusterState);
+        when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
+        when(clusterState.nodes()).thenReturn(nodes);
+        when(clusterState.version()).thenReturn(version);
+
+        RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
+        AtomicInteger callCount = new AtomicInteger(0);
+        doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
+
+        remoteClusterStateCleanupManager.cleanUpStaleFiles();
+        assertEquals(0, callCount.get());
+    }
+
+    public void testRemoteCleanupCallsDeleteIfVersionIncrementGreaterThanThreshold() {
+        DiscoveryNodes nodes = mock(DiscoveryNodes.class);
+        long version = randomLongBetween(SKIP_CLEANUP_STATE_CHANGES + 1, SKIP_CLEANUP_STATE_CHANGES + 10);
+        when(clusterApplierService.state()).thenReturn(clusterState);
+        when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true);
+        when(clusterState.nodes()).thenReturn(nodes);
+        when(clusterState.version()).thenReturn(version);
+
+        RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
+        AtomicInteger callCount = new AtomicInteger(0);
+        doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt());
+
+        // using spied cleanup manager so that stubbed deleteStaleClusterMetadata is called
+        spyManager.cleanUpStaleFiles();
+        assertEquals(1, callCount.get());
+    }
+
+    public void testRemoteCleanupSchedulesEvenAfterFailure() {
+        remoteClusterStateCleanupManager.start();
+        RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager);
+        AtomicInteger callCount = new AtomicInteger(0);
+        doAnswer(invocationOnMock -> {
+            callCount.incrementAndGet();
+            throw new RuntimeException("Test exception");
+        }).when(spyManager).cleanUpStaleFiles();
+        AsyncStaleFileDeletion task = new AsyncStaleFileDeletion(spyManager);
+        assertTrue(task.isScheduled());
+        task.run();
+        // Task is still scheduled after the failure
+        assertTrue(task.isScheduled());
+        assertEquals(1, callCount.get());
+
+        task.run();
+        // Task is still scheduled after the failure
+        assertTrue(task.isScheduled());
+        assertEquals(2, callCount.get());
+    }
+}
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
index 1b242b921c0d7..5f0c371a3137e 100644
--- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
@@ -19,6 +19,7 @@
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.metadata.TemplatesMetadata;
 import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
@@ -72,9 +73,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -93,7 +91,6 @@
 import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
-import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
 import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
 import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
@@ -109,13 +106,12 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class RemoteClusterStateServiceTests extends OpenSearchTestCase {
 
     private RemoteClusterStateService remoteClusterStateService;
+    private ClusterService clusterService;
     private ClusterSettings clusterSettings;
     private Supplier<RepositoriesService> repositoriesServiceSupplier;
     private RepositoriesService repositoriesService;
@@ -148,6 +144,8 @@ public void setup() {
             .build();
 
         clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
         NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
             Stream.of(
                 NetworkModule.getNamedXContents().stream(),
@@ -165,7 +163,7 @@ public void setup() {
             "test-node-id",
             repositoriesServiceSupplier,
             settings,
-            clusterSettings,
+            clusterService,
             () -> 0L,
             threadPool,
             List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
@@ -187,14 +185,14 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException
 
     public void testFailInitializationWhenRemoteStateDisabled() {
         final Settings settings = Settings.builder().build();
-        ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
         assertThrows(
             AssertionError.class,
             () -> new RemoteClusterStateService(
                 "test-node-id",
                 repositoriesServiceSupplier,
                 settings,
-                clusterSettings,
+                clusterService,
                 () -> 0L,
                 threadPool,
                 List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings))
@@ -1280,72 +1278,6 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx
         assertThat(previousClusterUUID, equalTo("cluster-uuid2"));
     }
 
-    public void testDeleteStaleClusterUUIDs() throws IOException {
-        final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
-        ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder()
-            .indices(List.of())
-            .clusterTerm(1L)
-            .stateVersion(1L)
-            .stateUUID(randomAlphaOfLength(10))
-            .clusterUUID("cluster-uuid1")
-            .nodeId("nodeA")
-            .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
-            .previousClusterUUID(ClusterState.UNKNOWN_UUID)
-            .committed(true)
-            .build();
-
-        BlobPath blobPath = new BlobPath().add("random-path");
-        when((blobStoreRepository.basePath())).thenReturn(blobPath);
-        BlobContainer uuidContainerContainer = mock(BlobContainer.class);
-        BlobContainer manifest2Container = mock(BlobContainer.class);
-        BlobContainer manifest3Container = mock(BlobContainer.class);
-        when(blobStore.blobContainer(any())).then(invocation -> {
-            BlobPath blobPath1 = invocation.getArgument(0);
-            if (blobPath1.buildAsString().endsWith("cluster-state/")) {
-                return uuidContainerContainer;
-            } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) {
-                return manifest2Container;
-            } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) {
-                return manifest3Container;
-            } else {
-                throw new IllegalArgumentException("Unexpected blob path " + blobPath1);
-            }
-        });
-        Map<String, BlobContainer> blobMetadataMap = Map.of(
-            "cluster-uuid1",
-            mock(BlobContainer.class),
-            "cluster-uuid2",
-            mock(BlobContainer.class),
-            "cluster-uuid3",
-            mock(BlobContainer.class)
-        );
-        when(uuidContainerContainer.children()).thenReturn(blobMetadataMap);
-        when(
-            manifest2Container.listBlobsByPrefixInSortedOrder(
-                MANIFEST_FILE_PREFIX + DELIMITER,
-                Integer.MAX_VALUE,
-                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
-            )
-        ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L)));
-        when(
-            manifest3Container.listBlobsByPrefixInSortedOrder(
-                MANIFEST_FILE_PREFIX + DELIMITER,
-                Integer.MAX_VALUE,
-                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
-            )
-        ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L)));
-        remoteClusterStateService.start();
-        remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest);
-        try {
-            assertBusy(() -> {
-                verify(manifest2Container, times(1)).delete();
-                verify(manifest3Container, times(1)).delete();
-            });
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     public void testRemoteStateStats() throws IOException {
         final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
         mockBlobStoreObjects();
@@ -1358,26 +1290,6 @@ public void testRemoteStateStats() throws IOException {
         assertEquals(0, remoteClusterStateService.getStats().getFailedCount());
     }
 
-    public void testRemoteStateCleanupFailureStats() throws IOException {
-        BlobContainer blobContainer = mock(BlobContainer.class);
-        doThrow(IOException.class).when(blobContainer).delete();
-        when(blobStore.blobContainer(any())).thenReturn(blobContainer);
-        BlobPath blobPath = new BlobPath().add("random-path");
-        when((blobStoreRepository.basePath())).thenReturn(blobPath);
-        remoteClusterStateService.start();
-        remoteClusterStateService.deleteStaleUUIDsClusterMetadata("cluster1", Arrays.asList("cluster-uuid1"));
-        try {
-            assertBusy(() -> {
-                // wait for stats to get updated
-                assertTrue(remoteClusterStateService.getStats() != null);
-                assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
-                assertEquals(1, remoteClusterStateService.getStats().getCleanupAttemptFailedCount());
-            });
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     public void testFileNames() {
         final Index index = new Index("test-index", "index-uuid");
         final Settings idxSettings = Settings.builder()
@@ -1418,36 +1330,6 @@ private void verifyManifestFileNameWithCodec(int codecVersion) {
         assertThat(splittedName[3], is("P"));
     }
 
-    public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
-        BlobContainer blobContainer = mock(BlobContainer.class);
-        BlobPath blobPath = new BlobPath().add("random-path");
-        when((blobStoreRepository.basePath())).thenReturn(blobPath);
-        when(blobStore.blobContainer(any())).thenReturn(blobContainer);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicInteger callCount = new AtomicInteger(0);
-        doAnswer(invocation -> {
-            callCount.incrementAndGet();
-            if (latch.await(5000, TimeUnit.SECONDS) == false) {
-                throw new Exception("Timed out waiting for delete task queuing to complete");
-            }
-            return null;
-        }).when(blobContainer)
-            .listBlobsByPrefixInSortedOrder(
-                any(String.class),
-                any(int.class),
-                any(BlobContainer.BlobNameSortOrder.class),
-                any(ActionListener.class)
-            );
-
-        remoteClusterStateService.start();
-        remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
-        remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
-
-        latch.countDown();
-        assertBusy(() -> assertEquals(1, callCount.get()));
-    }
-
     public void testIndexMetadataUploadWaitTimeSetting() {
         // verify default value
         assertEquals(
@@ -1891,7 +1773,7 @@ private static ClusterState.Builder generateClusterStateWithGlobalMetadata() {
             );
     }
 
-    private static ClusterState.Builder generateClusterStateWithOneIndex() {
+    static ClusterState.Builder generateClusterStateWithOneIndex() {
         final Index index = new Index("test-index", "index-uuid");
         final Settings idxSettings = Settings.builder()
             .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
@@ -1921,7 +1803,7 @@ private static ClusterState.Builder generateClusterStateWithOneIndex() {
             );
     }
 
-    private static DiscoveryNodes nodesWithLocalNodeClusterManager() {
+    static DiscoveryNodes nodesWithLocalNodeClusterManager() {
         return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build();
     }