diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java deleted file mode 100644 index 0f441fe01a368..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.settings.Settings; -import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.Before; - -import java.nio.charset.StandardCharsets; -import java.util.Base64; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase { - - private static final String INDEX_NAME = "test-index"; - - @Before - public void setup() { - asyncUploadMockFsRepo = false; - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); - } - - private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { - prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); - Map indexStats = indexData(1, false, INDEX_NAME); - assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards); - ensureGreen(INDEX_NAME); - return indexStats; - } - - public void testRemoteCleanupTaskUpdated() { - int shardCount = randomIntBetween(1, 2); - int replicaCount = 1; - int dataNodeCount = shardCount * (replicaCount + 1); - int clusterManagerNodeCount = 1; - - initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); - RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance( - RemoteClusterStateCleanupManager.class - ); - - assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval()); - assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); - - // now disable - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1)) - .get(); - - assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis()); - assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); - - // now set Clean up interval to 1 min - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m")) - .get(); - assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes()); - } - - public void testRemoteCleanupDeleteStale() throws Exception { - int shardCount = randomIntBetween(1, 2); - int replicaCount = 1; - int dataNodeCount = shardCount * (replicaCount + 1); - int clusterManagerNodeCount = 1; - - initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); - - // set cleanup interval to 100 ms to make the test faster - ClusterUpdateSettingsResponse response = client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms")) - .get(); - - assertTrue(response.isAcknowledged()); - - // update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files - // to repository, if manifest files are less than that it means clean up has run - updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1); - - RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); - BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); - BlobPath baseMetadataPath = repository.basePath() - .add( - Base64.getUrlEncoder() - .withoutPadding() - .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) - ) - .add("cluster-state") - .add(getClusterState().metadata().clusterUUID()); - BlobPath manifestContainerPath = baseMetadataPath.add("manifest"); - - assertBusy(() -> { - int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size(); - logger.info("number of current manifest file: {}", manifestFiles); - // we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task - // other than replica count change which can upload new manifest files, that's why we check that number of manifests is between - // Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests) - assertTrue( - "Current number of manifest files: " + manifestFiles, - manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES - ); - }, 500, TimeUnit.MILLISECONDS); - } - - private void updateClusterStateNTimes(int n) { - int newReplicaCount = randomIntBetween(0, 3); - for (int i = n; i > 0; i--) { - ClusterUpdateSettingsResponse response = client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS)) - .get(); - assertTrue(response.isAcknowledged()); - } - } -} diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index ab2f0f0080566..42120aa32eb47 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -10,6 +10,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.discovery.DiscoveryStats; @@ -26,6 +27,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; @@ -49,6 +51,16 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); } + private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); + internalCluster().startDataOnlyNodes(numDataOnlyNodes); + for (String index : indices.split(",")) { + createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + ensureYellowAndNoInitializingShards(index); + ensureGreen(index); + } + } + private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); Map indexStats = indexData(1, false, INDEX_NAME); @@ -57,6 +69,49 @@ private Map initialTestSetup(int shardCount, int replicaCount, int return indexStats; } + public void testFullClusterRestoreStaleDelete() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + setReplicaCount(0); + setReplicaCount(2); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(2); + setReplicaCount(0); + + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateService.class + ); + + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + BlobPath baseMetadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()); + + assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size()); + + Map indexMetadataMap = remoteClusterStateService.getLatestClusterState( + cluster().getClusterName(), + getClusterState().metadata().clusterUUID() + ).getMetadata().getIndices(); + assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas()); + assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards()); + } + public void testRemoteStateStats() { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; @@ -186,4 +241,12 @@ private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) { assertNotNull(nodesStatsResponse.getNodes().get(0)); assertNotNull(nodesStatsResponse.getNodes().get(0).getDiscoveryStats()); } + + private void setReplicaCount(int replicaCount) { + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) + .get(); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 64efcee6ef1b5..740aee69f7d80 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -350,14 +350,4 @@ protected void restore(boolean restoreAllShards, String... indices) { PlainActionFuture.newFuture() ); } - - protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); - internalCluster().startDataOnlyNodes(numDataOnlyNodes); - for (String index : indices.split(",")) { - createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); - ensureYellowAndNoInitializingShards(index); - ensureGreen(index); - } - } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 297fc98764d07..7814518af471b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -104,7 +104,6 @@ import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; -import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; @@ -712,7 +711,6 @@ public void apply(Settings value, Settings current, Settings previous) { SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL, // Remote cluster state settings - RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java deleted file mode 100644 index 8a106a25e5630..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ /dev/null @@ -1,408 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Strings; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.service.ClusterApplierService; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.blobstore.BlobMetadata; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.AbstractAsyncTask; -import org.opensearch.core.action.ActionListener; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; -import org.opensearch.threadpool.ThreadPool; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT; -import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN; - -/** - * A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task - * - * @opensearch.internal - */ -public class RemoteClusterStateCleanupManager implements Closeable { - - public static final int RETAINED_MANIFESTS = 10; - public static final int SKIP_CLEANUP_STATE_CHANGES = 10; - public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5); - public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM = TimeValue.MINUS_ONE; - - /** - * Setting to specify the interval to do run stale file cleanup job - * Min value -1 indicates that the stale file cleanup job should be disabled - */ - public static final Setting REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING = Setting.timeSetting( - "cluster.remote_store.state.cleanup_interval", - CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, - CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - private static final Logger logger = LogManager.getLogger(RemoteClusterStateCleanupManager.class); - private final RemoteClusterStateService remoteClusterStateService; - private final RemotePersistenceStats remoteStateStats; - private BlobStoreTransferService blobStoreTransferService; - private TimeValue staleFileCleanupInterval; - private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); - private AsyncStaleFileDeletion staleFileDeletionTask; - private long lastCleanupAttemptStateVersion; - private final ThreadPool threadpool; - private final ClusterApplierService clusterApplierService; - - public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) { - this.remoteClusterStateService = remoteClusterStateService; - this.remoteStateStats = remoteClusterStateService.getStats(); - ClusterSettings clusterSettings = clusterService.getClusterSettings(); - this.clusterApplierService = clusterService.getClusterApplierService(); - this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING); - this.threadpool = remoteClusterStateService.getThreadpool(); - // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold - this.lastCleanupAttemptStateVersion = 0; - clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); - } - - void start() { - staleFileDeletionTask = new AsyncStaleFileDeletion(this); - } - - @Override - public void close() throws IOException { - if (staleFileDeletionTask != null) { - staleFileDeletionTask.close(); - } - } - - private BlobStoreTransferService getBlobStoreTransferService() { - if (blobStoreTransferService == null) { - blobStoreTransferService = new BlobStoreTransferService(remoteClusterStateService.getBlobStore(), threadpool); - } - return blobStoreTransferService; - } - - private void updateCleanupInterval(TimeValue updatedInterval) { - this.staleFileCleanupInterval = updatedInterval; - logger.info("updated remote state cleanup interval to {}", updatedInterval); - // After updating the interval, we need to close the current task and create a new one which will run with updated interval - if (staleFileDeletionTask != null && !staleFileDeletionTask.getInterval().equals(updatedInterval)) { - staleFileDeletionTask.setInterval(updatedInterval); - } - } - - // visible for testing - void cleanUpStaleFiles() { - ClusterState currentAppliedState = clusterApplierService.state(); - if (currentAppliedState.nodes().isLocalNodeElectedClusterManager()) { - long cleanUpAttemptStateVersion = currentAppliedState.version(); - assert Strings.isNotEmpty(currentAppliedState.getClusterName().value()) : "cluster name is not set"; - assert Strings.isNotEmpty(currentAppliedState.metadata().clusterUUID()) : "cluster uuid is not set"; - if (cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion > SKIP_CLEANUP_STATE_CHANGES) { - logger.info( - "Cleaning up stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates", - currentAppliedState.getClusterName().value(), - currentAppliedState.metadata().clusterUUID(), - cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion - ); - this.deleteStaleClusterMetadata( - currentAppliedState.getClusterName().value(), - currentAppliedState.metadata().clusterUUID(), - RETAINED_MANIFESTS - ); - lastCleanupAttemptStateVersion = cleanUpAttemptStateVersion; - } else { - logger.debug( - "Skipping cleanup of stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates, which is less than threshold {}", - currentAppliedState.getClusterName().value(), - currentAppliedState.metadata().clusterUUID(), - cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion, - SKIP_CLEANUP_STATE_CHANGES - ); - } - } else { - logger.debug("Skipping cleanup task as local node is not elected Cluster Manager"); - } - } - - private void addStaleGlobalMetadataPath(String fileName, Set filesToKeep, Set staleGlobalMetadataPaths) { - if (!filesToKeep.contains(fileName)) { - String[] splitPath = fileName.split("/"); - staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( - splitPath[splitPath.length - 1] - ) - ); - } - } - - // visible for testing - void deleteClusterMetadata( - String clusterName, - String clusterUUID, - List activeManifestBlobMetadata, - List staleManifestBlobMetadata - ) { - try { - Set filesToKeep = new HashSet<>(); - Set staleManifestPaths = new HashSet<>(); - Set staleIndexMetadataPaths = new HashSet<>(); - Set staleGlobalMetadataPaths = new HashSet<>(); - activeManifestBlobMetadata.forEach(blobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - blobMetadata.name() - ); - clusterMetadataManifest.getIndices() - .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); - if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) { - filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName()); - } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) { - filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); - filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); - filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); - clusterMetadataManifest.getCustomMetadataMap() - .values() - .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); - } - }); - staleManifestBlobMetadata.forEach(blobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - blobMetadata.name() - ); - staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); - if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) { - addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths); - } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) { - addStaleGlobalMetadataPath( - clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(), - filesToKeep, - staleGlobalMetadataPaths - ); - addStaleGlobalMetadataPath( - clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(), - filesToKeep, - staleGlobalMetadataPaths - ); - addStaleGlobalMetadataPath( - clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(), - filesToKeep, - staleGlobalMetadataPaths - ); - clusterMetadataManifest.getCustomMetadataMap() - .values() - .forEach( - attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) - ); - } - - clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { - if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { - staleIndexMetadataPaths.add( - new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() - + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename()) - ); - } - }); - }); - - if (staleManifestPaths.isEmpty()) { - logger.debug("No stale Remote Cluster Metadata files found"); - return; - } - - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); - } catch (IllegalStateException e) { - logger.error("Error while fetching Remote Cluster Metadata manifests", e); - } catch (IOException e) { - logger.error("Error while deleting stale Remote Cluster Metadata files", e); - remoteStateStats.cleanUpAttemptFailed(); - } catch (Exception e) { - logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); - remoteStateStats.cleanUpAttemptFailed(); - } - } - - /** - * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests - * - * @param clusterName name of the cluster - * @param clusterUUID uuid of cluster state to refer to in remote - * @param manifestsToRetain no of latest manifest files to keep in remote - */ - // package private for testing - void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { - if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { - logger.info("Delete stale cluster metadata task is already in progress."); - return; - } - try { - getBlobStoreTransferService().listAllInSortedOrderAsync( - ThreadPool.Names.REMOTE_PURGE, - remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID), - MANIFEST_FILE_PREFIX, - Integer.MAX_VALUE, - new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - if (blobMetadata.size() > manifestsToRetain) { - deleteClusterMetadata( - clusterName, - clusterUUID, - blobMetadata.subList(0, manifestsToRetain), - blobMetadata.subList(manifestsToRetain, blobMetadata.size()) - ); - } - deleteStaleMetadataRunning.set(false); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUID - ) - ); - deleteStaleMetadataRunning.set(false); - } - } - ); - } catch (Exception e) { - deleteStaleMetadataRunning.set(false); - throw e; - } - } - - /** - * Purges all remote cluster state against provided cluster UUIDs - * - * @param clusterName name of the cluster - * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged - */ - void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) { - clusterUUIDs.forEach( - clusterUUID -> getBlobStoreTransferService().deleteAsync( - ThreadPool.Names.REMOTE_PURGE, - remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting all remote cluster metadata for cluster UUID {}", - clusterUUID - ), - e - ); - remoteStateStats.cleanUpAttemptFailed(); - } - } - ) - ); - } - - // package private for testing - void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException { - logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); - getBlobStoreTransferService().deleteBlobs( - remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID), - stalePaths - ); - } - - /** - * Purges all remote cluster state against provided cluster UUIDs - * @param clusterState current state of the cluster - * @param committedManifest last committed ClusterMetadataManifest - */ - public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) { - threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { - String clusterName = clusterState.getClusterName().value(); - logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); - Set allClustersUUIDsInRemote; - try { - allClustersUUIDsInRemote = new HashSet<>( - remoteClusterStateService.getAllClusterUUIDs(clusterState.getClusterName().value()) - ); - } catch (IOException e) { - logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName)); - return; - } - // Retain last 2 cluster uuids data - allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID()); - allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID()); - deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); - }); - } - - public TimeValue getStaleFileCleanupInterval() { - return this.staleFileCleanupInterval; - } - - AsyncStaleFileDeletion getStaleFileDeletionTask() { // for testing - return this.staleFileDeletionTask; - } - - RemotePersistenceStats getStats() { - return this.remoteStateStats; - } - - static final class AsyncStaleFileDeletion extends AbstractAsyncTask { - private final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; - - AsyncStaleFileDeletion(RemoteClusterStateCleanupManager remoteClusterStateCleanupManager) { - super( - logger, - remoteClusterStateCleanupManager.threadpool, - remoteClusterStateCleanupManager.getStaleFileCleanupInterval(), - true - ); - this.remoteClusterStateCleanupManager = remoteClusterStateCleanupManager; - rescheduleIfNecessary(); - } - - @Override - protected boolean mustReschedule() { - return true; - } - - @Override - protected void runInternal() { - remoteClusterStateCleanupManager.cleanUpStaleFiles(); - } - } -} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 01ffd8f1cca46..0f862d1b68820 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -18,13 +18,11 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -61,6 +59,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; @@ -82,6 +81,8 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final int RETAINED_MANIFESTS = 10; + public static final String DELIMITER = "__"; public static final String CUSTOM_DELIMITER = "--"; @@ -206,7 +207,8 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue indexMetadataUploadTimeout; private volatile TimeValue globalMetadataUploadTimeout; private volatile TimeValue metadataManifestUploadTimeout; - private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; + + private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); private final RemotePersistenceStats remoteStateStats; private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " @@ -230,7 +232,7 @@ public RemoteClusterStateService( String nodeId, Supplier repositoriesService, Settings settings, - ClusterService clusterService, + ClusterSettings clusterSettings, LongSupplier relativeTimeNanosSupplier, ThreadPool threadPool, List indexMetadataUploadListeners @@ -241,7 +243,6 @@ public RemoteClusterStateService( this.settings = settings; this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; this.threadpool = threadPool; - ClusterSettings clusterSettings = clusterService.getClusterSettings(); this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); @@ -251,10 +252,16 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; } + private BlobStoreTransferService getBlobStoreTransferService() { + if (blobStoreTransferService == null) { + blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); + } + return blobStoreTransferService; + } + /** * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be * invoked by the elected cluster manager when the remote cluster state is enabled. @@ -410,6 +417,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( : previousManifest.getCustomMetadataMap(), false ); + deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateSucceeded(); @@ -713,10 +721,6 @@ private CheckedRunnable getAsyncMetadataWriteAction( ); } - public RemoteClusterStateCleanupManager getCleanupManager() { - return remoteClusterStateCleanupManager; - } - @Nullable public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { @@ -736,16 +740,12 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat previousManifest.getCustomMetadataMap(), true ); - if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) { - remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest); - } - + deleteStaleClusterUUIDs(clusterState, committedManifest); return committedManifest; } @Override public void close() throws IOException { - remoteClusterStateCleanupManager.close(); if (blobStoreRepository != null) { IOUtils.close(blobStoreRepository); } @@ -760,7 +760,6 @@ public void start() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; - remoteClusterStateCleanupManager.start(); } private ClusterMetadataManifest uploadManifest( @@ -851,14 +850,6 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust ); } - ThreadPool getThreadpool() { - return threadpool; - } - - BlobStore getBlobStore() { - return blobStoreRepository.blobStore(); - } - private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX return blobStoreRepository.blobStore() @@ -876,7 +867,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID) return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); } - BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { + private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); } @@ -991,7 +982,7 @@ private static String metadataAttributeFileName(String componentPrefix, Long met ); } - BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { + private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); } @@ -1244,7 +1235,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } } - Set getAllClusterUUIDs(String clusterName) throws IOException { + private Set getAllClusterUUIDs(String clusterName) throws IOException { Map clusterUUIDMetadata = clusterUUIDContainer(clusterName).children(); if (clusterUUIDMetadata == null) { return Collections.emptySet(); @@ -1435,7 +1426,7 @@ private Optional getLatestManifestFileName(String clusterName, String cl * @param clusterName name of the cluster * @return ClusterMetadataManifest */ - ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) + private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) throws IllegalStateException { try { return getClusterMetadataManifestBlobStoreFormat(filename).read( @@ -1495,6 +1486,234 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) { } } + /** + * Purges all remote cluster state against provided cluster UUIDs + * + * @param clusterName name of the cluster + * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged + */ + void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) { + clusterUUIDs.forEach(clusterUUID -> { + getBlobStoreTransferService().deleteAsync( + ThreadPool.Names.REMOTE_PURGE, + getCusterMetadataBasePath(clusterName, clusterUUID), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting all remote cluster metadata for cluster UUID {}", + clusterUUID + ), + e + ); + remoteStateStats.cleanUpAttemptFailed(); + } + } + ); + }); + } + + /** + * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests + * + * @param clusterName name of the cluster + * @param clusterUUID uuid of cluster state to refer to in remote + * @param manifestsToRetain no of latest manifest files to keep in remote + */ + // package private for testing + void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { + logger.info("Delete stale cluster metadata task is already in progress."); + return; + } + try { + getBlobStoreTransferService().listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.size() > manifestsToRetain) { + deleteClusterMetadata( + clusterName, + clusterUUID, + blobMetadata.subList(0, manifestsToRetain - 1), + blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size()) + ); + } + deleteStaleMetadataRunning.set(false); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + deleteStaleMetadataRunning.set(false); + } + } + ); + } catch (Exception e) { + deleteStaleMetadataRunning.set(false); + throw e; + } + } + + private void deleteClusterMetadata( + String clusterName, + String clusterUUID, + List activeManifestBlobMetadata, + List staleManifestBlobMetadata + ) { + try { + Set filesToKeep = new HashSet<>(); + Set staleManifestPaths = new HashSet<>(); + Set staleIndexMetadataPaths = new HashSet<>(); + Set staleGlobalMetadataPaths = new HashSet<>(); + activeManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() + ); + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + if (clusterMetadataManifest.getGlobalMetadataFileName() != null) { + filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName()); + } else { + filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); + filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); + filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); + clusterMetadataManifest.getCustomMetadataMap() + .forEach((key, value) -> { filesToKeep.add(value.getUploadedFilename()); }); + } + }); + staleManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() + ); + staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); + if (clusterMetadataManifest.getGlobalMetadataFileName() != null) { + if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) { + String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + globalMetadataSplitPath[globalMetadataSplitPath.length - 1] + ) + ); + } + } else { + if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) { + String[] coordinationMetadataSplitPath = clusterMetadataManifest.getCoordinationMetadata() + .getUploadedFilename() + .split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + coordinationMetadataSplitPath[coordinationMetadataSplitPath.length - 1] + ) + ); + } + if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) { + String[] templatesMetadataSplitPath = clusterMetadataManifest.getTemplatesMetadata() + .getUploadedFilename() + .split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + templatesMetadataSplitPath[templatesMetadataSplitPath.length - 1] + ) + ); + } + if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) { + String[] settingsMetadataSplitPath = clusterMetadataManifest.getSettingsMetadata().getUploadedFilename().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + settingsMetadataSplitPath[settingsMetadataSplitPath.length - 1] + ) + ); + } + clusterMetadataManifest.getCustomMetadataMap().forEach((key, value) -> { + if (filesToKeep.contains(value.getUploadedFilename()) == false) { + String[] customMetadataSplitPath = value.getUploadedFilename().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + customMetadataSplitPath[customMetadataSplitPath.length - 1] + ) + ); + } + }); + } + + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + staleIndexMetadataPaths.add( + new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename()) + ); + } + }); + }); + + if (staleManifestPaths.isEmpty()) { + logger.debug("No stale Remote Cluster Metadata files found"); + return; + } + + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + } catch (IllegalStateException e) { + logger.error("Error while fetching Remote Cluster Metadata manifests", e); + } catch (IOException e) { + logger.error("Error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); + } catch (Exception e) { + logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); + } + } + + private void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException { + logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); + getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths); + } + + /** + * Purges all remote cluster state against provided cluster UUIDs + * + * @param clusterState current state of the cluster + * @param committedManifest last committed ClusterMetadataManifest + */ + public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) { + threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + String clusterName = clusterState.getClusterName().value(); + logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); + Set allClustersUUIDsInRemote; + try { + allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value())); + } catch (IOException e) { + logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName)); + return; + } + // Retain last 2 cluster uuids data + allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID()); + allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID()); + deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); + }); + } + public RemotePersistenceStats getStats() { return remoteStateStats; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 49545fa8a0c8b..76109ba10624a 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -138,7 +138,6 @@ import org.opensearch.gateway.MetaStateService; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; -import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpServerTransport; import org.opensearch.identity.IdentityService; @@ -753,7 +752,6 @@ protected Node( threadPool::relativeTimeInMillis ); final RemoteClusterStateService remoteClusterStateService; - final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; final RemoteIndexPathUploader remoteIndexPathUploader; if (isRemoteStoreClusterStateEnabled(settings)) { remoteIndexPathUploader = new RemoteIndexPathUploader( @@ -766,16 +764,14 @@ protected Node( nodeEnvironment.nodeId(), repositoriesServiceReference::get, settings, - clusterService, + clusterService.getClusterSettings(), threadPool::preciseRelativeTimeInNanos, threadPool, List.of(remoteIndexPathUploader) ); - remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager(); } else { remoteClusterStateService = null; remoteIndexPathUploader = null; - remoteClusterStateCleanupManager = null; } // collect engine factory providers from plugins @@ -1380,7 +1376,6 @@ protected Node( b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader); - b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 418e6d8de6adb..3ba98c44f8d3e 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -462,7 +462,9 @@ public void testDataOnlyNodePersistence() throws Exception { }); when(transportService.getThreadPool()).thenReturn(threadPool); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), @@ -485,7 +487,7 @@ public void testDataOnlyNodePersistence() throws Exception { nodeEnvironment.nodeId(), repositoriesServiceSupplier, settings, - clusterService, + clusterSettings, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java deleted file mode 100644 index 24fd1b164a4ff..0000000000000 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ /dev/null @@ -1,446 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterApplierService; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.blobstore.BlobMetadata; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.blobstore.support.PlainBlobMetadata; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.AbstractAsyncTask; -import org.opensearch.core.action.ActionListener; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.fs.FsRepository; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.test.VersionUtils; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; - -import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; -import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; -import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; -import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; -import static org.opensearch.gateway.remote.RemoteClusterStateService.CLUSTER_STATE_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.encodeString; -import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex; -import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase { - private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; - private Supplier repositoriesServiceSupplier; - private RepositoriesService repositoriesService; - private BlobStoreRepository blobStoreRepository; - private BlobStore blobStore; - private ClusterSettings clusterSettings; - private ClusterApplierService clusterApplierService; - private ClusterState clusterState; - private Metadata metadata; - private RemoteClusterStateService remoteClusterStateService; - private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); - - @Before - public void setup() { - repositoriesServiceSupplier = mock(Supplier.class); - repositoriesService = mock(RepositoriesService.class); - when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - - String stateRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - "remote_store_repository" - ); - String stateRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - "remote_store_repository" - ); - - Settings settings = Settings.builder() - .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") - .put(stateRepoTypeAttributeKey, FsRepository.TYPE) - .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) - .build(); - - clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - clusterApplierService = mock(ClusterApplierService.class); - clusterState = mock(ClusterState.class); - metadata = mock(Metadata.class); - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - when(clusterState.getClusterName()).thenReturn(new ClusterName("test")); - when(metadata.clusterUUID()).thenReturn("testUUID"); - when(clusterState.metadata()).thenReturn(metadata); - when(clusterApplierService.state()).thenReturn(clusterState); - when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); - - blobStoreRepository = mock(BlobStoreRepository.class); - blobStore = mock(BlobStore.class); - when(blobStoreRepository.blobStore()).thenReturn(blobStore); - when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); - - remoteClusterStateService = mock(RemoteClusterStateService.class); - when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats()); - when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool); - when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService); - } - - @After - public void teardown() throws Exception { - super.tearDown(); - remoteClusterStateCleanupManager.close(); - threadPool.shutdown(); - } - - public void testDeleteClusterMetadata() throws IOException { - String clusterUUID = "clusterUUID"; - String clusterName = "test-cluster"; - List inactiveBlobs = Arrays.asList( - new PlainBlobMetadata("manifest1.dat", 1L), - new PlainBlobMetadata("manifest2.dat", 1L), - new PlainBlobMetadata("manifest3.dat", 1L) - ); - List activeBlobs = Arrays.asList( - new PlainBlobMetadata("manifest4.dat", 1L), - new PlainBlobMetadata("manifest5.dat", 1L) - ); - UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1"); - UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2"); - UploadedIndexMetadata index1UpdatedMetadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1_updated"); - UploadedMetadataAttribute coordinationMetadata = new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination_metadata"); - UploadedMetadataAttribute templateMetadata = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata"); - UploadedMetadataAttribute settingMetadata = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata"); - UploadedMetadataAttribute coordinationMetadataUpdated = new UploadedMetadataAttribute( - COORDINATION_METADATA, - "coordination_metadata_updated" - ); - UploadedMetadataAttribute templateMetadataUpdated = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata_updated"); - UploadedMetadataAttribute settingMetadataUpdated = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata_updated"); - ClusterMetadataManifest manifest1 = ClusterMetadataManifest.builder() - .indices(List.of(index1Metadata)) - .globalMetadataFileName("global_metadata") - .clusterTerm(1L) - .stateVersion(1L) - .codecVersion(CODEC_V1) - .stateUUID(randomAlphaOfLength(10)) - .clusterUUID(clusterUUID) - .nodeId("nodeA") - .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) - .previousClusterUUID(ClusterState.UNKNOWN_UUID) - .committed(true) - .build(); - ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder(manifest1) - .indices(List.of(index1Metadata, index2Metadata)) - .codecVersion(CODEC_V2) - .globalMetadataFileName(null) - .coordinationMetadata(coordinationMetadata) - .templatesMetadata(templateMetadata) - .settingMetadata(settingMetadata) - .build(); - ClusterMetadataManifest manifest3 = ClusterMetadataManifest.builder(manifest2) - .indices(List.of(index1UpdatedMetadata, index2Metadata)) - .settingMetadata(settingMetadataUpdated) - .build(); - - // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated - ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3) - .coordinationMetadata(coordinationMetadataUpdated) - .build(); - ClusterMetadataManifest manifest5 = ClusterMetadataManifest.builder(manifest4).templatesMetadata(templateMetadataUpdated).build(); - - when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( - manifest4, - manifest5, - manifest1, - manifest2, - manifest3 - ); - BlobContainer container = mock(BlobContainer.class); - when(blobStore.blobContainer(any())).thenReturn(container); - doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); - - remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); - verify(container).deleteBlobsIgnoringIfNotExists( - List.of( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + coordinationMetadata.getUploadedFilename() + ".dat", - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + settingMetadata.getUploadedFilename() + ".dat", - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + "global_metadata.dat" - ) - ); - verify(container).deleteBlobsIgnoringIfNotExists( - List.of( - new BlobPath().add(INDEX_PATH_TOKEN).add(index1Metadata.getIndexUUID()).buildAsString() - + index1Metadata.getUploadedFilePath() - + ".dat" - ) - ); - Set staleManifest = new HashSet<>(); - inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); - verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest)); - } - - public void testDeleteStaleClusterUUIDs() throws IOException { - final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); - ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder() - .indices(List.of()) - .clusterTerm(1L) - .stateVersion(1L) - .stateUUID(randomAlphaOfLength(10)) - .clusterUUID("cluster-uuid1") - .nodeId("nodeA") - .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) - .previousClusterUUID(ClusterState.UNKNOWN_UUID) - .committed(true) - .build(); - - BlobPath blobPath = new BlobPath().add("random-path"); - BlobContainer uuidContainerContainer = mock(BlobContainer.class); - BlobContainer manifest2Container = mock(BlobContainer.class); - BlobContainer manifest3Container = mock(BlobContainer.class); - when(blobStore.blobContainer(any())).then(invocation -> { - BlobPath blobPath1 = invocation.getArgument(0); - if (blobPath1.buildAsString().endsWith("cluster-state/")) { - return uuidContainerContainer; - } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) { - return manifest2Container; - } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) { - return manifest3Container; - } else { - throw new IllegalArgumentException("Unexpected blob path " + blobPath1); - } - }); - when( - manifest2Container.listBlobsByPrefixInSortedOrder( - MANIFEST_FILE_PREFIX + DELIMITER, - Integer.MAX_VALUE, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC - ) - ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L))); - when( - manifest3Container.listBlobsByPrefixInSortedOrder( - MANIFEST_FILE_PREFIX + DELIMITER, - Integer.MAX_VALUE, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC - ) - ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L))); - Set uuids = new HashSet<>(Arrays.asList("cluster-uuid1", "cluster-uuid2", "cluster-uuid3")); - when(remoteClusterStateService.getAllClusterUUIDs(any())).thenReturn(uuids); - when(remoteClusterStateService.getCusterMetadataBasePath(any(), any())).then( - invocationOnMock -> blobPath.add(encodeString(invocationOnMock.getArgument(0))) - .add(CLUSTER_STATE_PATH_TOKEN) - .add((String) invocationOnMock.getArgument(1)) - ); - remoteClusterStateCleanupManager.start(); - remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest); - try { - assertBusy(() -> { - verify(manifest2Container, times(1)).delete(); - verify(manifest3Container, times(1)).delete(); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void testRemoteStateCleanupFailureStats() throws IOException { - BlobContainer blobContainer = mock(BlobContainer.class); - doThrow(IOException.class).when(blobContainer).delete(); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BlobPath blobPath = new BlobPath().add("random-path"); - when((blobStoreRepository.basePath())).thenReturn(blobPath); - remoteClusterStateCleanupManager.start(); - remoteClusterStateCleanupManager.deleteStaleUUIDsClusterMetadata("cluster1", List.of("cluster-uuid1")); - try { - assertBusy(() -> { - // wait for stats to get updated - assertNotNull(remoteClusterStateCleanupManager.getStats()); - assertEquals(0, remoteClusterStateCleanupManager.getStats().getSuccessCount()); - assertEquals(1, remoteClusterStateCleanupManager.getStats().getCleanupAttemptFailedCount()); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { - BlobContainer blobContainer = mock(BlobContainer.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - - CountDownLatch latch = new CountDownLatch(1); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> { - callCount.incrementAndGet(); - if (latch.await(5000, TimeUnit.SECONDS) == false) { - throw new Exception("Timed out waiting for delete task queuing to complete"); - } - return null; - }).when(blobContainer) - .listBlobsByPrefixInSortedOrder( - any(String.class), - any(int.class), - any(BlobContainer.BlobNameSortOrder.class), - any(ActionListener.class) - ); - - remoteClusterStateCleanupManager.start(); - remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); - remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); - - latch.countDown(); - assertBusy(() -> assertEquals(1, callCount.get())); - } - - public void testRemoteClusterStateCleanupSetting() { - remoteClusterStateCleanupManager.start(); - // verify default value - assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileCleanupInterval()); - - // verify update interval - int cleanupInterval = randomIntBetween(1, 10); - Settings newSettings = Settings.builder().put("cluster.remote_store.state.cleanup_interval", cleanupInterval + "s").build(); - clusterSettings.applySettings(newSettings); - assertEquals(cleanupInterval, remoteClusterStateCleanupManager.getStaleFileCleanupInterval().seconds()); - } - - public void testRemoteCleanupTaskScheduled() { - AbstractAsyncTask cleanupTask = remoteClusterStateCleanupManager.getStaleFileDeletionTask(); - assertNull(cleanupTask); - // now the task should be initialized - remoteClusterStateCleanupManager.start(); - assertNotNull(remoteClusterStateCleanupManager.getStaleFileDeletionTask()); - assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().mustReschedule()); - assertEquals( - clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING), - remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval() - ); - assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); - assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isClosed()); - } - - public void testRemoteCleanupSkipsOnOnlyElectedClusterManager() { - DiscoveryNodes nodes = mock(DiscoveryNodes.class); - when(nodes.isLocalNodeElectedClusterManager()).thenReturn(false); - when(clusterState.nodes()).thenReturn(nodes); - RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); - spyManager.cleanUpStaleFiles(); - assertEquals(0, callCount.get()); - - when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); - when(clusterState.version()).thenReturn(randomLongBetween(11, 20)); - spyManager.cleanUpStaleFiles(); - assertEquals(1, callCount.get()); - } - - public void testRemoteCleanupSkipsIfVersionIncrementLessThanThreshold() { - DiscoveryNodes nodes = mock(DiscoveryNodes.class); - long version = randomLongBetween(1, SKIP_CLEANUP_STATE_CHANGES); - when(clusterApplierService.state()).thenReturn(clusterState); - when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); - when(clusterState.nodes()).thenReturn(nodes); - when(clusterState.version()).thenReturn(version); - - RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); - - remoteClusterStateCleanupManager.cleanUpStaleFiles(); - assertEquals(0, callCount.get()); - } - - public void testRemoteCleanupCallsDeleteIfVersionIncrementGreaterThanThreshold() { - DiscoveryNodes nodes = mock(DiscoveryNodes.class); - long version = randomLongBetween(SKIP_CLEANUP_STATE_CHANGES + 1, SKIP_CLEANUP_STATE_CHANGES + 10); - when(clusterApplierService.state()).thenReturn(clusterState); - when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); - when(clusterState.nodes()).thenReturn(nodes); - when(clusterState.version()).thenReturn(version); - - RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); - - // using spied cleanup manager so that stubbed deleteStaleClusterMetadata is called - spyManager.cleanUpStaleFiles(); - assertEquals(1, callCount.get()); - } - - public void testRemoteCleanupSchedulesEvenAfterFailure() { - remoteClusterStateCleanupManager.start(); - RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocationOnMock -> { - callCount.incrementAndGet(); - throw new RuntimeException("Test exception"); - }).when(spyManager).cleanUpStaleFiles(); - AsyncStaleFileDeletion task = new AsyncStaleFileDeletion(spyManager); - assertTrue(task.isScheduled()); - task.run(); - // Task is still scheduled after the failure - assertTrue(task.isScheduled()); - assertEquals(1, callCount.get()); - - task.run(); - // Task is still scheduled after the failure - assertTrue(task.isScheduled()); - assertEquals(2, callCount.get()); - } -} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 5f0c371a3137e..1b242b921c0d7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -19,7 +19,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; @@ -73,6 +72,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; @@ -91,6 +93,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS; import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -106,12 +109,13 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private RemoteClusterStateService remoteClusterStateService; - private ClusterService clusterService; private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; @@ -144,8 +148,6 @@ public void setup() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( Stream.of( NetworkModule.getNamedXContents().stream(), @@ -163,7 +165,7 @@ public void setup() { "test-node-id", repositoriesServiceSupplier, settings, - clusterService, + clusterSettings, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) @@ -185,14 +187,14 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException public void testFailInitializationWhenRemoteStateDisabled() { final Settings settings = Settings.builder().build(); - when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); assertThrows( AssertionError.class, () -> new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, settings, - clusterService, + clusterSettings, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) @@ -1278,6 +1280,72 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx assertThat(previousClusterUUID, equalTo("cluster-uuid2")); } + public void testDeleteStaleClusterUUIDs() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder() + .indices(List.of()) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID("cluster-uuid1") + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .build(); + + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + BlobContainer uuidContainerContainer = mock(BlobContainer.class); + BlobContainer manifest2Container = mock(BlobContainer.class); + BlobContainer manifest3Container = mock(BlobContainer.class); + when(blobStore.blobContainer(any())).then(invocation -> { + BlobPath blobPath1 = invocation.getArgument(0); + if (blobPath1.buildAsString().endsWith("cluster-state/")) { + return uuidContainerContainer; + } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) { + return manifest2Container; + } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) { + return manifest3Container; + } else { + throw new IllegalArgumentException("Unexpected blob path " + blobPath1); + } + }); + Map blobMetadataMap = Map.of( + "cluster-uuid1", + mock(BlobContainer.class), + "cluster-uuid2", + mock(BlobContainer.class), + "cluster-uuid3", + mock(BlobContainer.class) + ); + when(uuidContainerContainer.children()).thenReturn(blobMetadataMap); + when( + manifest2Container.listBlobsByPrefixInSortedOrder( + MANIFEST_FILE_PREFIX + DELIMITER, + Integer.MAX_VALUE, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L))); + when( + manifest3Container.listBlobsByPrefixInSortedOrder( + MANIFEST_FILE_PREFIX + DELIMITER, + Integer.MAX_VALUE, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L))); + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest); + try { + assertBusy(() -> { + verify(manifest2Container, times(1)).delete(); + verify(manifest3Container, times(1)).delete(); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public void testRemoteStateStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); @@ -1290,6 +1358,26 @@ public void testRemoteStateStats() throws IOException { assertEquals(0, remoteClusterStateService.getStats().getFailedCount()); } + public void testRemoteStateCleanupFailureStats() throws IOException { + BlobContainer blobContainer = mock(BlobContainer.class); + doThrow(IOException.class).when(blobContainer).delete(); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleUUIDsClusterMetadata("cluster1", Arrays.asList("cluster-uuid1")); + try { + assertBusy(() -> { + // wait for stats to get updated + assertTrue(remoteClusterStateService.getStats() != null); + assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); + assertEquals(1, remoteClusterStateService.getStats().getCleanupAttemptFailedCount()); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public void testFileNames() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() @@ -1330,6 +1418,36 @@ private void verifyManifestFileNameWithCodec(int codecVersion) { assertThat(splittedName[3], is("P")); } + public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { + BlobContainer blobContainer = mock(BlobContainer.class); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> { + callCount.incrementAndGet(); + if (latch.await(5000, TimeUnit.SECONDS) == false) { + throw new Exception("Timed out waiting for delete task queuing to complete"); + } + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + any(String.class), + any(int.class), + any(BlobContainer.BlobNameSortOrder.class), + any(ActionListener.class) + ); + + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + + latch.countDown(); + assertBusy(() -> assertEquals(1, callCount.get())); + } + public void testIndexMetadataUploadWaitTimeSetting() { // verify default value assertEquals( @@ -1773,7 +1891,7 @@ private static ClusterState.Builder generateClusterStateWithGlobalMetadata() { ); } - static ClusterState.Builder generateClusterStateWithOneIndex() { + private static ClusterState.Builder generateClusterStateWithOneIndex() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) @@ -1803,7 +1921,7 @@ static ClusterState.Builder generateClusterStateWithOneIndex() { ); } - static DiscoveryNodes nodesWithLocalNodeClusterManager() { + private static DiscoveryNodes nodesWithLocalNodeClusterManager() { return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build(); }