From 63fce8091c8a52ec54d0474041190d650685ef5e Mon Sep 17 00:00:00 2001 From: Shailendra Singh Date: Sat, 8 Jun 2024 15:22:07 +0530 Subject: [PATCH] Delete stale index routing files. Signed-off-by: Shailendra Singh --- .../remote/RemoteRoutingTableService.java | 12 +++++ .../RemoteClusterStateCleanupManager.java | 31 ++++++------ .../remote/RemoteClusterStateService.java | 6 ++- ...RemoteClusterStateCleanupManagerTests.java | 50 +++++++++++++++---- 4 files changed, 72 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index ba2208e17df1f..0a09ef26e265d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; @@ -20,6 +22,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; +import java.util.List; import java.util.function.Supplier; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -61,6 +64,15 @@ protected void doStart() { blobStoreRepository = (BlobStoreRepository) repository; } + public BlobStore getBlobStore() { + return blobStoreRepository.blobStore(); + } + + public void DeleteStaleIndexRoutingPaths(List stalePaths) throws IOException { + logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths); + getBlobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths); + } + @Override protected void doStop() {} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index 15db589a25e5a..cbd67d14a8d80 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Strings; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobMetadata; @@ -24,11 +25,9 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import java.io.Closeable; import java.io.IOException; -import java.sql.Blob; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -76,8 +75,13 @@ public class RemoteClusterStateCleanupManager implements Closeable { private long lastCleanupAttemptStateVersion; private final ThreadPool threadpool; private final ClusterApplierService clusterApplierService; + private final RemoteRoutingTableService remoteRoutingTableService; - public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) { + public RemoteClusterStateCleanupManager( + RemoteClusterStateService remoteClusterStateService, + ClusterService clusterService, + RemoteRoutingTableService remoteRoutingTableService + ) { this.remoteClusterStateService = remoteClusterStateService; this.remoteStateStats = remoteClusterStateService.getStats(); ClusterSettings clusterSettings = clusterService.getClusterSettings(); @@ -86,6 +90,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS this.threadpool = remoteClusterStateService.getThreadpool(); // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold this.lastCleanupAttemptStateVersion = 0; + this.remoteRoutingTableService = remoteRoutingTableService; clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); } @@ -192,7 +197,7 @@ void deleteClusterMetadata( .values() .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); } - if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) { + if (clusterMetadataManifest.getIndicesRouting() != null) { clusterMetadataManifest.getIndicesRouting() .forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename())); } @@ -228,10 +233,10 @@ void deleteClusterMetadata( attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) ); } - if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) { + if (clusterMetadataManifest.getIndicesRouting() != null) { clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> { - if (filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename()) == false){ - staleIndexRoutingPaths.add(new BlobPath().buildAsString() + uploadedIndicesRouting.getUploadedFilename()); + if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) { + staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename()); logger.debug(() -> "Indices routing paths in stale manifest: " + uploadedIndicesRouting.getUploadedFilename()); } }); @@ -255,7 +260,9 @@ void deleteClusterMetadata( deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); - deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths)); + if (remoteRoutingTableService != null) { + remoteRoutingTableService.DeleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths)); + } } catch (IllegalStateException e) { logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { @@ -360,14 +367,6 @@ void deleteStalePaths(String clusterName, String clusterUUID, List stale ); } - void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { - logger.debug(() -> String.format(Locale.ROOT, "Deleting stale index routing files from remote - %s", stalePaths)); - getBlobStoreTransferService().deleteBlobs( - BlobPath.cleanPath(), - stalePaths - ); - } - /** * Purges all remote cluster state against provided cluster UUIDs * @param clusterState current state of the cluster diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d0593dcd51475..cf7163ea855f0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -254,11 +254,15 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings) ? Optional.of(new RemoteRoutingTableService(repositoriesService, settings)) : Optional.empty(); + this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + this, + clusterService, + this.remoteRoutingTableService.orElse(null) + ); } /** diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index a547ffb1ee591..5016c82eb50de 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -12,6 +12,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobContainer; @@ -47,6 +48,7 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; @@ -76,6 +78,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -92,6 +95,7 @@ public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase { private ClusterState clusterState; private Metadata metadata; private RemoteClusterStateService remoteClusterStateService; + private RemoteRoutingTableService remoteRoutingTableService; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -139,7 +143,13 @@ public void setup() { when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats()); when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool); when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService); + remoteRoutingTableService = mock(RemoteRoutingTableService.class); + when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + remoteClusterStateService, + clusterService, + remoteRoutingTableService + ); } @After @@ -201,11 +211,15 @@ public void testDeleteClusterMetadata() throws IOException { .settingMetadata(settingMetadataUpdated) .build(); - List indicesRouting1 = List.of(index1Metadata, index2Metadata); - List indicesRouting2 = List.of(index1Metadata); + UploadedIndexMetadata index3Metadata = new UploadedIndexMetadata("index3", "indexUUID3", "index_metadata3"); + UploadedIndexMetadata index4Metadata = new UploadedIndexMetadata("index4", "indexUUID4", "index_metadata4"); + List indicesRouting1 = List.of(index3Metadata, index4Metadata); + List indicesRouting2 = List.of(index4Metadata); ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder() - .indices(List.of(index1UpdatedMetadata)) - .globalMetadataFileName("global_metadata") + .indices(List.of(index1Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) .clusterTerm(1L) .stateVersion(1L) .codecVersion(CODEC_V3) @@ -219,8 +233,10 @@ public void testDeleteClusterMetadata() throws IOException { .indicesRouting(indicesRouting1) .build(); ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder() - .indices(List.of(index1UpdatedMetadata)) - .globalMetadataFileName("global_metadata") + .indices(List.of(index2Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadataUpdated) + .settingMetadata(settingMetadataUpdated) .clusterTerm(1L) .stateVersion(1L) .codecVersion(CODEC_V3) @@ -243,11 +259,11 @@ public void testDeleteClusterMetadata() throws IOException { when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( manifest4, manifest5, + manifest7, manifest1, manifest2, manifest3, - manifest6, - manifest7 + manifest6 ); BlobContainer container = mock(BlobContainer.class); @@ -269,10 +285,24 @@ public void testDeleteClusterMetadata() throws IOException { + ".dat" ) ); - verify(container).deleteBlobsIgnoringIfNotExists(List.of(new BlobPath().buildAsString() + index1Metadata.getUploadedFilename())); + verify(remoteRoutingTableService).DeleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename())); Set staleManifest = new HashSet<>(); inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest)); + + // Test case when remoteRoutingTableService is null + remoteRoutingTableService = mock(RemoteRoutingTableService.class); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterState.getClusterName()).thenReturn(new ClusterName("test")); + when(metadata.clusterUUID()).thenReturn("testUUID"); + when(clusterState.metadata()).thenReturn(metadata); + when(clusterApplierService.state()).thenReturn(clusterState); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService, null); + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + verify(remoteRoutingTableService, never()).DeleteStaleIndexRoutingPaths(any()); } public void testDeleteStaleClusterUUIDs() throws IOException {