From 21a73e9504e834ec16ad7621ea987027675066d2 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Wed, 6 Sep 2023 23:48:37 +0530 Subject: [PATCH 01/13] Remove stale remote cluster state Signed-off-by: bansvaru --- .../remote/RemoteClusterStateServiceIT.java | 107 ++++++++++++ .../remote/ClusterMetadataManifest.java | 9 +- .../remote/RemoteClusterStateService.java | 153 ++++++++++++++---- .../recovery/RemoteStoreRestoreService.java | 2 +- .../main/java/org/opensearch/node/Node.java | 3 +- .../GatewayMetaStatePersistedStateTests.java | 16 +- .../RemoteClusterStateServiceTests.java | 18 ++- .../gateway/MockGatewayMetaState.java | 8 + 8 files changed, 278 insertions(+), 38 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java new file mode 100644 index 0000000000000..fc8eccef6c801 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.Settings; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING; + +public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { + + private static String INDEX_NAME = "test-index"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), REPOSITORY_NAME) + .build(); + } + + private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); + internalCluster().startDataOnlyNodes(numDataOnlyNodes); + for (String index : indices.split(",")) { + createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + ensureYellowAndNoInitializingShards(index); + ensureGreen(index); + } + } + + private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { + prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); + Map indexStats = indexData(1, false, INDEX_NAME); + assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards); + ensureGreen(INDEX_NAME); + return indexStats; + } + + public void testFullClusterRestoreStaleDelete() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateService.class + ); + + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + BlobPath baseMetadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()); + + assertEquals(repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size(), 4); + + Map indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata( + cluster().getClusterName(), + getClusterState().metadata().clusterUUID() + ); + assertEquals(indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas(), 0); + assertEquals(indexMetadataMap.values().stream().findFirst().get().getNumberOfShards(), shardCount); + } + + private void setReplicaCount(int replicaCount) { + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) + .get(); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 0ebbdc81661ad..040c0663efbd9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -423,10 +423,15 @@ public UploadedIndexMetadata(StreamInput in) throws IOException { this.uploadedFilename = in.readString(); } - public String getUploadedFilename() { + public String getUploadedFilePath() { return uploadedFilename; } + public String getUploadedFilename() { + String[] splitPath = uploadedFilename.split("/"); + return splitPath[splitPath.length - 1]; + } + public String getIndexName() { return indexName; } @@ -440,7 +445,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.startObject() .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()) .endObject(); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 2aa3384b0f33a..60cab16dc8ee6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -18,6 +18,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -28,12 +29,14 @@ import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; +import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -42,6 +45,7 @@ import java.util.Base64; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -69,6 +73,12 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final int RETAINED_MANIFESTS = 3; + + public static final String DELIMITER = "__"; + + private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( @@ -92,9 +102,6 @@ public class RemoteClusterStateService implements Closeable { Property.Final ); - private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - - public static final String DELIMITER = "__"; private static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state"; private static final String INDEX_PATH_TOKEN = "index"; private static final String MANIFEST_PATH_TOKEN = "manifest"; @@ -105,6 +112,7 @@ public class RemoteClusterStateService implements Closeable { private final Supplier repositoriesService; private final Settings settings; private final LongSupplier relativeTimeNanosSupplier; + private final ThreadPool threadpool; private BlobStoreRepository blobStoreRepository; private volatile TimeValue slowWriteLoggingThreshold; @@ -113,13 +121,15 @@ public RemoteClusterStateService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, - LongSupplier relativeTimeNanosSupplier + LongSupplier relativeTimeNanosSupplier, + ThreadPool threadPool ) { assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; this.nodeId = nodeId; this.repositoriesService = repositoriesService; this.settings = settings; this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; + this.threadpool = threadPool; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); } @@ -233,6 +243,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( previousManifest.getPreviousClusterUUID(), false ); + deleteClusterMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -439,26 +450,19 @@ private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX return blobStoreRepository.blobStore() - .blobContainer( - blobStoreRepository.basePath() - .add(encodeString(clusterName)) - .add(CLUSTER_STATE_PATH_TOKEN) - .add(clusterUUID) - .add(INDEX_PATH_TOKEN) - .add(indexUUID) - ); + .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(INDEX_PATH_TOKEN).add(indexUUID)); } private BlobContainer manifestContainer(String clusterName, String clusterUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest - return blobStoreRepository.blobStore() - .blobContainer( - blobStoreRepository.basePath() - .add(encodeString(clusterName)) - .add(CLUSTER_STATE_PATH_TOKEN) - .add(clusterUUID) - .add(MANIFEST_PATH_TOKEN) - ); + return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); + } + + private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { + return blobStoreRepository.basePath() + .add(encodeString(clusterName)) + .add(CLUSTER_STATE_PATH_TOKEN) + .add(clusterUUID); } private BlobContainer clusterUUIDContainer(String clusterName) { @@ -476,13 +480,12 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { private static String getManifestFileName(long term, long version) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637_456536447 - return String.join( - DELIMITER, - MANIFEST_FILE_PREFIX, - RemoteStoreUtils.invertLong(term), - RemoteStoreUtils.invertLong(version), - RemoteStoreUtils.invertLong(System.currentTimeMillis()) - ); + return String.join(DELIMITER, getManifestFileNamePrefix(term, version), RemoteStoreUtils.invertLong(System.currentTimeMillis())); + } + + private static String getManifestFileNamePrefix(long term, long version) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637 + return String.join(DELIMITER, MANIFEST_PATH_TOKEN, RemoteStoreUtils.invertLong(term), RemoteStoreUtils.invertLong(version)); } private static String indexMetadataFileName(IndexMetadata indexMetadata) { @@ -494,6 +497,10 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) { ); } + private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { + return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); + } + /** * Fetch latest index metadata from remote cluster state * @@ -710,4 +717,96 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { super(errorDesc, cause); } } + + /** + * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests + * @param clusterName name of the cluster + * @param clusterUUID uuid of cluster state to refer to in remote + * @param manifestsToRetain no of latest manifest files to keep in remote + */ + public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) { + BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); + + synchronized (this) { + transferService.listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + MANIFEST_PATH_TOKEN, + Integer.MAX_VALUE, + new ActionListener<>() { + int evaluatedManifestCount = 1; + + @Override + public void onResponse(List blobMetadata) { + Set filesToKeep = new HashSet<>(); + List stalePaths = new ArrayList<>(); + blobMetadata.forEach(manifestBlobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + manifestBlobMetadata.name() + ); + if (evaluatedManifestCount <= manifestsToRetain) { + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + } else { + stalePaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + manifestBlobMetadata.name()); + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + stalePaths.add( + new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + + uploadedIndexMetadata.getUploadedFilename() + + ".dat" + ); + } + }); + } + evaluatedManifestCount += 1; + }); + + logger.info(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); + + if (stalePaths.toArray().length == 0) { + logger.trace("No stale Remote Cluster Metadata files found"); + return; + } + + transferService.deleteBlobsAsync( + ThreadPool.Names.REMOTE_PURGE, + getCusterMetadataBasePath(clusterName, clusterUUID), + stalePaths, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info( + String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size()) + ); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting stale Remote Cluster Metadata files - {}", + stalePaths + ) + ); + } + } + ); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + } + } + ); + } + } } diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java index 5cdff14cae360..70c588f7b99dd 100644 --- a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -147,7 +147,7 @@ public RemoteRestoreResult restore( .forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata)); }); - } catch (IOException e) { + } catch (Exception e) { throw new IllegalStateException("Unable to restore remote index metadata", e); } } else { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e06207a0c7bff..2c987432199a0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -688,7 +688,8 @@ protected Node( repositoriesServiceReference::get, settings, clusterService.getClusterSettings(), - threadPool::preciseRelativeTimeInNanos + threadPool::preciseRelativeTimeInNanos, + threadPool ); } else { remoteClusterStateService = null; diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 6a2f4cd0ab300..486717faaf864 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -118,6 +118,8 @@ public class GatewayMetaStatePersistedStateTests extends OpenSearchTestCase { private DiscoveryNode localNode; private BigArrays bigArrays; + private MockGatewayMetaState gateway; + @Override public void setUp() throws Exception { bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -137,11 +139,13 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { nodeEnvironment.close(); + IOUtils.close(gateway); super.tearDown(); } - private CoordinationState.PersistedState newGatewayPersistedState() { - final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays); + private CoordinationState.PersistedState newGatewayPersistedState() throws IOException { + IOUtils.close(gateway); + gateway = new MockGatewayMetaState(localNode, bigArrays); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); gateway.start(settings, nodeEnvironment, xContentRegistry(), persistedStateRegistry); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); @@ -447,7 +451,10 @@ public void testDataOnlyNodePersistence() throws Exception { cleanup.add(gateway); final TransportService transportService = mock(TransportService.class); TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode"); - cleanup.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); + cleanup.add(() -> { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + threadPool.shutdown(); + }); when(transportService.getThreadPool()).thenReturn(threadPool); ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn( @@ -474,7 +481,8 @@ public void testDataOnlyNodePersistence() throws Exception { ), settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - () -> 0L + () -> 0L, + threadPool ); } else { return null; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 8c6ccea940816..14723af29979c 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -41,6 +41,9 @@ import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -82,6 +85,7 @@ public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; private BlobStore blobStore; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before public void setup() { @@ -117,17 +121,24 @@ public void setup() { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - () -> 0L + () -> 0L, + threadPool ); } + @After + public void teardown() throws Exception { + super.tearDown(); + remoteClusterStateService.close(); + } + public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); Assert.assertThat(manifest, nullValue()); } - public void testFailInitializationWhenRemoteStateDisabled() throws IOException { + public void testFailInitializationWhenRemoteStateDisabled() { final Settings settings = Settings.builder().build(); assertThrows( AssertionError.class, @@ -136,7 +147,8 @@ public void testFailInitializationWhenRemoteStateDisabled() throws IOException { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - () -> 0L + () -> 0L, + threadPool ) ); } diff --git a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java index dea205619ce95..bc48c5c236980 100644 --- a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java @@ -48,6 +48,8 @@ import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.plugins.MetadataUpgrader; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -103,6 +105,12 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust return ClusterStateUpdaters.setLocalNode(clusterState, localNode); } + @Override + public void close() throws IOException { + super.close(); + threadPool.shutdown(); + } + public void start( Settings settings, NodeEnvironment nodeEnvironment, From 80fb79807831f55833bcb9fd72b9fd930dff97a6 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 11:15:18 +0530 Subject: [PATCH 02/13] refactoring Signed-off-by: bansvaru --- .../remote/RemoteClusterStateService.java | 183 ++++++++++-------- .../recovery/RemoteStoreRestoreService.java | 1 - 2 files changed, 101 insertions(+), 83 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 60cab16dc8ee6..47b924a446e5f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -114,6 +114,7 @@ public class RemoteClusterStateService implements Closeable { private final LongSupplier relativeTimeNanosSupplier; private final ThreadPool threadpool; private BlobStoreRepository blobStoreRepository; + private BlobStoreTransferService blobStoreTransferService; private volatile TimeValue slowWriteLoggingThreshold; public RemoteClusterStateService( @@ -134,6 +135,13 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); } + private BlobStoreTransferService getBlobStoreTransferService() { + if (blobStoreTransferService == null) { + blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); + } + return blobStoreTransferService; + } + /** * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be * invoked by the elected cluster manager when the remote cluster state is enabled. @@ -243,7 +251,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( previousManifest.getPreviousClusterUUID(), false ); - deleteClusterMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); + deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -459,10 +467,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID) } private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { - return blobStoreRepository.basePath() - .add(encodeString(clusterName)) - .add(CLUSTER_STATE_PATH_TOKEN) - .add(clusterUUID); + return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); } private BlobContainer clusterUUIDContainer(String clusterName) { @@ -724,89 +729,103 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { * @param clusterUUID uuid of cluster state to refer to in remote * @param manifestsToRetain no of latest manifest files to keep in remote */ - public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) { - BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); - - synchronized (this) { - transferService.listAllInSortedOrderAsync( - ThreadPool.Names.REMOTE_PURGE, - getManifestFolderPath(clusterName, clusterUUID), - MANIFEST_PATH_TOKEN, - Integer.MAX_VALUE, - new ActionListener<>() { - int evaluatedManifestCount = 1; - - @Override - public void onResponse(List blobMetadata) { - Set filesToKeep = new HashSet<>(); - List stalePaths = new ArrayList<>(); - blobMetadata.forEach(manifestBlobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - manifestBlobMetadata.name() - ); - if (evaluatedManifestCount <= manifestsToRetain) { - clusterMetadataManifest.getIndices() - .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); - } else { - stalePaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + manifestBlobMetadata.name()); - clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { - if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { - stalePaths.add( - new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() - + uploadedIndexMetadata.getUploadedFilename() - + ".dat" - ); - } - }); + public void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + synchronized (this) { + getBlobStoreTransferService().listAllInSortedOrder( + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.size() > manifestsToRetain) { + deleteClusterMetadata( + clusterName, + clusterUUID, + blobMetadata.subList(0, manifestsToRetain - 1), + blobMetadata.subList(manifestsToRetain, blobMetadata.size() - 1) + ); } - evaluatedManifestCount += 1; - }); - - logger.info(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); - - if (stalePaths.toArray().length == 0) { - logger.trace("No stale Remote Cluster Metadata files found"); - return; } - transferService.deleteBlobsAsync( - ThreadPool.Names.REMOTE_PURGE, - getCusterMetadataBasePath(clusterName, clusterUUID), - stalePaths, - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info( - String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size()) - ); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting stale Remote Cluster Metadata files - {}", - stalePaths - ) - ); - } - } - ); + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + } } + ); + } + }); + } - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUID - ) - ); - } - } + private void deleteClusterMetadata( + String clusterName, + String clusterUUID, + List activeManifestBlobMetadata, + List staleManifestBlobMetadata + ) { + Set filesToKeep = new HashSet<>(); + List stalePaths = new ArrayList<>(); + activeManifestBlobMetadata.forEach(manifestBlobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + manifestBlobMetadata.name() + ); + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + }); + staleManifestBlobMetadata.forEach(manifestBlobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + manifestBlobMetadata.name() ); + stalePaths.add(new BlobPath().add("manifest").buildAsString() + manifestBlobMetadata.name()); + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + stalePaths.add( + new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + + uploadedIndexMetadata.getUploadedFilename() + + ".dat" + ); + } + }); + }); + + if (stalePaths.toArray().length == 0) { + logger.trace("No stale Remote Cluster Metadata files found"); + return; } + + deleteStalePaths(clusterName, clusterUUID, stalePaths); + } + + private void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) { + logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); + getBlobStoreTransferService().deleteBlobsAsync( + ThreadPool.Names.REMOTE_PURGE, + getCusterMetadataBasePath(clusterName, clusterUUID), + stalePaths, + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info(String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size())); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage("Exception occurred while deleting stale Remote Cluster Metadata files - {}", stalePaths) + ); + } + } + ); } } diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java index 70c588f7b99dd..d05242a3aeaf7 100644 --- a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -36,7 +36,6 @@ import org.opensearch.snapshots.RestoreInfo; import org.opensearch.snapshots.RestoreService; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; From 76b82749346af0dbe54bb72aa53d7872d15ff21f Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 18:40:19 +0530 Subject: [PATCH 03/13] add atomic boolean to control multiple delete task executions at a time Signed-off-by: bansvaru --- .../remote/RemoteClusterStateServiceIT.java | 19 +- .../remote/RemoteClusterStateService.java | 169 +++++++++--------- 2 files changed, 95 insertions(+), 93 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index fc8eccef6c801..7a35d9e30d0d3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -14,6 +14,7 @@ import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.junit.annotations.TestIssueLogging; import java.nio.charset.StandardCharsets; import java.util.Base64; @@ -21,7 +22,6 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING; public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { @@ -29,11 +29,7 @@ public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) - .put(REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), REPOSITORY_NAME) - .build(); + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); } private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { @@ -54,6 +50,7 @@ private Map initialTestSetup(int shardCount, int replicaCount, int return indexStats; } + @TestIssueLogging(value = "_root:INFO", issueUrl = "https://github.com/opensearch-project/OpenSearch/issues/7923") public void testFullClusterRestoreStaleDelete() throws Exception { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; @@ -62,13 +59,13 @@ public void testFullClusterRestoreStaleDelete() throws Exception { initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); setReplicaCount(0); - setReplicaCount(1); + setReplicaCount(2); setReplicaCount(0); setReplicaCount(1); setReplicaCount(0); setReplicaCount(1); setReplicaCount(0); - setReplicaCount(1); + setReplicaCount(2); setReplicaCount(0); RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( @@ -87,14 +84,14 @@ public void testFullClusterRestoreStaleDelete() throws Exception { .add("cluster-state") .add(getClusterState().metadata().clusterUUID()); - assertEquals(repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size(), 4); + assertEquals(repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size(), 10); Map indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata( cluster().getClusterName(), getClusterState().metadata().clusterUUID() ); - assertEquals(indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas(), 0); - assertEquals(indexMetadataMap.values().stream().findFirst().get().getNumberOfShards(), shardCount); + assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas()); + assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards()); } private void setReplicaCount(int replicaCount) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 47b924a446e5f..af33e7dd823ab 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -54,6 +54,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -73,7 +74,7 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; - public static final int RETAINED_MANIFESTS = 3; + public static final int RETAINED_MANIFESTS = 10; public static final String DELIMITER = "__"; @@ -117,6 +118,8 @@ public class RemoteClusterStateService implements Closeable { private BlobStoreTransferService blobStoreTransferService; private volatile TimeValue slowWriteLoggingThreshold; + private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); + public RemoteClusterStateService( String nodeId, Supplier repositoriesService, @@ -252,6 +255,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( false ); deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); + final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -729,103 +733,104 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { * @param clusterUUID uuid of cluster state to refer to in remote * @param manifestsToRetain no of latest manifest files to keep in remote */ - public void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { - threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { - synchronized (this) { - getBlobStoreTransferService().listAllInSortedOrder( - getManifestFolderPath(clusterName, clusterUUID), - "manifest", - Integer.MAX_VALUE, - new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - if (blobMetadata.size() > manifestsToRetain) { - deleteClusterMetadata( + private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { + logger.info("Delete stale cluster metadata task is already in progress."); + // return; + } + getBlobStoreTransferService().listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.size() > manifestsToRetain) { + List allManifests = blobMetadata.stream() + .map( + mainfestBlobMetadata -> fetchRemoteClusterMetadataManifest( clusterName, clusterUUID, - blobMetadata.subList(0, manifestsToRetain - 1), - blobMetadata.subList(manifestsToRetain, blobMetadata.size() - 1) - ); - } - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUID + mainfestBlobMetadata.name() ) - ); - } + ) + .collect(Collectors.toList()); + deleteClusterMetadata( + clusterName, + clusterUUID, + allManifests.subList(0, manifestsToRetain - 1), + allManifests.subList(manifestsToRetain - 1, allManifests.size()) + ); } - ); + deleteStaleMetadataRunning.set(false); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + deleteStaleMetadataRunning.set(false); + } } - }); + ); } private void deleteClusterMetadata( String clusterName, String clusterUUID, - List activeManifestBlobMetadata, - List staleManifestBlobMetadata + List activeManifestBlobMetadata, + List staleManifestBlobMetadata ) { - Set filesToKeep = new HashSet<>(); - List stalePaths = new ArrayList<>(); - activeManifestBlobMetadata.forEach(manifestBlobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - manifestBlobMetadata.name() - ); - clusterMetadataManifest.getIndices() - .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); - }); - staleManifestBlobMetadata.forEach(manifestBlobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - manifestBlobMetadata.name() - ); - stalePaths.add(new BlobPath().add("manifest").buildAsString() + manifestBlobMetadata.name()); - clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { - if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { - stalePaths.add( - new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString() - + uploadedIndexMetadata.getUploadedFilename() - + ".dat" - ); - } + try { + Set filesToKeep = new HashSet<>(); + List stalePaths = new ArrayList<>(); + activeManifestBlobMetadata.forEach(clusterMetadataManifest -> { + filesToKeep.add( + getManifestFileNamePrefix(clusterMetadataManifest.getClusterTerm(), clusterMetadataManifest.getStateVersion()) + ); + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + }); + staleManifestBlobMetadata.forEach(clusterMetadataManifest -> { + stalePaths.add( + new BlobPath().add("manifest").buildAsString() + getManifestFileName( + clusterMetadataManifest.getClusterTerm(), + clusterMetadataManifest.getStateVersion() + ) + ); + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + stalePaths.add( + new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + + uploadedIndexMetadata.getUploadedFilename() + + ".dat" + ); + } + }); }); - }); - if (stalePaths.toArray().length == 0) { - logger.trace("No stale Remote Cluster Metadata files found"); - return; - } + if (stalePaths.isEmpty()) { + logger.info("No stale Remote Cluster Metadata files found"); + return; + } - deleteStalePaths(clusterName, clusterUUID, stalePaths); + deleteStalePaths(clusterName, clusterUUID, stalePaths); + } catch (IllegalStateException e) { + logger.error("Error while fetching Remote Cluster Metadata manifests", e); + } catch (IOException e) { + logger.error("Error while deleting stale Remote Cluster Metadata files", e); + } catch (Exception e) { + logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); + } } - private void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) { + private void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException { logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); - getBlobStoreTransferService().deleteBlobsAsync( - ThreadPool.Names.REMOTE_PURGE, - getCusterMetadataBasePath(clusterName, clusterUUID), - stalePaths, - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info(String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size())); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage("Exception occurred while deleting stale Remote Cluster Metadata files - {}", stalePaths) - ); - } - } - ); + getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths); } } From d4919907b0d8385e8cbc87081edb9ce9b25fcf51 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 19:12:01 +0530 Subject: [PATCH 04/13] add support to delete entire cluster state for a clusterUUID Signed-off-by: bansvaru --- .../remote/RemoteClusterStateService.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index af33e7dd823ab..08b9125fda8df 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -727,6 +727,32 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { } } + /** + * Purges all remote cluster state against provided cluster UUIDs + * @param clusterName name of the cluster + * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged + */ + public void deleteStaleClusterMetadata(String clusterName, List clusterUUIDs) { + clusterUUIDs.forEach(clusterUUID -> { + getBlobStoreTransferService().deleteAsync( + ThreadPool.Names.REMOTE_PURGE, + getCusterMetadataBasePath(clusterName, clusterUUID), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID); + } + + @Override + public void onFailure(Exception e) { + logger.error(new ParameterizedMessage("Exception occurred while deleting all remote cluster metadata for cluster UUID {}", clusterUUID), e); + } + } + ); + }); + } + + /** * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests * @param clusterName name of the cluster @@ -736,7 +762,7 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { logger.info("Delete stale cluster metadata task is already in progress."); - // return; + return; } getBlobStoreTransferService().listAllInSortedOrderAsync( ThreadPool.Names.REMOTE_PURGE, From 5dc1c8f82eb7d41580adaac1e09cf624d37f3c07 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 19:13:15 +0530 Subject: [PATCH 05/13] spa Signed-off-by: bansvaru --- .../gateway/remote/RemoteClusterStateService.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 08b9125fda8df..77354ce445f6a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -745,14 +745,19 @@ public void onResponse(Void unused) { @Override public void onFailure(Exception e) { - logger.error(new ParameterizedMessage("Exception occurred while deleting all remote cluster metadata for cluster UUID {}", clusterUUID), e); + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting all remote cluster metadata for cluster UUID {}", + clusterUUID + ), + e + ); } } ); }); } - /** * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests * @param clusterName name of the cluster @@ -762,7 +767,7 @@ public void onFailure(Exception e) { private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { logger.info("Delete stale cluster metadata task is already in progress."); - return; + return; } getBlobStoreTransferService().listAllInSortedOrderAsync( ThreadPool.Names.REMOTE_PURGE, From b803308f2052b1815a0b2ef4ae7ca48b7313442d Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 19:43:01 +0530 Subject: [PATCH 06/13] fix issues with deletion - manifest filename issues Signed-off-by: bansvaru --- .../remote/RemoteClusterStateServiceIT.java | 2 +- .../remote/RemoteClusterStateService.java | 28 ++++++++++--------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index 7a35d9e30d0d3..3e7840600c80a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -84,7 +84,7 @@ public void testFullClusterRestoreStaleDelete() throws Exception { .add("cluster-state") .add(getClusterState().metadata().clusterUUID()); - assertEquals(repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size(), 10); + assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size()); Map indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata( cluster().getClusterName(), diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 77354ce445f6a..34c4f384bb0ad 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -790,8 +790,8 @@ public void onResponse(List blobMetadata) { deleteClusterMetadata( clusterName, clusterUUID, - allManifests.subList(0, manifestsToRetain - 1), - allManifests.subList(manifestsToRetain - 1, allManifests.size()) + blobMetadata.subList(0, manifestsToRetain - 1), + blobMetadata.subList(manifestsToRetain - 1, allManifests.size()) ); } deleteStaleMetadataRunning.set(false); @@ -814,26 +814,28 @@ public void onFailure(Exception e) { private void deleteClusterMetadata( String clusterName, String clusterUUID, - List activeManifestBlobMetadata, - List staleManifestBlobMetadata + List activeManifestBlobMetadata, + List staleManifestBlobMetadata ) { try { Set filesToKeep = new HashSet<>(); List stalePaths = new ArrayList<>(); - activeManifestBlobMetadata.forEach(clusterMetadataManifest -> { - filesToKeep.add( - getManifestFileNamePrefix(clusterMetadataManifest.getClusterTerm(), clusterMetadataManifest.getStateVersion()) + activeManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() ); clusterMetadataManifest.getIndices() .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); }); - staleManifestBlobMetadata.forEach(clusterMetadataManifest -> { - stalePaths.add( - new BlobPath().add("manifest").buildAsString() + getManifestFileName( - clusterMetadataManifest.getClusterTerm(), - clusterMetadataManifest.getStateVersion() - ) + staleManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() ); + stalePaths.add(new BlobPath().add("manifest").buildAsString() + blobMetadata.name()); clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { stalePaths.add( From 8e654732ce69d1935d5ecd7f98c6beabb93357e7 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 20:13:02 +0530 Subject: [PATCH 07/13] minor fix Signed-off-by: bansvaru --- .../gateway/remote/RemoteClusterStateServiceIT.java | 2 ++ .../gateway/remote/RemoteClusterStateService.java | 11 +---------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index 3e7840600c80a..a55bbf95084cb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -14,6 +14,7 @@ import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.junit.annotations.TestIssueLogging; import java.nio.charset.StandardCharsets; @@ -23,6 +24,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { private static String INDEX_NAME = "test-index"; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 34c4f384bb0ad..cbe8064c0e08b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -778,20 +778,11 @@ private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, @Override public void onResponse(List blobMetadata) { if (blobMetadata.size() > manifestsToRetain) { - List allManifests = blobMetadata.stream() - .map( - mainfestBlobMetadata -> fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - mainfestBlobMetadata.name() - ) - ) - .collect(Collectors.toList()); deleteClusterMetadata( clusterName, clusterUUID, blobMetadata.subList(0, manifestsToRetain - 1), - blobMetadata.subList(manifestsToRetain - 1, allManifests.size()) + blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size()) ); } deleteStaleMetadataRunning.set(false); From 60c56227f6ff11bf14cdef5a716b03d0a05e583a Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 20:23:45 +0530 Subject: [PATCH 08/13] add try/catch Signed-off-by: bansvaru --- .../remote/RemoteClusterStateService.java | 60 ++++++++++--------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index cbe8064c0e08b..6f6da59ff5065 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -769,37 +769,41 @@ private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, logger.info("Delete stale cluster metadata task is already in progress."); return; } - getBlobStoreTransferService().listAllInSortedOrderAsync( - ThreadPool.Names.REMOTE_PURGE, - getManifestFolderPath(clusterName, clusterUUID), - "manifest", - Integer.MAX_VALUE, - new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - if (blobMetadata.size() > manifestsToRetain) { - deleteClusterMetadata( - clusterName, - clusterUUID, - blobMetadata.subList(0, manifestsToRetain - 1), - blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size()) - ); + try { + getBlobStoreTransferService().listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.size() > manifestsToRetain) { + deleteClusterMetadata( + clusterName, + clusterUUID, + blobMetadata.subList(0, manifestsToRetain - 1), + blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size()) + ); + } + deleteStaleMetadataRunning.set(false); } - deleteStaleMetadataRunning.set(false); - } - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUID - ) - ); - deleteStaleMetadataRunning.set(false); + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + deleteStaleMetadataRunning.set(false); + } } - } - ); + ); + } finally { + deleteStaleMetadataRunning.set(false); + } } private void deleteClusterMetadata( From 0ad1bb4c9db2e90ea5040e354236c749aa0b6631 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 20:50:40 +0530 Subject: [PATCH 09/13] address PR comments Signed-off-by: bansvaru --- .../gateway/remote/RemoteClusterStateService.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 6f6da59ff5065..08df2b55177fc 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -814,7 +814,8 @@ private void deleteClusterMetadata( ) { try { Set filesToKeep = new HashSet<>(); - List stalePaths = new ArrayList<>(); + Set staleManifestPaths = new HashSet<>(); + Set staleIndexMetadataPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( clusterName, @@ -830,10 +831,10 @@ private void deleteClusterMetadata( clusterUUID, blobMetadata.name() ); - stalePaths.add(new BlobPath().add("manifest").buildAsString() + blobMetadata.name()); + staleManifestPaths.add(new BlobPath().add("manifest").buildAsString() + blobMetadata.name()); clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { - stalePaths.add( + staleIndexMetadataPaths.add( new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + uploadedIndexMetadata.getUploadedFilename() + ".dat" @@ -842,12 +843,13 @@ private void deleteClusterMetadata( }); }); - if (stalePaths.isEmpty()) { + if (staleManifestPaths.isEmpty()) { logger.info("No stale Remote Cluster Metadata files found"); return; } - deleteStalePaths(clusterName, clusterUUID, stalePaths); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); } catch (IllegalStateException e) { logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { From de75b52a861d9ee13a4707ed6b4f806d087401ba Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 20:58:34 +0530 Subject: [PATCH 10/13] minor refactor Signed-off-by: bansvaru --- .../opensearch/gateway/remote/RemoteClusterStateService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 08df2b55177fc..b2ab0dcd59aaf 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -831,11 +831,11 @@ private void deleteClusterMetadata( clusterUUID, blobMetadata.name() ); - staleManifestPaths.add(new BlobPath().add("manifest").buildAsString() + blobMetadata.name()); + staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { staleIndexMetadataPaths.add( - new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + uploadedIndexMetadata.getUploadedFilename() + ".dat" ); From ad9b1b32bb63c94a81a605fd4c04703f77350875 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 21:58:56 +0530 Subject: [PATCH 11/13] fix thread leak in tests Signed-off-by: bansvaru --- .../gateway/remote/RemoteClusterStateServiceTests.java | 1 + .../cluster/coordination/AbstractCoordinatorTestCase.java | 8 +++++--- .../java/org/opensearch/gateway/MockGatewayMetaState.java | 1 - 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 14723af29979c..fe8dc0b564cda 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -130,6 +130,7 @@ public void setup() { public void teardown() throws Exception { super.tearDown(); remoteClusterStateService.close(); + threadPool.shutdown(); } public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index d49d3d290b8a8..1f56615959618 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -844,14 +844,16 @@ class MockPersistedState implements CoordinationState.PersistedState { private final CoordinationState.PersistedState delegate; private final NodeEnvironment nodeEnvironment; + private MockGatewayMetaState mockGatewayMetaState; + MockPersistedState(DiscoveryNode localNode) { try { if (rarely()) { nodeEnvironment = newNodeEnvironment(); nodeEnvironments.add(nodeEnvironment); - final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode, bigArrays); - gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry(), persistedStateRegistry()); - delegate = gatewayMetaState.getPersistedState(); + mockGatewayMetaState = new MockGatewayMetaState(localNode, bigArrays); + mockGatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry(), persistedStateRegistry()); + delegate = mockGatewayMetaState.getPersistedState(); } else { nodeEnvironment = null; delegate = new InMemoryPersistedState( diff --git a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java index bc48c5c236980..704c32ab2b5bc 100644 --- a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java @@ -108,7 +108,6 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust @Override public void close() throws IOException { super.close(); - threadPool.shutdown(); } public void start( From 59d8daf516f57314ff9310339fbe62eea7e3043b Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 22:43:17 +0530 Subject: [PATCH 12/13] fix failing ITs after rebase Signed-off-by: bansvaru --- .../opensearch/gateway/remote/RemoteClusterStateServiceIT.java | 2 -- .../opensearch/gateway/remote/RemoteClusterStateService.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index a55bbf95084cb..6fcc89cfe9e9a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -15,7 +15,6 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.junit.annotations.TestIssueLogging; import java.nio.charset.StandardCharsets; import java.util.Base64; @@ -52,7 +51,6 @@ private Map initialTestSetup(int shardCount, int replicaCount, int return indexStats; } - @TestIssueLogging(value = "_root:INFO", issueUrl = "https://github.com/opensearch-project/OpenSearch/issues/7923") public void testFullClusterRestoreStaleDelete() throws Exception { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index b2ab0dcd59aaf..cf750bb11f3f8 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -607,7 +607,7 @@ private Map getLatestManifestForAllClusterUUIDs for (String clusterUUID : clusterUUIDs) { try { Optional manifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); - manifestsByClusterUUID.put(clusterUUID, manifest.get()); + manifest.ifPresent(clusterMetadataManifest -> manifestsByClusterUUID.put(clusterUUID, clusterMetadataManifest)); } catch (Exception e) { throw new IllegalStateException( String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID) From 72fffd26e6671b943f85f08da72a847dc1b57d15 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Thu, 7 Sep 2023 23:01:49 +0530 Subject: [PATCH 13/13] remove unused imports Signed-off-by: bansvaru --- .../main/java/org/opensearch/gateway/MockGatewayMetaState.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java index 704c32ab2b5bc..d77596cf5cdd1 100644 --- a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java @@ -48,8 +48,6 @@ import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.plugins.MetadataUpgrader; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService;