From 5d5842b1775a42f9c710d63c7a2aff7599eaf4b0 Mon Sep 17 00:00:00 2001 From: Shailendra Singh Date: Sun, 9 Jun 2024 10:09:35 +0530 Subject: [PATCH] Delete stale index routing table files. Signed-off-by: Shailendra Singh --- .../remote/RemoteRoutingTableService.java | 14 +++ .../RemoteClusterStateCleanupManager.java | 30 ++++++- .../remote/RemoteClusterStateService.java | 2 +- .../RemoteRoutingTableServiceTests.java | 47 ++++++++++ ...RemoteClusterStateCleanupManagerTests.java | 88 ++++++++++++++++++- 5 files changed, 175 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index ba2208e17df1f..be7d491af123d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; @@ -20,6 +21,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; +import java.util.List; import java.util.function.Supplier; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -61,6 +63,18 @@ protected void doStart() { blobStoreRepository = (BlobStoreRepository) repository; } + public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { + try { + System.out.println("Deleting stale index routing files from remote - " + stalePaths); + logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths); + + blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths); + } catch (IOException e) { + logger.error("Error while deleting stale index routing files", e); + throw e; + } + } + @Override protected void doStop() {} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index 2fca239b10efd..39bc28fcce4ab 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Strings; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobMetadata; @@ -31,6 +32,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,8 +76,13 @@ public class RemoteClusterStateCleanupManager implements Closeable { private long lastCleanupAttemptStateVersion; private final ThreadPool threadpool; private final ClusterApplierService clusterApplierService; + private final Optional remoteRoutingTableService; - public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) { + public RemoteClusterStateCleanupManager( + RemoteClusterStateService remoteClusterStateService, + ClusterService clusterService, + Optional remoteRoutingTableService + ) { this.remoteClusterStateService = remoteClusterStateService; this.remoteStateStats = remoteClusterStateService.getStats(); ClusterSettings clusterSettings = clusterService.getClusterSettings(); @@ -84,6 +91,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS this.threadpool = remoteClusterStateService.getThreadpool(); // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold this.lastCleanupAttemptStateVersion = 0; + this.remoteRoutingTableService = remoteRoutingTableService; clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); } @@ -171,6 +179,7 @@ void deleteClusterMetadata( Set staleManifestPaths = new HashSet<>(); Set staleIndexMetadataPaths = new HashSet<>(); Set staleGlobalMetadataPaths = new HashSet<>(); + Set staleIndexRoutingPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( clusterName, @@ -189,6 +198,10 @@ void deleteClusterMetadata( .values() .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); } + if (remoteRoutingTableService.isPresent() && clusterMetadataManifest.getIndicesRouting() != null) { + clusterMetadataManifest.getIndicesRouting() + .forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename())); + } }); staleManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( @@ -221,6 +234,14 @@ void deleteClusterMetadata( attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) ); } + if (remoteRoutingTableService.isPresent() && clusterMetadataManifest.getIndicesRouting() != null) { + clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> { + if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) { + staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename()); + logger.debug(() -> "Indices routing paths in stale manifest: " + uploadedIndicesRouting.getUploadedFilename()); + } + }); + } clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { @@ -240,6 +261,13 @@ void deleteClusterMetadata( deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + remoteRoutingTableService.ifPresent(remoteroutingTableService -> { + try { + remoteroutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } catch (IllegalStateException e) { logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d0593dcd51475..f65667a973897 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -254,11 +254,11 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings) ? Optional.of(new RemoteRoutingTableService(repositoriesService, settings)) : Optional.empty(); + this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, this.remoteRoutingTableService); } /** diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 9a9cbfa153259..eea32d2e61171 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -8,6 +8,11 @@ package org.opensearch.cluster.routing.remote; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.repositories.FilterRepository; @@ -18,20 +23,33 @@ import org.junit.After; import org.junit.Before; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.function.Supplier; +import org.mockito.Mockito; + import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private RemoteRoutingTableService remoteRoutingTableService; + + private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; + private BlobStore blobStore; + private BlobContainer blobContainer; + private BlobPath basePath; @Before public void setup() { @@ -43,9 +61,15 @@ public void setup() { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + blobStore = mock(BlobStore.class); + blobContainer = mock(BlobContainer.class); when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); @@ -74,4 +98,27 @@ public void testFailStartWhenNotBlobRepository() { assertThrows(AssertionError.class, () -> remoteRoutingTableService.start()); } + public void testDeleteStaleIndexRoutingPaths() throws IOException { + doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any()); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + List stalePaths = Arrays.asList("path1", "path2"); + remoteRoutingTableService.doStart(); + remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); + } + + public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException { + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + List stalePaths = Arrays.asList("path1", "path2"); + // Simulate an IOException + doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList()); + + remoteRoutingTableService.doStart(); + IOException thrown = assertThrows(IOException.class, () -> { + remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths); + }); + assertEquals("test exception", thrown.getMessage()); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); + } + } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index 24fd1b164a4ff..8ad327e5415f5 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -12,6 +12,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobContainer; @@ -39,6 +40,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -47,6 +49,7 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; @@ -76,6 +79,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -92,6 +96,9 @@ public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase { private ClusterState clusterState; private Metadata metadata; private RemoteClusterStateService remoteClusterStateService; + private Optional remoteRoutingTableService; + + private RemoteRoutingTableService mockedRemoteRoutingTableService; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -139,7 +146,14 @@ public void setup() { when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats()); when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool); when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService); + mockedRemoteRoutingTableService = mock(RemoteRoutingTableService.class); + remoteRoutingTableService = Optional.of(mockedRemoteRoutingTableService); + when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + remoteClusterStateService, + clusterService, + remoteRoutingTableService + ); } @After @@ -155,11 +169,13 @@ public void testDeleteClusterMetadata() throws IOException { List inactiveBlobs = Arrays.asList( new PlainBlobMetadata("manifest1.dat", 1L), new PlainBlobMetadata("manifest2.dat", 1L), - new PlainBlobMetadata("manifest3.dat", 1L) + new PlainBlobMetadata("manifest3.dat", 1L), + new PlainBlobMetadata("manifest6.dat", 1L) ); List activeBlobs = Arrays.asList( new PlainBlobMetadata("manifest4.dat", 1L), - new PlainBlobMetadata("manifest5.dat", 1L) + new PlainBlobMetadata("manifest5.dat", 1L), + new PlainBlobMetadata("manifest7.dat", 1L) ); UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1"); UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2"); @@ -199,6 +215,45 @@ public void testDeleteClusterMetadata() throws IOException { .settingMetadata(settingMetadataUpdated) .build(); + UploadedIndexMetadata index3Metadata = new UploadedIndexMetadata("index3", "indexUUID3", "index_metadata3"); + UploadedIndexMetadata index4Metadata = new UploadedIndexMetadata("index4", "indexUUID4", "index_metadata4"); + List indicesRouting1 = List.of(index3Metadata, index4Metadata); + List indicesRouting2 = List.of(index4Metadata); + ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder() + .indices(List.of(index1Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting1) + .build(); + ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder() + .indices(List.of(index2Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting2) + .build(); + // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3) .coordinationMetadata(coordinationMetadataUpdated) @@ -208,10 +263,13 @@ public void testDeleteClusterMetadata() throws IOException { when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( manifest4, manifest5, + manifest7, manifest1, manifest2, - manifest3 + manifest3, + manifest6 ); + BlobContainer container = mock(BlobContainer.class); when(blobStore.blobContainer(any())).thenReturn(container); doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); @@ -231,9 +289,31 @@ public void testDeleteClusterMetadata() throws IOException { + ".dat" ) ); + + verify(mockedRemoteRoutingTableService).deleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename())); Set staleManifest = new HashSet<>(); inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest)); + + // Test case when remoteRoutingTableService is null + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterState.getClusterName()).thenReturn(new ClusterName("test")); + when(metadata.clusterUUID()).thenReturn("testUUID"); + when(clusterState.metadata()).thenReturn(metadata); + when(clusterApplierService.state()).thenReturn(clusterState); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + + mockedRemoteRoutingTableService = mock(RemoteRoutingTableService.class); + remoteRoutingTableService = Optional.of(mockedRemoteRoutingTableService); + + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + remoteClusterStateService, + clusterService, + Optional.empty() + ); + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + verify(mockedRemoteRoutingTableService, never()).deleteStaleIndexRoutingPaths(any()); } public void testDeleteStaleClusterUUIDs() throws IOException {