From c8e2947cbcab8dd8f1c69763d084967e0ea7b239 Mon Sep 17 00:00:00 2001 From: Shailendra Singh Date: Sun, 9 Jun 2024 10:09:35 +0530 Subject: [PATCH] Delete stale index routing table files Signed-off-by: Shailendra Singh --- .../InternalRemoteRoutingTableService.java | 11 +++ .../remote/NoopRemoteRoutingTableService.java | 5 ++ .../remote/RemoteRoutingTableService.java | 1 + .../RemoteClusterStateCleanupManager.java | 29 ++++++- .../remote/RemoteClusterStateService.java | 2 +- .../RemoteRoutingTableServiceTests.java | 42 +++++++++++ ...RemoteClusterStateCleanupManagerTests.java | 75 ++++++++++++++++++- 7 files changed, 159 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index dcf106914c571..8e0edf8e09fc3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -300,4 +300,15 @@ protected void doStart() { @Override protected void doStop() {} + @Override + public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { + try { + logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths); + blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths); + } catch (IOException e) { + logger.error("Error while deleting stale index routing files", e); + throw e; + } + } + } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index b52c00f1f8576..95688c55c6519 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -74,4 +74,9 @@ protected void doStop() { protected void doClose() throws IOException { // noop } + + @Override + public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { + // noop + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index dbf01904116ed..c2d4f74658643 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -60,5 +60,6 @@ List getAllUploadedIndicesRouting List indicesRoutingUploaded, List indicesRoutingToDelete ); + public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index 2fca239b10efd..323f49a8866b4 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -13,6 +13,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Strings; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobMetadata; @@ -31,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,8 +77,13 @@ public class RemoteClusterStateCleanupManager implements Closeable { private long lastCleanupAttemptStateVersion; private final ThreadPool threadpool; private final ClusterApplierService clusterApplierService; + private final RemoteRoutingTableService remoteRoutingTableService; - public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) { + public RemoteClusterStateCleanupManager( + RemoteClusterStateService remoteClusterStateService, + ClusterService clusterService, + RemoteRoutingTableService remoteRoutingTableService + ) { this.remoteClusterStateService = remoteClusterStateService; this.remoteStateStats = remoteClusterStateService.getStats(); ClusterSettings clusterSettings = clusterService.getClusterSettings(); @@ -85,6 +93,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold this.lastCleanupAttemptStateVersion = 0; clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); + this.remoteRoutingTableService = remoteRoutingTableService; } void start() { @@ -171,6 +180,7 @@ void deleteClusterMetadata( Set staleManifestPaths = new HashSet<>(); Set staleIndexMetadataPaths = new HashSet<>(); Set staleGlobalMetadataPaths = new HashSet<>(); + Set staleIndexRoutingPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( clusterName, @@ -189,6 +199,10 @@ void deleteClusterMetadata( .values() .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); } + if (clusterMetadataManifest.getIndicesRouting() != null) { + clusterMetadataManifest.getIndicesRouting() + .forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename())); + } }); staleManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( @@ -221,6 +235,14 @@ void deleteClusterMetadata( attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) ); } + if (clusterMetadataManifest.getIndicesRouting() != null) { + clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> { + if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) { + staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename()); + logger.debug(() -> "Indices routing paths in stale manifest: " + uploadedIndicesRouting.getUploadedFilename()); + } + }); + } clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { @@ -240,6 +262,11 @@ void deleteClusterMetadata( deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + try { + remoteRoutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths)); + } catch (IOException e){ + throw new RuntimeException(e); + } } catch (IllegalStateException e) { logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 1617efd6200c3..02e7c8e2543a6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -264,9 +264,9 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings); + this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); } /** diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 8fd410e774332..915b486f05f7c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -25,6 +25,9 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -56,6 +59,12 @@ import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX; import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; + +import org.mockito.Mockito; + import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -68,12 +77,19 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private InternalRemoteRoutingTableService remoteRoutingTableService; + + //private RemoteRoutingTableService remoteRoutingTableService; + + private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; @@ -92,6 +108,8 @@ public void setup() { .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); blobStore = mock(BlobStore.class); @@ -99,6 +117,7 @@ public void setup() { when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); when(blobStoreRepository.blobStore()).thenReturn(blobStore); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); @@ -552,4 +571,27 @@ private BlobPath getPath() { RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64 ); } + public void testDeleteStaleIndexRoutingPaths() throws IOException { + doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any()); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + List stalePaths = Arrays.asList("path1", "path2"); + remoteRoutingTableService.doStart(); + remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); + } + + public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException { + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + List stalePaths = Arrays.asList("path1", "path2"); + // Simulate an IOException + doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList()); + + remoteRoutingTableService.doStart(); + IOException thrown = assertThrows(IOException.class, () -> { + remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths); + }); + assertEquals("test exception", thrown.getMessage()); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); + } + } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index 24fd1b164a4ff..ab906b27f8849 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -12,6 +12,10 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; +import org.opensearch.cluster.routing.remote.NoopRemoteRoutingTableService; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobContainer; @@ -47,6 +51,7 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; @@ -69,6 +74,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; @@ -76,6 +82,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -92,6 +99,8 @@ public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase { private ClusterState clusterState; private Metadata metadata; private RemoteClusterStateService remoteClusterStateService; + private RemoteRoutingTableService remoteRoutingTableService; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -113,6 +122,7 @@ public void setup() { Settings settings = Settings.builder() .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put(stateRepoTypeAttributeKey, FsRepository.TYPE) .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) @@ -139,7 +149,13 @@ public void setup() { when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats()); when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool); when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService); + remoteRoutingTableService = mock(InternalRemoteRoutingTableService.class); + when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + remoteClusterStateService, + clusterService, + remoteRoutingTableService + ); } @After @@ -155,11 +171,13 @@ public void testDeleteClusterMetadata() throws IOException { List inactiveBlobs = Arrays.asList( new PlainBlobMetadata("manifest1.dat", 1L), new PlainBlobMetadata("manifest2.dat", 1L), - new PlainBlobMetadata("manifest3.dat", 1L) + new PlainBlobMetadata("manifest3.dat", 1L), + new PlainBlobMetadata("manifest6.dat", 1L) ); List activeBlobs = Arrays.asList( new PlainBlobMetadata("manifest4.dat", 1L), - new PlainBlobMetadata("manifest5.dat", 1L) + new PlainBlobMetadata("manifest5.dat", 1L), + new PlainBlobMetadata("manifest7.dat", 1L) ); UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1"); UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2"); @@ -199,6 +217,45 @@ public void testDeleteClusterMetadata() throws IOException { .settingMetadata(settingMetadataUpdated) .build(); + UploadedIndexMetadata index3Metadata = new UploadedIndexMetadata("index3", "indexUUID3", "index_metadata3"); + UploadedIndexMetadata index4Metadata = new UploadedIndexMetadata("index4", "indexUUID4", "index_metadata4"); + List indicesRouting1 = List.of(index3Metadata, index4Metadata); + List indicesRouting2 = List.of(index4Metadata); + ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder() + .indices(List.of(index1Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting1) + .build(); + ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder() + .indices(List.of(index2Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting2) + .build(); + // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3) .coordinationMetadata(coordinationMetadataUpdated) @@ -208,10 +265,13 @@ public void testDeleteClusterMetadata() throws IOException { when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( manifest4, manifest5, + manifest7, manifest1, manifest2, - manifest3 + manifest3, + manifest6 ); + BlobContainer container = mock(BlobContainer.class); when(blobStore.blobContainer(any())).thenReturn(container); doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); @@ -231,9 +291,16 @@ public void testDeleteClusterMetadata() throws IOException { + ".dat" ) ); + + verify(remoteRoutingTableService).deleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename())); Set staleManifest = new HashSet<>(); inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest)); + + // Test case when remoteRoutingTableService is not enabled + remoteRoutingTableService = mock(NoopRemoteRoutingTableService.class); + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + verify(remoteRoutingTableService, never()).deleteStaleIndexRoutingPaths(any()); } public void testDeleteStaleClusterUUIDs() throws IOException {