diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java new file mode 100644 index 0000000000000..56c8fb94eed91 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.Settings; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT; +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME = "test-index"; + + @Before + public void setup() { + asyncUploadMockFsRepo = false; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); + } + + private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { + prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); + Map indexStats = indexData(1, false, INDEX_NAME); + assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards); + ensureGreen(INDEX_NAME); + return indexStats; + } + + public void testRemoteCleanupTaskUpdated() { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateCleanupManager.class + ); + + assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval()); + assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); + + // now disable + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1)) + .get(); + + assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis()); + assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); + + // now set Clean up interval to 1 min + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m")) + .get(); + assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes()); + } + + public void testRemoteCleanupDeleteStale() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + + // update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files + // to repository, if manifest files are less than that it means clean up has run + updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1); + + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + BlobPath baseMetadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()); + BlobPath manifestContainerPath = baseMetadataPath.add("manifest"); + + // set cleanup interval to 100 ms to make the test faster + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms")) + .get(); + + assertTrue(response.isAcknowledged()); + + assertBusy(() -> { + int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size(); + logger.info("number of current manifest file: {}", manifestFiles); + // we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task + // other than replica count change which can upload new manifest files, that's why we check that number of manifests is between + // Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests) + assertTrue( + "Current number of manifest files: " + manifestFiles, + manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES + ); + }, 500, TimeUnit.MILLISECONDS); + } + + private void updateClusterStateNTimes(int n) { + int newReplicaCount = randomIntBetween(0, 3); + for (int i = n; i > 0; i--) { + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS)) + .get(); + assertTrue(response.isAcknowledged()); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index 42120aa32eb47..ab2f0f0080566 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -10,7 +10,6 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.discovery.DiscoveryStats; @@ -27,7 +26,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; @@ -51,16 +49,6 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); } - private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); - internalCluster().startDataOnlyNodes(numDataOnlyNodes); - for (String index : indices.split(",")) { - createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); - ensureYellowAndNoInitializingShards(index); - ensureGreen(index); - } - } - private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); Map indexStats = indexData(1, false, INDEX_NAME); @@ -69,49 +57,6 @@ private Map initialTestSetup(int shardCount, int replicaCount, int return indexStats; } - public void testFullClusterRestoreStaleDelete() throws Exception { - int shardCount = randomIntBetween(1, 2); - int replicaCount = 1; - int dataNodeCount = shardCount * (replicaCount + 1); - int clusterManagerNodeCount = 1; - - initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); - setReplicaCount(0); - setReplicaCount(2); - setReplicaCount(0); - setReplicaCount(1); - setReplicaCount(0); - setReplicaCount(1); - setReplicaCount(0); - setReplicaCount(2); - setReplicaCount(0); - - RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( - RemoteClusterStateService.class - ); - - RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); - - BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); - BlobPath baseMetadataPath = repository.basePath() - .add( - Base64.getUrlEncoder() - .withoutPadding() - .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) - ) - .add("cluster-state") - .add(getClusterState().metadata().clusterUUID()); - - assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size()); - - Map indexMetadataMap = remoteClusterStateService.getLatestClusterState( - cluster().getClusterName(), - getClusterState().metadata().clusterUUID() - ).getMetadata().getIndices(); - assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas()); - assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards()); - } - public void testRemoteStateStats() { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; @@ -241,12 +186,4 @@ private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) { assertNotNull(nodesStatsResponse.getNodes().get(0)); assertNotNull(nodesStatsResponse.getNodes().get(0).getDiscoveryStats()); } - - private void setReplicaCount(int replicaCount) { - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) - .get(); - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 740aee69f7d80..64efcee6ef1b5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -350,4 +350,14 @@ protected void restore(boolean restoreAllShards, String... indices) { PlainActionFuture.newFuture() ); } + + protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); + internalCluster().startDataOnlyNodes(numDataOnlyNodes); + for (String index : indices.split(",")) { + createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + ensureYellowAndNoInitializingShards(index); + ensureGreen(index); + } + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7814518af471b..297fc98764d07 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -104,6 +104,7 @@ import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; +import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; @@ -711,6 +712,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL, // Remote cluster state settings + RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/opensearch/common/util/concurrent/AbstractAsyncTask.java index 7c599476e263d..0b9dc6136ce60 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AbstractAsyncTask.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -106,7 +107,23 @@ public synchronized void rescheduleIfNecessary() { if (logger.isTraceEnabled()) { logger.trace("scheduling {} every {}", toString(), interval); } - cancellable = threadPool.schedule(this, interval, getThreadPool()); + try { + cancellable = threadPool.schedule(this, interval, getThreadPool()); + } catch (OpenSearchRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug( + new ParameterizedMessage( + "could not schedule execution of [{}] after [{}] on [{}] as executor is shut down", + this, + interval, + getThreadPool() + ), + e + ); + } else { + throw e; + } + } isScheduledOrRunning = true; } else { logger.trace("scheduled {} disabled", toString()); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java new file mode 100644 index 0000000000000..2fca239b10efd --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -0,0 +1,413 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Strings; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT; +import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT; +import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN; + +/** + * A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task + * + * @opensearch.internal + */ +public class RemoteClusterStateCleanupManager implements Closeable { + + public static final int RETAINED_MANIFESTS = 10; + public static final int SKIP_CLEANUP_STATE_CHANGES = 10; + public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5); + public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM = TimeValue.MINUS_ONE; + + /** + * Setting to specify the interval to do run stale file cleanup job + * Min value -1 indicates that the stale file cleanup job should be disabled + */ + public static final Setting REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING = Setting.timeSetting( + "cluster.remote_store.state.cleanup_interval", + CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, + CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private static final Logger logger = LogManager.getLogger(RemoteClusterStateCleanupManager.class); + private final RemoteClusterStateService remoteClusterStateService; + private final RemotePersistenceStats remoteStateStats; + private BlobStoreTransferService blobStoreTransferService; + private TimeValue staleFileCleanupInterval; + private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); + private volatile AsyncStaleFileDeletion staleFileDeletionTask; + private long lastCleanupAttemptStateVersion; + private final ThreadPool threadpool; + private final ClusterApplierService clusterApplierService; + + public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) { + this.remoteClusterStateService = remoteClusterStateService; + this.remoteStateStats = remoteClusterStateService.getStats(); + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + this.clusterApplierService = clusterService.getClusterApplierService(); + this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING); + this.threadpool = remoteClusterStateService.getThreadpool(); + // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold + this.lastCleanupAttemptStateVersion = 0; + clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); + } + + void start() { + staleFileDeletionTask = new AsyncStaleFileDeletion(this); + } + + @Override + public void close() throws IOException { + if (staleFileDeletionTask != null) { + staleFileDeletionTask.close(); + } + } + + private BlobStoreTransferService getBlobStoreTransferService() { + if (blobStoreTransferService == null) { + blobStoreTransferService = new BlobStoreTransferService(remoteClusterStateService.getBlobStore(), threadpool); + } + return blobStoreTransferService; + } + + private void updateCleanupInterval(TimeValue updatedInterval) { + this.staleFileCleanupInterval = updatedInterval; + logger.info("updated remote state cleanup interval to {}", updatedInterval); + // After updating the interval, we need to close the current task and create a new one which will run with updated interval + if (staleFileDeletionTask != null && !staleFileDeletionTask.getInterval().equals(updatedInterval)) { + staleFileDeletionTask.setInterval(updatedInterval); + } + } + + // visible for testing + void cleanUpStaleFiles() { + ClusterState currentAppliedState = clusterApplierService.state(); + if (currentAppliedState.nodes().isLocalNodeElectedClusterManager()) { + long cleanUpAttemptStateVersion = currentAppliedState.version(); + assert Strings.isNotEmpty(currentAppliedState.getClusterName().value()) : "cluster name is not set"; + assert Strings.isNotEmpty(currentAppliedState.metadata().clusterUUID()) : "cluster uuid is not set"; + if (cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion > SKIP_CLEANUP_STATE_CHANGES) { + logger.info( + "Cleaning up stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates", + currentAppliedState.getClusterName().value(), + currentAppliedState.metadata().clusterUUID(), + cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion + ); + this.deleteStaleClusterMetadata( + currentAppliedState.getClusterName().value(), + currentAppliedState.metadata().clusterUUID(), + RETAINED_MANIFESTS + ); + lastCleanupAttemptStateVersion = cleanUpAttemptStateVersion; + } else { + logger.debug( + "Skipping cleanup of stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates, which is less than threshold {}", + currentAppliedState.getClusterName().value(), + currentAppliedState.metadata().clusterUUID(), + cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion, + SKIP_CLEANUP_STATE_CHANGES + ); + } + } else { + logger.debug("Skipping cleanup task as local node is not elected Cluster Manager"); + } + } + + private void addStaleGlobalMetadataPath(String fileName, Set filesToKeep, Set staleGlobalMetadataPaths) { + if (!filesToKeep.contains(fileName)) { + String[] splitPath = fileName.split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + splitPath[splitPath.length - 1] + ) + ); + } + } + + // visible for testing + void deleteClusterMetadata( + String clusterName, + String clusterUUID, + List activeManifestBlobMetadata, + List staleManifestBlobMetadata + ) { + try { + Set filesToKeep = new HashSet<>(); + Set staleManifestPaths = new HashSet<>(); + Set staleIndexMetadataPaths = new HashSet<>(); + Set staleGlobalMetadataPaths = new HashSet<>(); + activeManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() + ); + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) { + filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName()); + } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) { + filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); + filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); + filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); + clusterMetadataManifest.getCustomMetadataMap() + .values() + .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); + } + }); + staleManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() + ); + staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); + if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) { + addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths); + } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) { + addStaleGlobalMetadataPath( + clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(), + filesToKeep, + staleGlobalMetadataPaths + ); + addStaleGlobalMetadataPath( + clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(), + filesToKeep, + staleGlobalMetadataPaths + ); + addStaleGlobalMetadataPath( + clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(), + filesToKeep, + staleGlobalMetadataPaths + ); + clusterMetadataManifest.getCustomMetadataMap() + .values() + .forEach( + attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) + ); + } + + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + staleIndexMetadataPaths.add( + new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename()) + ); + } + }); + }); + + if (staleManifestPaths.isEmpty()) { + logger.debug("No stale Remote Cluster Metadata files found"); + return; + } + + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + } catch (IllegalStateException e) { + logger.error("Error while fetching Remote Cluster Metadata manifests", e); + } catch (IOException e) { + logger.error("Error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); + } catch (Exception e) { + logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); + } + } + + /** + * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests + * + * @param clusterName name of the cluster + * @param clusterUUID uuid of cluster state to refer to in remote + * @param manifestsToRetain no of latest manifest files to keep in remote + */ + // package private for testing + void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { + logger.info("Delete stale cluster metadata task is already in progress."); + return; + } + try { + getBlobStoreTransferService().listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID), + MANIFEST_FILE_PREFIX, + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.size() > manifestsToRetain) { + deleteClusterMetadata( + clusterName, + clusterUUID, + blobMetadata.subList(0, manifestsToRetain), + blobMetadata.subList(manifestsToRetain, blobMetadata.size()) + ); + } + deleteStaleMetadataRunning.set(false); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + deleteStaleMetadataRunning.set(false); + } + } + ); + } catch (Exception e) { + deleteStaleMetadataRunning.set(false); + throw e; + } + } + + /** + * Purges all remote cluster state against provided cluster UUIDs + * + * @param clusterName name of the cluster + * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged + */ + void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) { + clusterUUIDs.forEach( + clusterUUID -> getBlobStoreTransferService().deleteAsync( + ThreadPool.Names.REMOTE_PURGE, + remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting all remote cluster metadata for cluster UUID {}", + clusterUUID + ), + e + ); + remoteStateStats.cleanUpAttemptFailed(); + } + } + ) + ); + } + + // package private for testing + void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException { + logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); + getBlobStoreTransferService().deleteBlobs( + remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID), + stalePaths + ); + } + + /** + * Purges all remote cluster state against provided cluster UUIDs + * @param clusterState current state of the cluster + * @param committedManifest last committed ClusterMetadataManifest + */ + public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) { + threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + String clusterName = clusterState.getClusterName().value(); + logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); + Set allClustersUUIDsInRemote; + try { + allClustersUUIDsInRemote = new HashSet<>( + remoteClusterStateService.getAllClusterUUIDs(clusterState.getClusterName().value()) + ); + } catch (IOException e) { + logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName)); + return; + } + // Retain last 2 cluster uuids data + allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID()); + allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID()); + deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); + }); + } + + public TimeValue getStaleFileCleanupInterval() { + return this.staleFileCleanupInterval; + } + + AsyncStaleFileDeletion getStaleFileDeletionTask() { // for testing + return this.staleFileDeletionTask; + } + + RemotePersistenceStats getStats() { + return this.remoteStateStats; + } + + static final class AsyncStaleFileDeletion extends AbstractAsyncTask { + private final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; + + AsyncStaleFileDeletion(RemoteClusterStateCleanupManager remoteClusterStateCleanupManager) { + super( + logger, + remoteClusterStateCleanupManager.threadpool, + remoteClusterStateCleanupManager.getStaleFileCleanupInterval(), + true + ); + this.remoteClusterStateCleanupManager = remoteClusterStateCleanupManager; + rescheduleIfNecessary(); + } + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + remoteClusterStateCleanupManager.cleanUpStaleFiles(); + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.REMOTE_PURGE; + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 0f862d1b68820..01ffd8f1cca46 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -18,11 +18,13 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -59,7 +61,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; @@ -81,8 +82,6 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; - public static final int RETAINED_MANIFESTS = 10; - public static final String DELIMITER = "__"; public static final String CUSTOM_DELIMITER = "--"; @@ -207,8 +206,7 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue indexMetadataUploadTimeout; private volatile TimeValue globalMetadataUploadTimeout; private volatile TimeValue metadataManifestUploadTimeout; - - private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); + private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; private final RemotePersistenceStats remoteStateStats; private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " @@ -232,7 +230,7 @@ public RemoteClusterStateService( String nodeId, Supplier repositoriesService, Settings settings, - ClusterSettings clusterSettings, + ClusterService clusterService, LongSupplier relativeTimeNanosSupplier, ThreadPool threadPool, List indexMetadataUploadListeners @@ -243,6 +241,7 @@ public RemoteClusterStateService( this.settings = settings; this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; this.threadpool = threadPool; + ClusterSettings clusterSettings = clusterService.getClusterSettings(); this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); @@ -252,16 +251,10 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); + this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; } - private BlobStoreTransferService getBlobStoreTransferService() { - if (blobStoreTransferService == null) { - blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); - } - return blobStoreTransferService; - } - /** * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be * invoked by the elected cluster manager when the remote cluster state is enabled. @@ -417,7 +410,6 @@ public ClusterMetadataManifest writeIncrementalMetadata( : previousManifest.getCustomMetadataMap(), false ); - deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateSucceeded(); @@ -721,6 +713,10 @@ private CheckedRunnable getAsyncMetadataWriteAction( ); } + public RemoteClusterStateCleanupManager getCleanupManager() { + return remoteClusterStateCleanupManager; + } + @Nullable public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { @@ -740,12 +736,16 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat previousManifest.getCustomMetadataMap(), true ); - deleteStaleClusterUUIDs(clusterState, committedManifest); + if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) { + remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest); + } + return committedManifest; } @Override public void close() throws IOException { + remoteClusterStateCleanupManager.close(); if (blobStoreRepository != null) { IOUtils.close(blobStoreRepository); } @@ -760,6 +760,7 @@ public void start() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; + remoteClusterStateCleanupManager.start(); } private ClusterMetadataManifest uploadManifest( @@ -850,6 +851,14 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust ); } + ThreadPool getThreadpool() { + return threadpool; + } + + BlobStore getBlobStore() { + return blobStoreRepository.blobStore(); + } + private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX return blobStoreRepository.blobStore() @@ -867,7 +876,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID) return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); } - private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { + BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); } @@ -982,7 +991,7 @@ private static String metadataAttributeFileName(String componentPrefix, Long met ); } - private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { + BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); } @@ -1235,7 +1244,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } } - private Set getAllClusterUUIDs(String clusterName) throws IOException { + Set getAllClusterUUIDs(String clusterName) throws IOException { Map clusterUUIDMetadata = clusterUUIDContainer(clusterName).children(); if (clusterUUIDMetadata == null) { return Collections.emptySet(); @@ -1426,7 +1435,7 @@ private Optional getLatestManifestFileName(String clusterName, String cl * @param clusterName name of the cluster * @return ClusterMetadataManifest */ - private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) + ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) throws IllegalStateException { try { return getClusterMetadataManifestBlobStoreFormat(filename).read( @@ -1486,234 +1495,6 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) { } } - /** - * Purges all remote cluster state against provided cluster UUIDs - * - * @param clusterName name of the cluster - * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged - */ - void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) { - clusterUUIDs.forEach(clusterUUID -> { - getBlobStoreTransferService().deleteAsync( - ThreadPool.Names.REMOTE_PURGE, - getCusterMetadataBasePath(clusterName, clusterUUID), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting all remote cluster metadata for cluster UUID {}", - clusterUUID - ), - e - ); - remoteStateStats.cleanUpAttemptFailed(); - } - } - ); - }); - } - - /** - * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests - * - * @param clusterName name of the cluster - * @param clusterUUID uuid of cluster state to refer to in remote - * @param manifestsToRetain no of latest manifest files to keep in remote - */ - // package private for testing - void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { - if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { - logger.info("Delete stale cluster metadata task is already in progress."); - return; - } - try { - getBlobStoreTransferService().listAllInSortedOrderAsync( - ThreadPool.Names.REMOTE_PURGE, - getManifestFolderPath(clusterName, clusterUUID), - "manifest", - Integer.MAX_VALUE, - new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - if (blobMetadata.size() > manifestsToRetain) { - deleteClusterMetadata( - clusterName, - clusterUUID, - blobMetadata.subList(0, manifestsToRetain - 1), - blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size()) - ); - } - deleteStaleMetadataRunning.set(false); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUID - ) - ); - deleteStaleMetadataRunning.set(false); - } - } - ); - } catch (Exception e) { - deleteStaleMetadataRunning.set(false); - throw e; - } - } - - private void deleteClusterMetadata( - String clusterName, - String clusterUUID, - List activeManifestBlobMetadata, - List staleManifestBlobMetadata - ) { - try { - Set filesToKeep = new HashSet<>(); - Set staleManifestPaths = new HashSet<>(); - Set staleIndexMetadataPaths = new HashSet<>(); - Set staleGlobalMetadataPaths = new HashSet<>(); - activeManifestBlobMetadata.forEach(blobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - blobMetadata.name() - ); - clusterMetadataManifest.getIndices() - .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); - if (clusterMetadataManifest.getGlobalMetadataFileName() != null) { - filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName()); - } else { - filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); - filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); - filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); - clusterMetadataManifest.getCustomMetadataMap() - .forEach((key, value) -> { filesToKeep.add(value.getUploadedFilename()); }); - } - }); - staleManifestBlobMetadata.forEach(blobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - blobMetadata.name() - ); - staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); - if (clusterMetadataManifest.getGlobalMetadataFileName() != null) { - if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) { - String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); - staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( - globalMetadataSplitPath[globalMetadataSplitPath.length - 1] - ) - ); - } - } else { - if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) { - String[] coordinationMetadataSplitPath = clusterMetadataManifest.getCoordinationMetadata() - .getUploadedFilename() - .split("/"); - staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( - coordinationMetadataSplitPath[coordinationMetadataSplitPath.length - 1] - ) - ); - } - if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) { - String[] templatesMetadataSplitPath = clusterMetadataManifest.getTemplatesMetadata() - .getUploadedFilename() - .split("/"); - staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( - templatesMetadataSplitPath[templatesMetadataSplitPath.length - 1] - ) - ); - } - if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) { - String[] settingsMetadataSplitPath = clusterMetadataManifest.getSettingsMetadata().getUploadedFilename().split("/"); - staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( - settingsMetadataSplitPath[settingsMetadataSplitPath.length - 1] - ) - ); - } - clusterMetadataManifest.getCustomMetadataMap().forEach((key, value) -> { - if (filesToKeep.contains(value.getUploadedFilename()) == false) { - String[] customMetadataSplitPath = value.getUploadedFilename().split("/"); - staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( - customMetadataSplitPath[customMetadataSplitPath.length - 1] - ) - ); - } - }); - } - - clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { - if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { - staleIndexMetadataPaths.add( - new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() - + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename()) - ); - } - }); - }); - - if (staleManifestPaths.isEmpty()) { - logger.debug("No stale Remote Cluster Metadata files found"); - return; - } - - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); - } catch (IllegalStateException e) { - logger.error("Error while fetching Remote Cluster Metadata manifests", e); - } catch (IOException e) { - logger.error("Error while deleting stale Remote Cluster Metadata files", e); - remoteStateStats.cleanUpAttemptFailed(); - } catch (Exception e) { - logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); - remoteStateStats.cleanUpAttemptFailed(); - } - } - - private void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException { - logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); - getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths); - } - - /** - * Purges all remote cluster state against provided cluster UUIDs - * - * @param clusterState current state of the cluster - * @param committedManifest last committed ClusterMetadataManifest - */ - public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) { - threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { - String clusterName = clusterState.getClusterName().value(); - logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); - Set allClustersUUIDsInRemote; - try { - allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value())); - } catch (IOException e) { - logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName)); - return; - } - // Retain last 2 cluster uuids data - allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID()); - allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID()); - deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); - }); - } - public RemotePersistenceStats getStats() { return remoteStateStats; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 76109ba10624a..49545fa8a0c8b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -138,6 +138,7 @@ import org.opensearch.gateway.MetaStateService; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; +import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpServerTransport; import org.opensearch.identity.IdentityService; @@ -752,6 +753,7 @@ protected Node( threadPool::relativeTimeInMillis ); final RemoteClusterStateService remoteClusterStateService; + final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; final RemoteIndexPathUploader remoteIndexPathUploader; if (isRemoteStoreClusterStateEnabled(settings)) { remoteIndexPathUploader = new RemoteIndexPathUploader( @@ -764,14 +766,16 @@ protected Node( nodeEnvironment.nodeId(), repositoriesServiceReference::get, settings, - clusterService.getClusterSettings(), + clusterService, threadPool::preciseRelativeTimeInNanos, threadPool, List.of(remoteIndexPathUploader) ); + remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager(); } else { remoteClusterStateService = null; remoteIndexPathUploader = null; + remoteClusterStateCleanupManager = null; } // collect engine factory providers from plugins @@ -1376,6 +1380,7 @@ protected Node( b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader); + b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 3ba98c44f8d3e..418e6d8de6adb 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -462,9 +462,7 @@ public void testDataOnlyNodePersistence() throws Exception { }); when(transportService.getThreadPool()).thenReturn(threadPool); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn( - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) - ); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), @@ -487,7 +485,7 @@ public void testDataOnlyNodePersistence() throws Exception { nodeEnvironment.nodeId(), repositoriesServiceSupplier, settings, - clusterSettings, + clusterService, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java new file mode 100644 index 0000000000000..24fd1b164a4ff --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -0,0 +1,446 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.AbstractAsyncTask; +import org.opensearch.core.action.ActionListener; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.VersionUtils; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT; +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; +import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; +import static org.opensearch.gateway.remote.RemoteClusterStateService.CLUSTER_STATE_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateService.encodeString; +import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex; +import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase { + private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; + private Supplier repositoriesServiceSupplier; + private RepositoriesService repositoriesService; + private BlobStoreRepository blobStoreRepository; + private BlobStore blobStore; + private ClusterSettings clusterSettings; + private ClusterApplierService clusterApplierService; + private ClusterState clusterState; + private Metadata metadata; + private RemoteClusterStateService remoteClusterStateService; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + repositoriesServiceSupplier = mock(Supplier.class); + repositoriesService = mock(RepositoriesService.class); + when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + + String stateRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + "remote_store_repository" + ); + String stateRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + "remote_store_repository" + ); + + Settings settings = Settings.builder() + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") + .put(stateRepoTypeAttributeKey, FsRepository.TYPE) + .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterApplierService = mock(ClusterApplierService.class); + clusterState = mock(ClusterState.class); + metadata = mock(Metadata.class); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterState.getClusterName()).thenReturn(new ClusterName("test")); + when(metadata.clusterUUID()).thenReturn("testUUID"); + when(clusterState.metadata()).thenReturn(metadata); + when(clusterApplierService.state()).thenReturn(clusterState); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + + blobStoreRepository = mock(BlobStoreRepository.class); + blobStore = mock(BlobStore.class); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); + when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); + + remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats()); + when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool); + when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService); + } + + @After + public void teardown() throws Exception { + super.tearDown(); + remoteClusterStateCleanupManager.close(); + threadPool.shutdown(); + } + + public void testDeleteClusterMetadata() throws IOException { + String clusterUUID = "clusterUUID"; + String clusterName = "test-cluster"; + List inactiveBlobs = Arrays.asList( + new PlainBlobMetadata("manifest1.dat", 1L), + new PlainBlobMetadata("manifest2.dat", 1L), + new PlainBlobMetadata("manifest3.dat", 1L) + ); + List activeBlobs = Arrays.asList( + new PlainBlobMetadata("manifest4.dat", 1L), + new PlainBlobMetadata("manifest5.dat", 1L) + ); + UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1"); + UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2"); + UploadedIndexMetadata index1UpdatedMetadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1_updated"); + UploadedMetadataAttribute coordinationMetadata = new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination_metadata"); + UploadedMetadataAttribute templateMetadata = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata"); + UploadedMetadataAttribute settingMetadata = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata"); + UploadedMetadataAttribute coordinationMetadataUpdated = new UploadedMetadataAttribute( + COORDINATION_METADATA, + "coordination_metadata_updated" + ); + UploadedMetadataAttribute templateMetadataUpdated = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata_updated"); + UploadedMetadataAttribute settingMetadataUpdated = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata_updated"); + ClusterMetadataManifest manifest1 = ClusterMetadataManifest.builder() + .indices(List.of(index1Metadata)) + .globalMetadataFileName("global_metadata") + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V1) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .build(); + ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder(manifest1) + .indices(List.of(index1Metadata, index2Metadata)) + .codecVersion(CODEC_V2) + .globalMetadataFileName(null) + .coordinationMetadata(coordinationMetadata) + .templatesMetadata(templateMetadata) + .settingMetadata(settingMetadata) + .build(); + ClusterMetadataManifest manifest3 = ClusterMetadataManifest.builder(manifest2) + .indices(List.of(index1UpdatedMetadata, index2Metadata)) + .settingMetadata(settingMetadataUpdated) + .build(); + + // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated + ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3) + .coordinationMetadata(coordinationMetadataUpdated) + .build(); + ClusterMetadataManifest manifest5 = ClusterMetadataManifest.builder(manifest4).templatesMetadata(templateMetadataUpdated).build(); + + when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( + manifest4, + manifest5, + manifest1, + manifest2, + manifest3 + ); + BlobContainer container = mock(BlobContainer.class); + when(blobStore.blobContainer(any())).thenReturn(container); + doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); + + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + verify(container).deleteBlobsIgnoringIfNotExists( + List.of( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + coordinationMetadata.getUploadedFilename() + ".dat", + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + settingMetadata.getUploadedFilename() + ".dat", + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + "global_metadata.dat" + ) + ); + verify(container).deleteBlobsIgnoringIfNotExists( + List.of( + new BlobPath().add(INDEX_PATH_TOKEN).add(index1Metadata.getIndexUUID()).buildAsString() + + index1Metadata.getUploadedFilePath() + + ".dat" + ) + ); + Set staleManifest = new HashSet<>(); + inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); + verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest)); + } + + public void testDeleteStaleClusterUUIDs() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder() + .indices(List.of()) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID("cluster-uuid1") + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .build(); + + BlobPath blobPath = new BlobPath().add("random-path"); + BlobContainer uuidContainerContainer = mock(BlobContainer.class); + BlobContainer manifest2Container = mock(BlobContainer.class); + BlobContainer manifest3Container = mock(BlobContainer.class); + when(blobStore.blobContainer(any())).then(invocation -> { + BlobPath blobPath1 = invocation.getArgument(0); + if (blobPath1.buildAsString().endsWith("cluster-state/")) { + return uuidContainerContainer; + } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) { + return manifest2Container; + } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) { + return manifest3Container; + } else { + throw new IllegalArgumentException("Unexpected blob path " + blobPath1); + } + }); + when( + manifest2Container.listBlobsByPrefixInSortedOrder( + MANIFEST_FILE_PREFIX + DELIMITER, + Integer.MAX_VALUE, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L))); + when( + manifest3Container.listBlobsByPrefixInSortedOrder( + MANIFEST_FILE_PREFIX + DELIMITER, + Integer.MAX_VALUE, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L))); + Set uuids = new HashSet<>(Arrays.asList("cluster-uuid1", "cluster-uuid2", "cluster-uuid3")); + when(remoteClusterStateService.getAllClusterUUIDs(any())).thenReturn(uuids); + when(remoteClusterStateService.getCusterMetadataBasePath(any(), any())).then( + invocationOnMock -> blobPath.add(encodeString(invocationOnMock.getArgument(0))) + .add(CLUSTER_STATE_PATH_TOKEN) + .add((String) invocationOnMock.getArgument(1)) + ); + remoteClusterStateCleanupManager.start(); + remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest); + try { + assertBusy(() -> { + verify(manifest2Container, times(1)).delete(); + verify(manifest3Container, times(1)).delete(); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void testRemoteStateCleanupFailureStats() throws IOException { + BlobContainer blobContainer = mock(BlobContainer.class); + doThrow(IOException.class).when(blobContainer).delete(); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + remoteClusterStateCleanupManager.start(); + remoteClusterStateCleanupManager.deleteStaleUUIDsClusterMetadata("cluster1", List.of("cluster-uuid1")); + try { + assertBusy(() -> { + // wait for stats to get updated + assertNotNull(remoteClusterStateCleanupManager.getStats()); + assertEquals(0, remoteClusterStateCleanupManager.getStats().getSuccessCount()); + assertEquals(1, remoteClusterStateCleanupManager.getStats().getCleanupAttemptFailedCount()); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> { + callCount.incrementAndGet(); + if (latch.await(5000, TimeUnit.SECONDS) == false) { + throw new Exception("Timed out waiting for delete task queuing to complete"); + } + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + any(String.class), + any(int.class), + any(BlobContainer.BlobNameSortOrder.class), + any(ActionListener.class) + ); + + remoteClusterStateCleanupManager.start(); + remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + + latch.countDown(); + assertBusy(() -> assertEquals(1, callCount.get())); + } + + public void testRemoteClusterStateCleanupSetting() { + remoteClusterStateCleanupManager.start(); + // verify default value + assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileCleanupInterval()); + + // verify update interval + int cleanupInterval = randomIntBetween(1, 10); + Settings newSettings = Settings.builder().put("cluster.remote_store.state.cleanup_interval", cleanupInterval + "s").build(); + clusterSettings.applySettings(newSettings); + assertEquals(cleanupInterval, remoteClusterStateCleanupManager.getStaleFileCleanupInterval().seconds()); + } + + public void testRemoteCleanupTaskScheduled() { + AbstractAsyncTask cleanupTask = remoteClusterStateCleanupManager.getStaleFileDeletionTask(); + assertNull(cleanupTask); + // now the task should be initialized + remoteClusterStateCleanupManager.start(); + assertNotNull(remoteClusterStateCleanupManager.getStaleFileDeletionTask()); + assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().mustReschedule()); + assertEquals( + clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING), + remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval() + ); + assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); + assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isClosed()); + } + + public void testRemoteCleanupSkipsOnOnlyElectedClusterManager() { + DiscoveryNodes nodes = mock(DiscoveryNodes.class); + when(nodes.isLocalNodeElectedClusterManager()).thenReturn(false); + when(clusterState.nodes()).thenReturn(nodes); + RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); + spyManager.cleanUpStaleFiles(); + assertEquals(0, callCount.get()); + + when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); + when(clusterState.version()).thenReturn(randomLongBetween(11, 20)); + spyManager.cleanUpStaleFiles(); + assertEquals(1, callCount.get()); + } + + public void testRemoteCleanupSkipsIfVersionIncrementLessThanThreshold() { + DiscoveryNodes nodes = mock(DiscoveryNodes.class); + long version = randomLongBetween(1, SKIP_CLEANUP_STATE_CHANGES); + when(clusterApplierService.state()).thenReturn(clusterState); + when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); + when(clusterState.nodes()).thenReturn(nodes); + when(clusterState.version()).thenReturn(version); + + RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); + + remoteClusterStateCleanupManager.cleanUpStaleFiles(); + assertEquals(0, callCount.get()); + } + + public void testRemoteCleanupCallsDeleteIfVersionIncrementGreaterThanThreshold() { + DiscoveryNodes nodes = mock(DiscoveryNodes.class); + long version = randomLongBetween(SKIP_CLEANUP_STATE_CHANGES + 1, SKIP_CLEANUP_STATE_CHANGES + 10); + when(clusterApplierService.state()).thenReturn(clusterState); + when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); + when(clusterState.nodes()).thenReturn(nodes); + when(clusterState.version()).thenReturn(version); + + RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); + + // using spied cleanup manager so that stubbed deleteStaleClusterMetadata is called + spyManager.cleanUpStaleFiles(); + assertEquals(1, callCount.get()); + } + + public void testRemoteCleanupSchedulesEvenAfterFailure() { + remoteClusterStateCleanupManager.start(); + RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocationOnMock -> { + callCount.incrementAndGet(); + throw new RuntimeException("Test exception"); + }).when(spyManager).cleanUpStaleFiles(); + AsyncStaleFileDeletion task = new AsyncStaleFileDeletion(spyManager); + assertTrue(task.isScheduled()); + task.run(); + // Task is still scheduled after the failure + assertTrue(task.isScheduled()); + assertEquals(1, callCount.get()); + + task.run(); + // Task is still scheduled after the failure + assertTrue(task.isScheduled()); + assertEquals(2, callCount.get()); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 1b242b921c0d7..5f0c371a3137e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -19,6 +19,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; @@ -72,9 +73,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; @@ -93,7 +91,6 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS; import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -109,13 +106,12 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private RemoteClusterStateService remoteClusterStateService; + private ClusterService clusterService; private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; @@ -148,6 +144,8 @@ public void setup() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( Stream.of( NetworkModule.getNamedXContents().stream(), @@ -165,7 +163,7 @@ public void setup() { "test-node-id", repositoriesServiceSupplier, settings, - clusterSettings, + clusterService, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) @@ -187,14 +185,14 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException public void testFailInitializationWhenRemoteStateDisabled() { final Settings settings = Settings.builder().build(); - ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); assertThrows( AssertionError.class, () -> new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, settings, - clusterSettings, + clusterService, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) @@ -1280,72 +1278,6 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx assertThat(previousClusterUUID, equalTo("cluster-uuid2")); } - public void testDeleteStaleClusterUUIDs() throws IOException { - final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); - ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder() - .indices(List.of()) - .clusterTerm(1L) - .stateVersion(1L) - .stateUUID(randomAlphaOfLength(10)) - .clusterUUID("cluster-uuid1") - .nodeId("nodeA") - .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) - .previousClusterUUID(ClusterState.UNKNOWN_UUID) - .committed(true) - .build(); - - BlobPath blobPath = new BlobPath().add("random-path"); - when((blobStoreRepository.basePath())).thenReturn(blobPath); - BlobContainer uuidContainerContainer = mock(BlobContainer.class); - BlobContainer manifest2Container = mock(BlobContainer.class); - BlobContainer manifest3Container = mock(BlobContainer.class); - when(blobStore.blobContainer(any())).then(invocation -> { - BlobPath blobPath1 = invocation.getArgument(0); - if (blobPath1.buildAsString().endsWith("cluster-state/")) { - return uuidContainerContainer; - } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) { - return manifest2Container; - } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) { - return manifest3Container; - } else { - throw new IllegalArgumentException("Unexpected blob path " + blobPath1); - } - }); - Map blobMetadataMap = Map.of( - "cluster-uuid1", - mock(BlobContainer.class), - "cluster-uuid2", - mock(BlobContainer.class), - "cluster-uuid3", - mock(BlobContainer.class) - ); - when(uuidContainerContainer.children()).thenReturn(blobMetadataMap); - when( - manifest2Container.listBlobsByPrefixInSortedOrder( - MANIFEST_FILE_PREFIX + DELIMITER, - Integer.MAX_VALUE, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC - ) - ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L))); - when( - manifest3Container.listBlobsByPrefixInSortedOrder( - MANIFEST_FILE_PREFIX + DELIMITER, - Integer.MAX_VALUE, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC - ) - ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L))); - remoteClusterStateService.start(); - remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest); - try { - assertBusy(() -> { - verify(manifest2Container, times(1)).delete(); - verify(manifest3Container, times(1)).delete(); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - public void testRemoteStateStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); @@ -1358,26 +1290,6 @@ public void testRemoteStateStats() throws IOException { assertEquals(0, remoteClusterStateService.getStats().getFailedCount()); } - public void testRemoteStateCleanupFailureStats() throws IOException { - BlobContainer blobContainer = mock(BlobContainer.class); - doThrow(IOException.class).when(blobContainer).delete(); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BlobPath blobPath = new BlobPath().add("random-path"); - when((blobStoreRepository.basePath())).thenReturn(blobPath); - remoteClusterStateService.start(); - remoteClusterStateService.deleteStaleUUIDsClusterMetadata("cluster1", Arrays.asList("cluster-uuid1")); - try { - assertBusy(() -> { - // wait for stats to get updated - assertTrue(remoteClusterStateService.getStats() != null); - assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); - assertEquals(1, remoteClusterStateService.getStats().getCleanupAttemptFailedCount()); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - public void testFileNames() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() @@ -1418,36 +1330,6 @@ private void verifyManifestFileNameWithCodec(int codecVersion) { assertThat(splittedName[3], is("P")); } - public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { - BlobContainer blobContainer = mock(BlobContainer.class); - BlobPath blobPath = new BlobPath().add("random-path"); - when((blobStoreRepository.basePath())).thenReturn(blobPath); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - - CountDownLatch latch = new CountDownLatch(1); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> { - callCount.incrementAndGet(); - if (latch.await(5000, TimeUnit.SECONDS) == false) { - throw new Exception("Timed out waiting for delete task queuing to complete"); - } - return null; - }).when(blobContainer) - .listBlobsByPrefixInSortedOrder( - any(String.class), - any(int.class), - any(BlobContainer.BlobNameSortOrder.class), - any(ActionListener.class) - ); - - remoteClusterStateService.start(); - remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); - remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); - - latch.countDown(); - assertBusy(() -> assertEquals(1, callCount.get())); - } - public void testIndexMetadataUploadWaitTimeSetting() { // verify default value assertEquals( @@ -1891,7 +1773,7 @@ private static ClusterState.Builder generateClusterStateWithGlobalMetadata() { ); } - private static ClusterState.Builder generateClusterStateWithOneIndex() { + static ClusterState.Builder generateClusterStateWithOneIndex() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) @@ -1921,7 +1803,7 @@ private static ClusterState.Builder generateClusterStateWithOneIndex() { ); } - private static DiscoveryNodes nodesWithLocalNodeClusterManager() { + static DiscoveryNodes nodesWithLocalNodeClusterManager() { return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build(); }