From 9624d45b0f1a182a5136aa912e49ed97c6f3bfa9 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 10 Jul 2024 18:27:06 +0530 Subject: [PATCH] Fix repo initialisation in remote routing table Signed-off-by: Arpit Bandejiya --- CHANGELOG.md | 1 + .../InternalRemoteRoutingTableService.java | 30 ++++++++++--------- .../RemoteRoutingTableServiceFactory.java | 17 +---------- .../common/remote/BlobPathParameters.java | 4 --- .../remote/RemoteClusterStateService.java | 25 +++++++--------- ...RemoteRoutingTableServiceFactoryTests.java | 17 ----------- .../RemoteRoutingTableServiceTests.java | 13 +++----- 7 files changed, 32 insertions(+), 75 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 813eecbaabfa3..6d87a71ab802d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439)) - Create SystemIndexRegistry with helper method matchesSystemIndex ([#14415](https://github.com/opensearch-project/OpenSearch/pull/14415)) - Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604)) +- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 21e6cdb74c75b..f3f245ee9f8f0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -57,34 +57,26 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; - private final Compressor compressor; - private final RemoteWritableEntityStore remoteIndexRoutingTableStore; + private Compressor compressor; + private RemoteWritableEntityStore remoteIndexRoutingTableStore; + private final ClusterSettings clusterSettings; private BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; + private final String clusterName; public InternalRemoteRoutingTableService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, ThreadPool threadpool, - Compressor compressor, - BlobStoreTransferService blobStoreTransferService, - BlobStoreRepository blobStoreRepository, String clusterName ) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; this.threadPool = threadpool; - this.compressor = compressor; - this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( - blobStoreTransferService, - blobStoreRepository, - clusterName, - threadpool, - ThreadPool.Names.REMOTE_STATE_READ, - clusterSettings - ); + this.clusterName = clusterName; + this.clusterSettings = clusterSettings; } public List getIndicesRouting(RoutingTable routingTable) { @@ -211,6 +203,16 @@ protected void doStart() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; + compressor = blobStoreRepository.getCompressor(); + + this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( + new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), + blobStoreRepository, + clusterName, + threadPool, + ThreadPool.Names.REMOTE_STATE_READ, + clusterSettings + ); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index cbdebcb3dea43..56dfa03215a64 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -10,10 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.core.compress.Compressor; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; import java.util.function.Supplier; @@ -38,22 +35,10 @@ public static RemoteRoutingTableService getService( Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, - Compressor compressor, - BlobStoreTransferService blobStoreTransferService, - BlobStoreRepository blobStoreRepository, String clusterName ) { if (isRemoteRoutingTableEnabled(settings)) { - return new InternalRemoteRoutingTableService( - repositoriesService, - settings, - clusterSettings, - threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, - clusterName - ); + return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName); } return new NoopRemoteRoutingTableService(); } diff --git a/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java b/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java index 7d15e2875886f..58c73a804b66a 100644 --- a/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java +++ b/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java @@ -24,10 +24,6 @@ public BlobPathParameters(final List pathTokens, final String filePrefix this.filePrefix = filePrefix; } - public BlobPathParameters(final List pathTokens) { - this(pathTokens, null); - } - public List getPathTokens() { return pathTokens; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index f756ed1949a13..82ed6a06870a2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -105,7 +105,6 @@ import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -201,6 +200,14 @@ public RemoteClusterStateService( this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); + this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( + repositoriesService, + settings, + clusterSettings, + threadpool, + ClusterName.CLUSTER_NAME_SETTING.get(settings).value() + ); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); } /** @@ -657,7 +664,7 @@ UploadedMetadataResults writeMetadataInParallel( }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( - String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()), + INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), remoteRoutingTableService.getAsyncIndexRoutingWriteAction( clusterState.metadata().clusterUUID(), clusterState.term(), @@ -889,18 +896,7 @@ public void start() { assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); - blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); - this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( - repositoriesService, - settings, - clusterSettings, - threadpool, - blobStoreRepository.getCompressor(), - blobStoreTransferService, - blobStoreRepository, - ClusterName.CLUSTER_NAME_SETTING.get(settings).value() - ); remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, @@ -932,10 +928,9 @@ public void start() { namedWriteableRegistry, threadpool ); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); - remoteClusterStateCleanupManager.start(); remoteRoutingTableService.start(); + remoteClusterStateCleanupManager.start(); } private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java index 5fd8f087e2073..86f4b9502d6ab 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -11,11 +11,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.compress.Compressor; -import org.opensearch.core.compress.NoneCompressor; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -26,7 +22,6 @@ import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.mockito.Mockito.mock; public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase { @@ -41,17 +36,11 @@ public void teardown() throws Exception { public void testGetServiceWhenRemoteRoutingDisabled() { Settings settings = Settings.builder().build(); - BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); - Compressor compressor = new NoneCompressor(); - BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, "test-cluster" ); assertTrue(service instanceof NoopRemoteRoutingTableService); @@ -62,9 +51,6 @@ public void testGetServiceWhenRemoteRoutingEnabled() { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); - BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); - Compressor compressor = new NoneCompressor(); - BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( @@ -72,9 +58,6 @@ public void testGetServiceWhenRemoteRoutingEnabled() { settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, "test-cluster" ); assertTrue(service instanceof InternalRemoteRoutingTableService); diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index da057dfac4c5c..8a263a1ba9180 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -113,8 +113,9 @@ public void setup() { when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); blobStore = mock(BlobStore.class); blobContainer = mock(BlobContainer.class); - when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); + when(repositoriesService.repository(anyString())).thenReturn(blobStoreRepository); when(blobStoreRepository.blobStore()).thenReturn(blobStore); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); compressor = new NoneCompressor(); @@ -125,12 +126,9 @@ public void setup() { settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, "test-cluster" ); - + remoteRoutingTableService.doStart(); } @After @@ -149,9 +147,6 @@ public void testFailInitializationWhenRemoteRoutingDisabled() { settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, "test-cluster" ) ); @@ -532,7 +527,7 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + when(blobContainer.readBlob(indexName)).thenReturn( INDEX_ROUTING_TABLE_FORMAT.serialize( clusterState.getRoutingTable().getIndicesRouting().get(indexName), uploadedFileName,