From 545ce679c0553a9ee102604ab45363456e14d4a5 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 8 Jul 2024 17:38:30 +0530 Subject: [PATCH] Fix test files Signed-off-by: Arpit Bandejiya --- .../InternalRemoteRoutingTableService.java | 33 +- .../remote/NoopRemoteRoutingTableService.java | 6 +- .../remote/RemoteRoutingTableService.java | 10 +- .../common/remote/BlobPathParameters.java | 4 + .../remote/RemoteClusterStateService.java | 12 +- .../model/RemoteRoutingTableBlobStore.java | 9 +- .../routingtable/RemoteIndexRoutingTable.java | 47 ++- ...RemoteRoutingTableServiceFactoryTests.java | 23 +- .../RemoteRoutingTableServiceTests.java | 292 +++++------------ .../remote/ClusterMetadataManifestTests.java | 4 +- .../RemoteClusterStateServiceTests.java | 11 +- .../IndexRoutingTableHeaderTests.java | 32 -- .../RemoteIndexRoutingTableTests.java | 308 +++++++++++++++--- 13 files changed, 442 insertions(+), 349 deletions(-) delete mode 100644 server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 3a1ab73b398e3..21e6cdb74c75b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; @@ -25,7 +24,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.compress.Compressor; -import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteStateTransferException; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; @@ -56,17 +54,13 @@ */ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService { - public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; - public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; - public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; - private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; private final Compressor compressor; - private final RemoteWritableEntityStore remoteWritableEntityStore; + private final RemoteWritableEntityStore remoteIndexRoutingTableStore; private BlobStoreRepository blobStoreRepository; - private ThreadPool threadPool; + private final ThreadPool threadPool; public InternalRemoteRoutingTableService( Supplier repositoriesService, @@ -83,7 +77,7 @@ public InternalRemoteRoutingTableService( this.settings = settings; this.threadPool = threadpool; this.compressor = compressor; - this.remoteWritableEntityStore = new RemoteRoutingTableBlobStore<>( + this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( blobStoreTransferService, blobStoreRepository, clusterName, @@ -117,7 +111,8 @@ public DiffableUtils.MapDiff getAsyncIndexRoutingWriteAction( - ClusterState clusterState, String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ) { - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - indexRouting, - clusterUUID, - compressor, - clusterState.term(), - clusterState.version() - ); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(indexRouting, clusterUUID, compressor, term, version); ActionListener completionListener = ActionListener.wrap( resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()), @@ -146,7 +136,7 @@ public CheckedRunnable getAsyncIndexRoutingWriteAction( ) ); - return () -> remoteWritableEntityStore.writeAsync(remoteIndexRoutingTable, completionListener); + return () -> remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener); } /** @@ -177,7 +167,6 @@ public List getAllUploadedIndices public CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ) { @@ -186,9 +175,9 @@ public CheckedRunnable getAsyncIndexRoutingReadAction( latchedActionListener::onFailure ); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, index, clusterUUID, compressor); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); - return () -> remoteWritableEntityStore.readAsync(remoteIndexRoutingTable, actionListener); + return () -> remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index fde4f381b3e8a..4636e492df28f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -9,13 +9,11 @@ package org.opensearch.cluster.routing.remote; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; @@ -42,8 +40,9 @@ public DiffableUtils.MapDiff getAsyncIndexRoutingWriteAction( - ClusterState clusterState, String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ) { @@ -65,7 +64,6 @@ public List getAllUploadedIndices public CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ) { // noop diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 823487132c132..828c65353cdd9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -9,7 +9,6 @@ package org.opensearch.cluster.routing.remote; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; @@ -17,7 +16,6 @@ import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; @@ -30,8 +28,8 @@ * @opensearch.internal */ public interface RemoteRoutingTableService extends LifecycleComponent { - public static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = - new DiffableUtils.NonDiffableValueSerializer() { + DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = + new DiffableUtils.NonDiffableValueSerializer<>() { @Override public void write(IndexRoutingTable value, StreamOutput out) throws IOException { value.writeTo(out); @@ -48,7 +46,6 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ); @@ -63,8 +60,9 @@ DiffableUtils.MapDiff> ); CheckedRunnable getAsyncIndexRoutingWriteAction( - ClusterState clusterState, String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ); diff --git a/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java b/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java index 58c73a804b66a..7d15e2875886f 100644 --- a/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java +++ b/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java @@ -24,6 +24,10 @@ public BlobPathParameters(final List pathTokens, final String filePrefix this.filePrefix = filePrefix; } + public BlobPathParameters(final List pathTokens) { + this(pathTokens, null); + } + public List getPathTokens() { return pathTokens; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index f6d75805e9483..a64aabc4f8306 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -26,7 +26,6 @@ import org.opensearch.cluster.node.DiscoveryNodes.Builder; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; @@ -43,7 +42,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; @@ -106,6 +104,8 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -657,10 +657,11 @@ UploadedMetadataResults writeMetadataInParallel( }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), + String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()), remoteRoutingTableService.getAsyncIndexRoutingWriteAction( - clusterState, clusterState.metadata().clusterUUID(), + clusterState.term(), + clusterState.version(), indexRoutingTable, listener ) @@ -713,7 +714,7 @@ UploadedMetadataResults writeMetadataInParallel( UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) - && uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) { + && uploadedMetadata.getComponent().contains(INDEX_ROUTING_TABLE_PREFIX)) { response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); } else if (name.startsWith(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- @@ -1039,7 +1040,6 @@ ClusterState readClusterStateInParallel( remoteRoutingTableService.getAsyncIndexRoutingReadAction( clusterUUID, indexRouting.getUploadedFilename(), - new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), routingTableLatchedActionListener ) ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java index 17354d16c1b0f..ecadb3159bb9f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java @@ -21,11 +21,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; /** * Extends the RemoteClusterStateBlobStore to support {@link RemoteIndexRoutingTable} @@ -60,7 +56,6 @@ public class RemoteRoutingTableBlobStore { - public static final String INDEX_ROUTING_TABLE = "index_routing_table"; + public static final String INDEX_ROUTING_TABLE = "index-routing"; + public static final String INDEX_ROUTING_TABLE_PREFIX = "index-routing--"; + public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; + public static final String INDEX_ROUTING_FILE = "index_routing"; private IndexRoutingTable indexRoutingTable; private final Index index; private long term; private long version; - public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = new ChecksumWritableBlobStoreFormat<>( - "index-routing-table", - IndexRoutingTable::readFrom - ); + public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = + new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom); public RemoteIndexRoutingTable( IndexRoutingTable indexRoutingTable, @@ -58,17 +52,17 @@ public RemoteIndexRoutingTable( this.indexRoutingTable = indexRoutingTable; this.term = term; this.version = version; - this.blobFileName = generateBlobFileName(); } /** * Reads data from inputStream and creates RemoteIndexRoutingTable object with the {@link IndexRoutingTable} * @param blobName name of the blob, which contains the index routing data - * @param index index for the current routing data + * @param clusterUUID UUID of the cluster + * @param compressor Compressor object */ - public RemoteIndexRoutingTable(String blobName, Index index, String clusterUUID, Compressor compressor) { + public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) { super(clusterUUID, compressor, null); - this.index = index; + this.index = null; this.term = -1; this.version = -1; this.blobName = blobName; @@ -84,7 +78,7 @@ public Index getIndex() { @Override public BlobPathParameters getBlobPathParameters() { - return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), ""); + return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID())); } @Override @@ -94,17 +88,22 @@ public String getType() { @Override public String generateBlobFileName() { - return String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(term), - RemoteStoreUtils.invertLong(version), - RemoteStoreUtils.invertLong(System.currentTimeMillis()) - ); + if (blobFileName == null) { + blobFileName = String.join( + DELIMITER, + INDEX_ROUTING_FILE, + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + } + return blobFileName; } @Override public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { + assert blobName != null; + assert index != null; return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_METADATA_PREFIX); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java index 39294ee8da41e..5fd8f087e2073 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -11,7 +11,11 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -22,6 +26,7 @@ import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.mockito.Mockito.mock; public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase { @@ -36,11 +41,18 @@ public void teardown() throws Exception { public void testGetServiceWhenRemoteRoutingDisabled() { Settings settings = Settings.builder().build(); + BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); + Compressor compressor = new NoneCompressor(); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); assertTrue(service instanceof NoopRemoteRoutingTableService); } @@ -50,13 +62,20 @@ public void testGetServiceWhenRemoteRoutingEnabled() { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); + BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); + Compressor compressor = new NoneCompressor(); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); assertTrue(service instanceof InternalRemoteRoutingTableService); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 839ebe1ff8301..e287fb9799d1d 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -19,27 +19,24 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.compress.DeflateCompressor; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.common.util.TestCapturingListener; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; -import org.opensearch.gateway.remote.RemoteStateTransferException; -import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; @@ -51,33 +48,34 @@ import org.junit.Before; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX; -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -92,6 +90,8 @@ public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private BlobPath basePath; private ClusterSettings clusterSettings; private ClusterService clusterService; + private Compressor compressor; + private BlobStoreTransferService blobStoreTransferService; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -105,6 +105,7 @@ public void setup() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterService = mock(ClusterService.class); + blobStoreTransferService = mock(BlobStoreTransferService.class); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); blobStoreRepository = mock(BlobStoreRepository.class); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); @@ -114,14 +115,18 @@ public void setup() { when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); - + compressor = new NoneCompressor(); basePath = BlobPath.cleanPath().add("base-path"); - + when(blobStoreRepository.basePath()).thenReturn(basePath); remoteRoutingTableService = new InternalRemoteRoutingTableService( repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); } @@ -141,7 +146,11 @@ public void testFailInitializationWhenRemoteRoutingDisabled() { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ) ); } @@ -347,136 +356,13 @@ public void testGetIndicesRoutingMapDiffIndexDeleted() { assertEquals(indexName, diff.getDeletes().get(0)); } - public void testGetIndexRoutingAsyncAction() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onResponse(any(ClusterMetadataManifest.UploadedMetadata.class)); - } - - public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); - } - - public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); - ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); - ConcurrentHashMap capturedWriteContext = new ConcurrentHashMap<>(); - - doAnswer((i) -> { - actionListenerArgumentCaptor.getValue().onResponse(null); - WriteContext writeContext = writeContextArgumentCaptor.getValue(); - capturedWriteContext.put(writeContext.getFileName().split(DELIMITER)[0], writeContextArgumentCaptor.getValue()); - return null; - }).when((AsyncMultiStreamBlobContainer) blobContainer) - .asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - assertEquals(1, actionListenerArgumentCaptor.getAllValues().size()); - assertEquals(1, writeContextArgumentCaptor.getAllValues().size()); - assertNotNull(capturedWriteContext.get("index_routing")); - assertEquals(capturedWriteContext.get("index_routing").getWritePriority(), WritePriority.URGENT); - assertTrue(capturedWriteContext.get("index_routing").getFileName().startsWith(expectedFilePrefix)); - } - - public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - - doThrow(new IOException("Testing failure")).when((AsyncMultiStreamBlobContainer) blobContainer) - .asyncBlobUpload(any(WriteContext.class), any(ActionListener.class)); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); - } - public void testGetAllUploadedIndicesRouting() { final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().build(); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -491,7 +377,7 @@ public void testGetAllUploadedIndicesRoutingExistingIndexInManifest() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -509,7 +395,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -518,7 +404,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -534,13 +420,13 @@ public void testGetAllUploadedIndicesRoutingIndexDeleted() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -558,13 +444,13 @@ public void testGetAllUploadedIndicesRoutingNoChange() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -640,69 +526,69 @@ public void testIndicesRoutingDiffWhenIndexDeletedAndAdded() { ); } - public void testGetAsyncIndexMetadataReadAction() throws Exception { + public void testGetAsyncIndexRoutingReadAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index(indexName, "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BytesStreamOutput streamOutput = new BytesStreamOutput(); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - clusterState.routingTable().getIndicesRouting().get(indexName) + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + INDEX_ROUTING_TABLE_FORMAT.serialize( + clusterState.getRoutingTable().getIndicesRouting().get(indexName), + uploadedFileName, + compressor + ).streamInput() ); - remoteIndexRoutingTable.writeTo(streamOutput); - when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); - remoteRoutingTableService.start(); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction( + "cluster-uuid", + uploadedFileName, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); - assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); - assertBusy(() -> verify(listener, times(1)).onResponse(any(IndexRoutingTable.class))); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + IndexRoutingTable indexRoutingTable = listener.getResult(); + assertEquals(clusterState.getRoutingTable().getIndicesRouting().get(indexName), indexRoutingTable); } - public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws Exception { + public void testGetAsyncIndexRoutingWriteAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); - String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index("incorrect-index", "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BytesStreamOutput streamOutput = new BytesStreamOutput(); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - clusterState.routingTable().getIndicesRouting().get(indexName) - ); - remoteIndexRoutingTable.writeTo(streamOutput); - when(blobContainer.readBlob(anyString())).thenReturn(streamOutput.bytes().streamInput()); - remoteRoutingTableService.doStart(); - - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); - - assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); - assertBusy(() -> verify(listener, times(1)).onFailure(any(Exception.class))); - } - - public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Exception { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index(indexName, "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); - remoteRoutingTableService.doStart(); - - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); - - assertBusy(() -> verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class))); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(WritePriority.URGENT), any(ActionListener.class)); + + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteRoutingTableService.getAsyncIndexRoutingWriteAction( + clusterState.metadata().clusterUUID(), + clusterState.term(), + clusterState.version(), + clusterState.getRoutingTable().indicesRouting().get(indexName), + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + + assertEquals(INDEX_ROUTING_TABLE_PREFIX + indexName, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(8, pathTokens.length); + assertEquals(pathTokens[1], "base-path"); + String[] fileNameTokens = pathTokens[7].split(DELIMITER); + + assertEquals(4, fileNameTokens.length); + assertEquals(fileNameTokens[0], INDEX_ROUTING_TABLE); + assertEquals(fileNameTokens[1], RemoteStoreUtils.invertLong(1L)); + assertEquals(fileNameTokens[2], RemoteStoreUtils.invertLong(2L)); + assertThat(RemoteStoreUtils.invertLong(fileNameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); } public void testGetUpdatedIndexRoutingTableMetadataWhenNoChange() { @@ -758,7 +644,7 @@ private ClusterState createClusterState(String indexName) { } private BlobPath getPath() { - BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_PATH_TOKEN); + BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_TABLE); return RemoteStoreEnums.PathType.HASHED_PREFIX.path( RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID("uuid").build(), RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64 diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 02471c9cdbbbe..37e3fd1a175de 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; -import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -44,6 +43,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; public class ClusterMetadataManifestTests extends OpenSearchTestCase { @@ -545,7 +545,7 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc "test-index", "test-uuid", "routing-path", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 91ddd64cc2ccc..3683c93cc99e4 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -117,6 +117,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom1; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom2; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom3; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; @@ -126,7 +127,6 @@ import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA_FORMAT; -import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.readFrom; import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES; @@ -142,6 +142,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA_FORMAT; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadataTests.getTemplatesMetadata; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -253,6 +254,7 @@ public void setup() { List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)), namedWriteableRegistry ); + remoteClusterStateService.start(); } @After @@ -2585,6 +2587,7 @@ public void testRemoteRoutingTableInitializedWhenEnabled() { List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)), writableRegistry() ); + remoteClusterStateService.start(); assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService); } @@ -2602,7 +2605,7 @@ public void testWriteFullMetadataSuccessWithRoutingTable() throws IOException { "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) @@ -2653,7 +2656,7 @@ public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOEx "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() @@ -2708,7 +2711,7 @@ public void testWriteIncrementalMetadataSuccessWithRoutingTable() throws IOExcep "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java deleted file mode 100644 index a3f0ac36a40f1..0000000000000 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote.routingtable; - -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.IOException; - -public class IndexRoutingTableHeaderTests extends OpenSearchTestCase { - - public void testIndexRoutingTableHeader() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - IndexRoutingTableHeader header = new IndexRoutingTableHeader(indexName); - try (BytesStreamOutput out = new BytesStreamOutput()) { - header.writeTo(out); - - BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); - IndexRoutingTableHeader headerRead = new IndexRoutingTableHeader(in); - assertEquals(indexName, headerRead.getIndexName()); - - } - } - -} diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java index 72066d8afb45b..dee32c64e1327 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java @@ -13,16 +13,137 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RemoteIndexRoutingTableTests extends OpenSearchTestCase { - public void testRoutingTableInput() { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final String INDEX_ROUTING_TABLE_TYPE = "test-index-routing-table"; + private static final long STATE_VERSION = 3L; + private static final long STATE_TERM = 2L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedWriteableRegistry namedWriteableRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedWriteableRegistry = writableRegistry(); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); + }); + } + + public void testFullBlobName() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + }); + } + + public void testBlobFileName() { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); int numberOfShards = randomIntBetween(1, 10); int numberOfReplicas = randomIntBetween(1, 10); @@ -37,51 +158,164 @@ public void testRoutingTableInput() { RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); - initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { - RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); - try (BytesStreamOutput streamOutput = new BytesStreamOutput();) { - indexRouting.writeTo(streamOutput); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - streamOutput.bytes().streamInput(), - metadata.index(indexName).getIndex() - ); - IndexRoutingTable indexRoutingTable = remoteIndexRoutingTable.getIndexRoutingTable(); - assertEquals(numberOfShards, indexRoutingTable.getShards().size()); - assertEquals(metadata.index(indexName).getIndex(), indexRoutingTable.getIndex()); - assertEquals( - numberOfShards * (1 + numberOfReplicas), - indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size() - ); + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + }); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/routingTable"; + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(uploadedFile, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "routingTable" })); + } + + public void testBlobPathParameters() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(indexRoutingTable.getIndex().getUUID()))); + String expectedPrefix = ""; + assertThat(params.getFilePrefix(), is(expectedPrefix)); + }); + } + + public void testGenerateBlobFileName() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertEquals(nameTokens[0], INDEX_ROUTING_TABLE_PREFIX); + assertEquals(nameTokens[1], RemoteStoreUtils.invertLong(STATE_TERM)); + assertEquals(nameTokens[2], RemoteStoreUtils.invertLong(STATE_VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); + }); + } + + public void testGetUploadedMetadata() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + String expectedPrefix = String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()); + assertThat(uploadedMetadata.getComponent(), is(expectedPrefix)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); } catch (IOException e) { throw new RuntimeException(e); } }); } - public void testRoutingTableInputStreamWithInvalidIndex() { + public void testSerDe() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); Metadata metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) - .put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) .build(); - RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); - AtomicInteger assertionError = new AtomicInteger(); - initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { - RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); - try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { - indexRouting.writeTo(streamOutput); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - streamOutput.bytes().streamInput(), - metadata.index("invalid-index").getIndex() - ); - } catch (AssertionError e) { - assertionError.getAndIncrement(); + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + IndexRoutingTable readIndexRoutingTable = remoteObjectForUpload.deserialize(inputStream); + assertEquals(readIndexRoutingTable, indexRoutingTable); } catch (IOException e) { throw new RuntimeException(e); } }); - - assertEquals(1, assertionError.get()); } - }