diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index ba2208e17df1f..9df0d32f045d2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; @@ -20,6 +22,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; +import java.util.List; import java.util.function.Supplier; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -61,6 +64,11 @@ protected void doStart() { blobStoreRepository = (BlobStoreRepository) repository; } + public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { + logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths); + blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths); + } + @Override protected void doStop() {} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index 2fca239b10efd..645ca3e2816d0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Strings; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobMetadata; @@ -32,6 +33,7 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT; @@ -74,8 +76,13 @@ public class RemoteClusterStateCleanupManager implements Closeable { private long lastCleanupAttemptStateVersion; private final ThreadPool threadpool; private final ClusterApplierService clusterApplierService; + private final Optional remoteRoutingTableService; - public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) { + public RemoteClusterStateCleanupManager( + RemoteClusterStateService remoteClusterStateService, + ClusterService clusterService, + Optional remoteRoutingTableService + ) { this.remoteClusterStateService = remoteClusterStateService; this.remoteStateStats = remoteClusterStateService.getStats(); ClusterSettings clusterSettings = clusterService.getClusterSettings(); @@ -84,6 +91,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS this.threadpool = remoteClusterStateService.getThreadpool(); // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold this.lastCleanupAttemptStateVersion = 0; + this.remoteRoutingTableService = remoteRoutingTableService; clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); } @@ -171,6 +179,7 @@ void deleteClusterMetadata( Set staleManifestPaths = new HashSet<>(); Set staleIndexMetadataPaths = new HashSet<>(); Set staleGlobalMetadataPaths = new HashSet<>(); + Set staleIndexRoutingPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( clusterName, @@ -189,6 +198,10 @@ void deleteClusterMetadata( .values() .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); } + if (remoteRoutingTableService.isPresent() && clusterMetadataManifest.getIndicesRouting() != null) { + clusterMetadataManifest.getIndicesRouting() + .forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename())); + } }); staleManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( @@ -221,6 +234,14 @@ void deleteClusterMetadata( attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) ); } + if (remoteRoutingTableService.isPresent() && clusterMetadataManifest.getIndicesRouting() != null) { + clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> { + if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) { + staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename()); + logger.debug(() -> "Indices routing paths in stale manifest: " + uploadedIndicesRouting.getUploadedFilename()); + } + }); + } clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { @@ -240,6 +261,9 @@ void deleteClusterMetadata( deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + if (remoteRoutingTableService.isPresent()) { + remoteRoutingTableService.get().deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths)); + } } catch (IllegalStateException e) { logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d0593dcd51475..e717cc76770e9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -254,11 +254,15 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings) ? Optional.of(new RemoteRoutingTableService(repositoriesService, settings)) : Optional.empty(); + this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + this, + clusterService, + this.remoteRoutingTableService + ); } /** diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index 24fd1b164a4ff..fe1397dd95c12 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -12,6 +12,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobContainer; @@ -40,6 +41,7 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -47,6 +49,7 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; @@ -76,6 +79,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -92,6 +96,9 @@ public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase { private ClusterState clusterState; private Metadata metadata; private RemoteClusterStateService remoteClusterStateService; + private Optional remoteRoutingTableService; + + private RemoteRoutingTableService mockedRemoteRoutingTableService; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -139,7 +146,14 @@ public void setup() { when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats()); when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool); when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService); + mockedRemoteRoutingTableService = mock(RemoteRoutingTableService.class); + remoteRoutingTableService = Optional.of(mockedRemoteRoutingTableService); + when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + remoteClusterStateService, + clusterService, + remoteRoutingTableService + ); } @After @@ -155,11 +169,13 @@ public void testDeleteClusterMetadata() throws IOException { List inactiveBlobs = Arrays.asList( new PlainBlobMetadata("manifest1.dat", 1L), new PlainBlobMetadata("manifest2.dat", 1L), - new PlainBlobMetadata("manifest3.dat", 1L) + new PlainBlobMetadata("manifest3.dat", 1L), + new PlainBlobMetadata("manifest6.dat", 1L) ); List activeBlobs = Arrays.asList( new PlainBlobMetadata("manifest4.dat", 1L), - new PlainBlobMetadata("manifest5.dat", 1L) + new PlainBlobMetadata("manifest5.dat", 1L), + new PlainBlobMetadata("manifest7.dat", 1L) ); UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1"); UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2"); @@ -199,6 +215,45 @@ public void testDeleteClusterMetadata() throws IOException { .settingMetadata(settingMetadataUpdated) .build(); + UploadedIndexMetadata index3Metadata = new UploadedIndexMetadata("index3", "indexUUID3", "index_metadata3"); + UploadedIndexMetadata index4Metadata = new UploadedIndexMetadata("index4", "indexUUID4", "index_metadata4"); + List indicesRouting1 = List.of(index3Metadata, index4Metadata); + List indicesRouting2 = List.of(index4Metadata); + ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder() + .indices(List.of(index1Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting1) + .build(); + ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder() + .indices(List.of(index2Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting2) + .build(); + // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3) .coordinationMetadata(coordinationMetadataUpdated) @@ -208,10 +263,13 @@ public void testDeleteClusterMetadata() throws IOException { when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( manifest4, manifest5, + manifest7, manifest1, manifest2, - manifest3 + manifest3, + manifest6 ); + BlobContainer container = mock(BlobContainer.class); when(blobStore.blobContainer(any())).thenReturn(container); doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); @@ -231,9 +289,27 @@ public void testDeleteClusterMetadata() throws IOException { + ".dat" ) ); + + verify(mockedRemoteRoutingTableService).deleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename())); Set staleManifest = new HashSet<>(); inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest)); + + // Test case when remoteRoutingTableService is null + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterState.getClusterName()).thenReturn(new ClusterName("test")); + when(metadata.clusterUUID()).thenReturn("testUUID"); + when(clusterState.metadata()).thenReturn(metadata); + when(clusterApplierService.state()).thenReturn(clusterState); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + + mockedRemoteRoutingTableService = mock(RemoteRoutingTableService.class); + remoteRoutingTableService = Optional.of(mockedRemoteRoutingTableService); + + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService, Optional.empty()); + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + verify(mockedRemoteRoutingTableService, never()).deleteStaleIndexRoutingPaths(any()); } public void testDeleteStaleClusterUUIDs() throws IOException {