diff --git a/CHANGELOG.md b/CHANGELOG.md index b863b9d13e789..0417cc14ee86f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604)) - Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750)) - Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659))) +- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index cc1b0713393f3..f3f245ee9f8f0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -11,35 +11,24 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.IndexInput; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; -import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.stream.write.WritePriority; -import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; -import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.index.Index; +import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteStateTransferException; +import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; -import org.opensearch.index.remote.RemoteStoreEnums; -import org.opensearch.index.remote.RemoteStorePathStrategy; -import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; @@ -52,12 +41,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; /** @@ -67,64 +54,29 @@ */ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService { - /** - * This setting is used to set the remote routing table store blob store path type strategy. - */ - public static final Setting REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>( - "cluster.remote_store.routing_table.path_type", - RemoteStoreEnums.PathType.HASHED_PREFIX.toString(), - RemoteStoreEnums.PathType::parseString, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - - /** - * This setting is used to set the remote routing table store blob store path hash algorithm strategy. - * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING} - * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}. - */ - public static final Setting REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>( - "cluster.remote_store.routing_table.path_hash_algo", - RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(), - RemoteStoreEnums.PathHashAlgorithm::parseString, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - - public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; - public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; - public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; - private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; + private Compressor compressor; + private RemoteWritableEntityStore remoteIndexRoutingTableStore; + private final ClusterSettings clusterSettings; private BlobStoreRepository blobStoreRepository; - private RemoteStoreEnums.PathType pathType; - private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo; - private ThreadPool threadPool; + private final ThreadPool threadPool; + private final String clusterName; public InternalRemoteRoutingTableService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, - ThreadPool threadpool + ThreadPool threadpool, + String clusterName ) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; - this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING); - this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING); - clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting); - clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); this.threadPool = threadpool; - } - - private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) { - this.pathType = pathType; - } - - private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) { - this.pathHashAlgo = pathHashAlgo; + this.clusterName = clusterName; + this.clusterSettings = clusterSettings; } public List getIndicesRouting(RoutingTable routingTable) { @@ -151,43 +103,32 @@ public DiffableUtils.MapDiff getIndexRoutingAsyncAction( - ClusterState clusterState, + @Override + public CheckedRunnable getAsyncIndexRoutingWriteAction( + String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, - LatchedActionListener latchedActionListener, - BlobPath clusterBasePath + LatchedActionListener latchedActionListener ) { - BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN); - BlobPath path = pathType.path( - RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(), - pathHashAlgo - ); - final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path); - - final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version()); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(indexRouting, clusterUUID, compressor, term, version); ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse( - new ClusterMetadataManifest.UploadedIndexMetadata( - indexRouting.getIndex().getName(), - indexRouting.getIndex().getUUID(), - path.buildAsString() + fileName, - INDEX_ROUTING_METADATA_PREFIX - ) - ), + resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()), ex -> latchedActionListener.onFailure( new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex) ) ); - return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener); + return () -> remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener); } /** @@ -214,111 +155,21 @@ public List getAllUploadedIndices return new ArrayList<>(allUploadedIndicesRouting.values()); } - private void uploadIndex( - IndexRoutingTable indexRouting, - String fileName, - BlobContainer blobContainer, - ActionListener completionListener - ) { - RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting); - BytesReference bytesInput = null; - try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { - indexRoutingInput.writeTo(streamOutput); - bytesInput = streamOutput.bytes(); - } catch (IOException e) { - logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e); - completionListener.onFailure(e); - return; - } - - if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { - try { - blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true); - completionListener.onResponse(null); - } catch (IOException e) { - logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e); - completionListener.onFailure(e); - } - return; - } - - try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) { - try ( - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - fileName, - fileName, - input.length(), - true, - WritePriority.URGENT, - (size, position) -> new OffsetRangeIndexInputStream(input, size, position), - null, - false - ) - ) { - ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload( - remoteTransferContainer.createWriteContext(), - completionListener - ); - } catch (IOException e) { - logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e); - completionListener.onFailure(e); - } - } catch (IOException e) { - logger.error( - "Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]", - indexRouting, - e - ); - completionListener.onFailure(e); - } - } - @Override public CheckedRunnable getAsyncIndexRoutingReadAction( + String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ) { - int idx = uploadedFilename.lastIndexOf("/"); - String blobFileName = uploadedFilename.substring(idx + 1); - BlobContainer blobContainer = blobStoreRepository.blobStore() - .blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx))); - return () -> readAsync( - blobContainer, - blobFileName, - index, - threadPool.executor(ThreadPool.Names.REMOTE_STATE_READ), - ActionListener.wrap( - response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), - latchedActionListener::onFailure - ) + ActionListener actionListener = ActionListener.wrap( + latchedActionListener::onResponse, + latchedActionListener::onFailure ); - } - private void readAsync( - BlobContainer blobContainer, - String name, - Index index, - ExecutorService executorService, - ActionListener listener - ) { - executorService.execute(() -> { - try { - listener.onResponse(read(blobContainer, name, index)); - } catch (Exception e) { - listener.onFailure(e); - } - }); - } + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); - private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) { - try { - return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index); - } catch (IOException | AssertionError e) { - logger.error(() -> new ParameterizedMessage("RoutingTable read failed for path {}", path), e); - throw new RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e); - } + return () -> remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); } @Override @@ -335,16 +186,6 @@ public List getUpdatedIndexRoutin }).collect(Collectors.toList()); } - private String getIndexRoutingFileName(long term, long version) { - return String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(term), - RemoteStoreUtils.invertLong(version), - RemoteStoreUtils.invertLong(System.currentTimeMillis()) - ); - } - @Override protected void doClose() throws IOException { if (blobStoreRepository != null) { @@ -362,6 +203,16 @@ protected void doStart() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; + compressor = blobStoreRepository.getCompressor(); + + this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( + new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), + blobStoreRepository, + clusterName, + threadPool, + ThreadPool.Names.REMOTE_STATE_READ, + clusterSettings + ); } @Override @@ -377,5 +228,4 @@ public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOExcep throw e; } } - } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index 6236d107d0220..4636e492df28f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -9,14 +9,11 @@ package org.opensearch.cluster.routing.remote; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; @@ -42,11 +39,12 @@ public DiffableUtils.MapDiff getIndexRoutingAsyncAction( - ClusterState clusterState, + public CheckedRunnable getAsyncIndexRoutingWriteAction( + String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, - LatchedActionListener latchedActionListener, - BlobPath clusterBasePath + LatchedActionListener latchedActionListener ) { // noop return () -> {}; @@ -64,8 +62,8 @@ public List getAllUploadedIndices @Override public CheckedRunnable getAsyncIndexRoutingReadAction( + String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ) { // noop diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index d455dfb58eabc..d319123bc2cee 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -9,16 +9,13 @@ package org.opensearch.cluster.routing.remote; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; @@ -47,8 +44,8 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { List getIndicesRouting(RoutingTable routingTable); CheckedRunnable getAsyncIndexRoutingReadAction( + String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ); @@ -62,11 +59,12 @@ DiffableUtils.MapDiff> RoutingTable after ); - CheckedRunnable getIndexRoutingAsyncAction( - ClusterState clusterState, + CheckedRunnable getAsyncIndexRoutingWriteAction( + String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, - LatchedActionListener latchedActionListener, - BlobPath clusterBasePath + LatchedActionListener latchedActionListener ); List getAllUploadedIndicesRouting( diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index 82837191a30b7..56dfa03215a64 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -34,10 +34,11 @@ public static RemoteRoutingTableService getService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, - ThreadPool threadPool + ThreadPool threadPool, + String clusterName ) { if (isRemoteRoutingTableEnabled(settings)) { - return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool); + return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName); } return new NoopRemoteRoutingTableService(); } diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java index 23fc9d3ad77cb..237c077cb673c 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java @@ -40,6 +40,10 @@ public AbstractRemoteWritableBlobEntity( this.namedXContentRegistry = namedXContentRegistry; } + public AbstractRemoteWritableBlobEntity(final String clusterUUID, final Compressor compressor) { + this(clusterUUID, compressor, null); + } + public abstract BlobPathParameters getBlobPathParameters(); public abstract String getType(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index b4826e1a59428..49801fd3834b8 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -78,7 +78,6 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; -import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -108,6 +107,7 @@ import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; @@ -730,8 +730,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, - InternalRemoteRoutingTableService.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, - InternalRemoteRoutingTableService.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, + RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, + RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, // Admission Control Settings AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 3e63f9114ea16..7e7a93e1d42ec 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -26,7 +26,6 @@ import org.opensearch.cluster.node.DiscoveryNodes.Builder; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; @@ -43,7 +42,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; @@ -98,7 +96,6 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.UploadedMetadataResults; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.clusterUUIDContainer; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getClusterMetadataBasePath; import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; @@ -107,6 +104,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -146,7 +144,7 @@ public class RemoteClusterStateService implements Closeable { private final List indexMetadataUploadListeners; private BlobStoreRepository blobStoreRepository; private BlobStoreTransferService blobStoreTransferService; - private final RemoteRoutingTableService remoteRoutingTableService; + private RemoteRoutingTableService remoteRoutingTableService; private volatile TimeValue slowWriteLoggingThreshold; private final RemotePersistenceStats remoteStateStats; @@ -197,16 +195,17 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; + this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) + && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) + && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, clusterSettings, - threadPool + threadpool, + ClusterName.CLUSTER_NAME_SETTING.get(settings).value() ); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); - this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) - && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) - && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); } /** @@ -663,16 +662,13 @@ UploadedMetadataResults writeMetadataInParallel( }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), - remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, + INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), + remoteRoutingTableService.getAsyncIndexRoutingWriteAction( + clusterState.metadata().clusterUUID(), + clusterState.term(), + clusterState.version(), indexRoutingTable, - listener, - getClusterMetadataBasePath( - blobStoreRepository, - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ) + listener ) ); }); @@ -723,7 +719,7 @@ UploadedMetadataResults writeMetadataInParallel( UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) - && uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) { + && uploadedMetadata.getComponent().contains(INDEX_ROUTING_METADATA_PREFIX)) { response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); } else if (name.startsWith(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- @@ -897,9 +893,8 @@ public void start() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; - this.remoteRoutingTableService.start(); - blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); + blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, @@ -931,6 +926,8 @@ public void start() { namedWriteableRegistry, threadpool ); + + remoteRoutingTableService.start(); remoteClusterStateCleanupManager.start(); } @@ -1022,10 +1019,10 @@ ClusterState readClusterStateInParallel( LatchedActionListener routingTableLatchedActionListener = new LatchedActionListener<>( ActionListener.wrap(response -> { - logger.debug("Successfully read cluster state component from remote"); + logger.debug(() -> new ParameterizedMessage("Successfully read index-routing for index {}", response.getIndex().getName())); readIndexRoutingTableResults.add(response); }, ex -> { - logger.error("Failed to read cluster state from remote", ex); + logger.error(() -> new ParameterizedMessage("Failed to read index-routing from remote"), ex); exceptionList.add(ex); }), latch @@ -1034,8 +1031,8 @@ ClusterState readClusterStateInParallel( for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) { asyncMetadataReadActions.add( remoteRoutingTableService.getAsyncIndexRoutingReadAction( + clusterUUID, indexRouting.getUploadedFilename(), - new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), routingTableLatchedActionListener ) ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java index 1dd23443f1252..cd8b8aa41ad65 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java @@ -23,6 +23,8 @@ import java.io.InputStream; import java.util.concurrent.ExecutorService; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; + /** * Abstract class for a blob type storage * @@ -88,18 +90,26 @@ public void readAsync(final U entity, final ActionListener listener) { }); } - private BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { - BlobPath blobPath = blobStoreRepository.basePath() - .add(RemoteClusterStateUtils.encodeString(clusterName)) - .add("cluster-state") - .add(obj.clusterUUID()); + public String getClusterName() { + return clusterName; + } + + public BlobPath getBlobPathPrefix(String clusterUUID) { + return blobStoreRepository.basePath() + .add(RemoteClusterStateUtils.encodeString(getClusterName())) + .add(CLUSTER_STATE_PATH_TOKEN) + .add(clusterUUID); + } + + public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { + BlobPath blobPath = getBlobPathPrefix(obj.clusterUUID()); for (String token : obj.getBlobPathParameters().getPathTokens()) { blobPath = blobPath.add(token); } return blobPath; } - private BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity obj) { + public BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity obj) { String[] pathTokens = obj.getBlobPathTokens(); BlobPath blobPath = new BlobPath(); if (pathTokens == null || pathTokens.length < 1) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java new file mode 100644 index 0000000000000..7c4a5bf2236a1 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.RemoteWriteableEntity; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; + +/** + * Extends the RemoteClusterStateBlobStore to support {@link RemoteIndexRoutingTable} + * + * @param which can be uploaded to / downloaded from blob store + * @param The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for IndexRoutingTable entity. + */ +public class RemoteRoutingTableBlobStore> extends + RemoteClusterStateBlobStore { + + /** + * This setting is used to set the remote routing table store blob store path type strategy. + */ + public static final Setting REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>( + "cluster.remote_store.routing_table.path_type", + RemoteStoreEnums.PathType.HASHED_PREFIX.toString(), + RemoteStoreEnums.PathType::parseString, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * This setting is used to set the remote routing table store blob store path hash algorithm strategy. + * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING} + * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}. + */ + public static final Setting REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>( + "cluster.remote_store.routing_table.path_hash_algo", + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(), + RemoteStoreEnums.PathHashAlgorithm::parseString, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private RemoteStoreEnums.PathType pathType; + private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo; + + public RemoteRoutingTableBlobStore( + BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, + String clusterName, + ThreadPool threadPool, + String executor, + ClusterSettings clusterSettings + ) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor); + this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING); + this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING); + clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting); + clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); + } + + @Override + public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { + assert obj.getBlobPathParameters().getPathTokens().size() == 1 : "Unexpected tokens in RemoteRoutingTableObject"; + BlobPath indexRoutingPath = getBlobPathPrefix(obj.clusterUUID()).add(INDEX_ROUTING_TABLE); + + BlobPath path = pathType.path( + RemoteStorePathStrategy.PathInput.builder() + .basePath(indexRoutingPath) + .indexUUID(String.join("", obj.getBlobPathParameters().getPathTokens())) + .build(), + pathHashAlgo + ); + return path; + } + + private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) { + this.pathType = pathType; + } + + private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) { + this.pathHashAlgo = pathHashAlgo; + } + + // For testing only + protected RemoteStoreEnums.PathType getPathTypeSetting() { + return pathType; + } + + // For testing only + protected RemoteStoreEnums.PathHashAlgorithm getPathHashAlgoSetting() { + return pathHashAlgo; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java deleted file mode 100644 index 5baea6adba0c7..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote.routingtable; - -import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.InputStreamDataInput; -import org.apache.lucene.store.OutputStreamDataOutput; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; - -import java.io.EOFException; -import java.io.IOException; - -/** - * The stored header information for the individual index routing table - */ -public class IndexRoutingTableHeader implements Writeable { - - public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec"; - public static final int INITIAL_VERSION = 1; - public static final int CURRENT_VERSION = INITIAL_VERSION; - private final String indexName; - - public IndexRoutingTableHeader(String indexName) { - this.indexName = indexName; - } - - /** - * Reads the contents on the stream into the corresponding {@link IndexRoutingTableHeader} - * - * @param in streamInput - * @throws IOException exception thrown on failing to read from stream. - */ - public IndexRoutingTableHeader(StreamInput in) throws IOException { - try { - readHeaderVersion(in); - indexName = in.readString(); - } catch (EOFException e) { - throw new IOException("index routing header truncated", e); - } - } - - private void readHeaderVersion(final StreamInput in) throws IOException { - try { - CodecUtil.checkHeader(new InputStreamDataInput(in), INDEX_ROUTING_HEADER_CODEC, INITIAL_VERSION, CURRENT_VERSION); - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { - throw new IOException("index routing table header corrupted", e); - } - } - - /** - * Write the IndexRoutingTable to given stream. - * - * @param out stream to write - * @throws IOException exception thrown on failing to write to stream. - */ - public void writeTo(StreamOutput out) throws IOException { - try { - CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); - out.writeString(indexName); - out.flush(); - } catch (IOException e) { - throw new IOException("Failed to write IndexRoutingTable header", e); - } - } - - public String getIndexName() { - return indexName; - } - -} diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index 17c55190da07f..40b5bafde2b13 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -9,92 +9,106 @@ package org.opensearch.gateway.remote.routingtable; import org.opensearch.cluster.routing.IndexRoutingTable; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; -import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; -import org.opensearch.core.common.io.stream.InputStreamStreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; /** * Remote store object for IndexRoutingTable */ -public class RemoteIndexRoutingTable implements Writeable { +public class RemoteIndexRoutingTable extends AbstractRemoteWritableBlobEntity { - private final IndexRoutingTable indexRoutingTable; + public static final String INDEX_ROUTING_TABLE = "index-routing"; + public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; + public static final String INDEX_ROUTING_FILE = "index_routing"; + private IndexRoutingTable indexRoutingTable; + private final Index index; + private long term; + private long version; + private BlobPathParameters blobPathParameters; + public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = + new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom); - public RemoteIndexRoutingTable(IndexRoutingTable indexRoutingTable) { + public RemoteIndexRoutingTable( + IndexRoutingTable indexRoutingTable, + String clusterUUID, + Compressor compressor, + long term, + long version + ) { + super(clusterUUID, compressor); + this.index = indexRoutingTable.getIndex(); this.indexRoutingTable = indexRoutingTable; + this.term = term; + this.version = version; } /** * Reads data from inputStream and creates RemoteIndexRoutingTable object with the {@link IndexRoutingTable} - * @param inputStream input stream with index routing data - * @param index index for the current routing data - * @throws IOException exception thrown on failing to read from stream. + * @param blobName name of the blob, which contains the index routing data + * @param clusterUUID UUID of the cluster + * @param compressor Compressor object */ - public RemoteIndexRoutingTable(InputStream inputStream, Index index) throws IOException { - try { - try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new InputStreamStreamInput(inputStream), "assertion")) { - // Read the Table Header first and confirm the index - IndexRoutingTableHeader indexRoutingTableHeader = new IndexRoutingTableHeader(in); - assert indexRoutingTableHeader.getIndexName().equals(index.getName()); + public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) { + super(clusterUUID, compressor); + this.index = null; + this.term = -1; + this.version = -1; + this.blobName = blobName; + } - int numberOfShardRouting = in.readVInt(); - IndexRoutingTable.Builder indicesRoutingTable = IndexRoutingTable.builder(index); - for (int idx = 0; idx < numberOfShardRouting; idx++) { - IndexShardRoutingTable indexShardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); - indicesRoutingTable.addIndexShard(indexShardRoutingTable); - } - verifyCheckSum(in); - indexRoutingTable = indicesRoutingTable.build(); - } - } catch (EOFException e) { - throw new IOException("Indices Routing table is corrupted", e); + @Override + public BlobPathParameters getBlobPathParameters() { + if (blobPathParameters == null) { + blobPathParameters = new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), INDEX_ROUTING_FILE); } + return blobPathParameters; } - public IndexRoutingTable getIndexRoutingTable() { - return indexRoutingTable; + @Override + public String getType() { + return INDEX_ROUTING_TABLE; } - /** - * Writes {@link IndexRoutingTable} to the given stream - * @param streamOutput output stream to write - * @throws IOException exception thrown on failing to write to stream. - */ @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - try { - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(streamOutput); - IndexRoutingTableHeader indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName()); - indexRoutingTableHeader.writeTo(out); - out.writeVInt(indexRoutingTable.shards().size()); - for (IndexShardRoutingTable next : indexRoutingTable) { - IndexShardRoutingTable.Builder.writeTo(next, out); - } - out.writeLong(out.getChecksum()); - out.flush(); - } catch (IOException e) { - throw new IOException("Failed to write IndexRoutingTable to stream", e); + public String generateBlobFileName() { + if (blobFileName == null) { + blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); } + return blobFileName; } - private void verifyCheckSum(BufferedChecksumStreamInput in) throws IOException { - long expectedChecksum = in.getChecksum(); - long readChecksum = in.readLong(); - if (readChecksum != expectedChecksum) { - throw new IOException( - "checksum verification failed - expected: 0x" - + Long.toHexString(expectedChecksum) - + ", got: 0x" - + Long.toHexString(readChecksum) - ); - } + @Override + public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { + assert blobName != null; + assert index != null; + return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_METADATA_PREFIX); + } + + @Override + public InputStream serialize() throws IOException { + return INDEX_ROUTING_TABLE_FORMAT.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput(); + } + + @Override + public IndexRoutingTable deserialize(InputStream in) throws IOException { + return INDEX_ROUTING_TABLE_FORMAT.deserialize(blobName, Streams.readFully(in)); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java index 39294ee8da41e..86f4b9502d6ab 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -40,7 +40,8 @@ public void testGetServiceWhenRemoteRoutingDisabled() { repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + "test-cluster" ); assertTrue(service instanceof NoopRemoteRoutingTableService); } @@ -56,7 +57,8 @@ public void testGetServiceWhenRemoteRoutingEnabled() { repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + "test-cluster" ); assertTrue(service instanceof InternalRemoteRoutingTableService); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 839ebe1ff8301..564c7f7aed304 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -19,27 +19,25 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.compress.DeflateCompressor; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.common.util.TestCapturingListener; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; -import org.opensearch.gateway.remote.RemoteStateTransferException; -import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; @@ -51,33 +49,37 @@ import org.junit.Before; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX; -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_FILE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; +import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64; +import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -92,6 +94,8 @@ public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private BlobPath basePath; private ClusterSettings clusterSettings; private ClusterService clusterService; + private Compressor compressor; + private BlobStoreTransferService blobStoreTransferService; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -105,6 +109,7 @@ public void setup() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterService = mock(ClusterService.class); + blobStoreTransferService = mock(BlobStoreTransferService.class); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); blobStoreRepository = mock(BlobStoreRepository.class); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); @@ -112,18 +117,20 @@ public void setup() { blobContainer = mock(BlobContainer.class); when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); when(blobStoreRepository.blobStore()).thenReturn(blobStore); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); - + compressor = new NoneCompressor(); basePath = BlobPath.cleanPath().add("base-path"); - + when(blobStoreRepository.basePath()).thenReturn(basePath); remoteRoutingTableService = new InternalRemoteRoutingTableService( repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + "test-cluster" ); - + remoteRoutingTableService.doStart(); } @After @@ -141,7 +148,8 @@ public void testFailInitializationWhenRemoteRoutingDisabled() { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + "test-cluster" ) ); } @@ -347,136 +355,13 @@ public void testGetIndicesRoutingMapDiffIndexDeleted() { assertEquals(indexName, diff.getDeletes().get(0)); } - public void testGetIndexRoutingAsyncAction() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onResponse(any(ClusterMetadataManifest.UploadedMetadata.class)); - } - - public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); - } - - public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); - ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); - ConcurrentHashMap capturedWriteContext = new ConcurrentHashMap<>(); - - doAnswer((i) -> { - actionListenerArgumentCaptor.getValue().onResponse(null); - WriteContext writeContext = writeContextArgumentCaptor.getValue(); - capturedWriteContext.put(writeContext.getFileName().split(DELIMITER)[0], writeContextArgumentCaptor.getValue()); - return null; - }).when((AsyncMultiStreamBlobContainer) blobContainer) - .asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - assertEquals(1, actionListenerArgumentCaptor.getAllValues().size()); - assertEquals(1, writeContextArgumentCaptor.getAllValues().size()); - assertNotNull(capturedWriteContext.get("index_routing")); - assertEquals(capturedWriteContext.get("index_routing").getWritePriority(), WritePriority.URGENT); - assertTrue(capturedWriteContext.get("index_routing").getFileName().startsWith(expectedFilePrefix)); - } - - public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - - doThrow(new IOException("Testing failure")).when((AsyncMultiStreamBlobContainer) blobContainer) - .asyncBlobUpload(any(WriteContext.class), any(ActionListener.class)); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); - } - public void testGetAllUploadedIndicesRouting() { final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().build(); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -491,7 +376,7 @@ public void testGetAllUploadedIndicesRoutingExistingIndexInManifest() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -509,7 +394,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -518,7 +403,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -534,13 +419,13 @@ public void testGetAllUploadedIndicesRoutingIndexDeleted() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -558,13 +443,13 @@ public void testGetAllUploadedIndicesRoutingNoChange() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -640,69 +525,83 @@ public void testIndicesRoutingDiffWhenIndexDeletedAndAdded() { ); } - public void testGetAsyncIndexMetadataReadAction() throws Exception { + public void testGetAsyncIndexRoutingReadAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index(indexName, "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BytesStreamOutput streamOutput = new BytesStreamOutput(); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - clusterState.routingTable().getIndicesRouting().get(indexName) + when(blobContainer.readBlob(indexName)).thenReturn( + INDEX_ROUTING_TABLE_FORMAT.serialize( + clusterState.getRoutingTable().getIndicesRouting().get(indexName), + uploadedFileName, + compressor + ).streamInput() ); - remoteIndexRoutingTable.writeTo(streamOutput); - when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); - remoteRoutingTableService.start(); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction( + "cluster-uuid", + uploadedFileName, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); - assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); - assertBusy(() -> verify(listener, times(1)).onResponse(any(IndexRoutingTable.class))); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + IndexRoutingTable indexRoutingTable = listener.getResult(); + assertEquals(clusterState.getRoutingTable().getIndicesRouting().get(indexName), indexRoutingTable); } - public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws Exception { + public void testGetAsyncIndexRoutingWriteAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); - String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index("incorrect-index", "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BytesStreamOutput streamOutput = new BytesStreamOutput(); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - clusterState.routingTable().getIndicesRouting().get(indexName) + Iterable remotePath = HASHED_PREFIX.path( + RemoteStorePathStrategy.PathInput.builder() + .basePath( + new BlobPath().add("base-path") + .add(RemoteClusterStateUtils.encodeString(ClusterName.DEFAULT.toString())) + .add(CLUSTER_STATE_PATH_TOKEN) + .add(clusterState.metadata().clusterUUID()) + .add(INDEX_ROUTING_TABLE) + ) + .indexUUID(clusterState.getRoutingTable().indicesRouting().get(indexName).getIndex().getUUID()) + .build(), + FNV_1A_BASE64 ); - remoteIndexRoutingTable.writeTo(streamOutput); - when(blobContainer.readBlob(anyString())).thenReturn(streamOutput.bytes().streamInput()); - remoteRoutingTableService.doStart(); - - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); - - assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); - assertBusy(() -> verify(listener, times(1)).onFailure(any(Exception.class))); - } - - public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Exception { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index(indexName, "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); - remoteRoutingTableService.doStart(); - - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); - assertBusy(() -> verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class))); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), eq(remotePath), anyString(), eq(WritePriority.URGENT), any(ActionListener.class)); + + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteRoutingTableService.getAsyncIndexRoutingWriteAction( + clusterState.metadata().clusterUUID(), + clusterState.term(), + clusterState.version(), + clusterState.getRoutingTable().indicesRouting().get(indexName), + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + + assertEquals(INDEX_ROUTING_METADATA_PREFIX + indexName, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(8, pathTokens.length); + assertEquals(pathTokens[1], "base-path"); + String[] fileNameTokens = pathTokens[7].split(DELIMITER); + + assertEquals(4, fileNameTokens.length); + assertEquals(fileNameTokens[0], INDEX_ROUTING_FILE); + assertEquals(fileNameTokens[1], RemoteStoreUtils.invertLong(1L)); + assertEquals(fileNameTokens[2], RemoteStoreUtils.invertLong(2L)); + assertThat(RemoteStoreUtils.invertLong(fileNameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); } public void testGetUpdatedIndexRoutingTableMetadataWhenNoChange() { @@ -758,7 +657,7 @@ private ClusterState createClusterState(String indexName) { } private BlobPath getPath() { - BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_PATH_TOKEN); + BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_TABLE); return RemoteStoreEnums.PathType.HASHED_PREFIX.path( RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID("uuid").build(), RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64 diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 152a6dba6c032..256161af1a3e2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; -import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -44,6 +43,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; public class ClusterMetadataManifestTests extends OpenSearchTestCase { @@ -545,7 +545,7 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc "test-index", "test-uuid", "routing-path", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 6cd9cbbf13848..ebd3488d06007 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -117,6 +117,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom1; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom2; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom3; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; @@ -126,7 +127,6 @@ import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA_FORMAT; -import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.readFrom; import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES; @@ -144,6 +144,7 @@ import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA_FORMAT; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadataTests.getTemplatesMetadata; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -2603,7 +2604,7 @@ public void testWriteFullMetadataSuccessWithRoutingTable() throws IOException { "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) @@ -2654,7 +2655,7 @@ public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOEx "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() @@ -2710,7 +2711,7 @@ public void testWriteIncrementalMetadataSuccessWithRoutingTable() throws IOExcep "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStoreTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStoreTests.java new file mode 100644 index 0000000000000..ea0f3264cfe4f --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStoreTests.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64; +import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteRoutingTableBlobStoreTests extends OpenSearchTestCase { + + private RemoteRoutingTableBlobStore remoteIndexRoutingTableStore; + ClusterSettings clusterSettings; + ThreadPool threadPool; + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); + BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("base-path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + + threadPool = new TestThreadPool(getClass().getName()); + this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + "test-cluster", + threadPool, + ThreadPool.Names.REMOTE_STATE_READ, + clusterSettings + ); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + + } + + public void testRemoteRoutingTablePathTypeSetting() { + // Assert the default is HASHED_PREFIX + assertEquals(HASHED_PREFIX.toString(), remoteIndexRoutingTableStore.getPathTypeSetting().toString()); + + Settings newSettings = Settings.builder() + .put("cluster.remote_store.routing_table.path_type", RemoteStoreEnums.PathType.FIXED.toString()) + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(RemoteStoreEnums.PathType.FIXED.toString(), remoteIndexRoutingTableStore.getPathTypeSetting().toString()); + } + + public void testRemoteRoutingTableHashAlgoSetting() { + // Assert the default is FNV_1A_BASE64 + assertEquals(FNV_1A_BASE64.toString(), remoteIndexRoutingTableStore.getPathHashAlgoSetting().toString()); + + Settings newSettings = Settings.builder() + .put("cluster.remote_store.routing_table.path_hash_algo", RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString()) + .build(); + clusterSettings.applySettings(newSettings); + assertEquals( + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString(), + remoteIndexRoutingTableStore.getPathHashAlgoSetting().toString() + ); + } + + public void testGetBlobPathForUpload() { + + Index index = new Index("test-idx", "index-uuid"); + Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + + IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + IndexRoutingTable indexRoutingTable = new IndexRoutingTable.Builder(index).initializeAsNew(indexMetadata).build(); + + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + "cluster-uuid", + new DeflateCompressor(), + 2L, + 3L + ); + BlobPath blobPath = remoteIndexRoutingTableStore.getBlobPathForUpload(remoteObjectForUpload); + BlobPath expectedPath = HASHED_PREFIX.path( + RemoteStorePathStrategy.PathInput.builder() + .basePath( + new BlobPath().add("base-path") + .add(RemoteClusterStateUtils.encodeString("test-cluster")) + .add(CLUSTER_STATE_PATH_TOKEN) + .add("cluster-uuid") + .add(INDEX_ROUTING_TABLE) + ) + .indexUUID(index.getUUID()) + .build(), + FNV_1A_BASE64 + ); + assertEquals(expectedPath, blobPath); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java deleted file mode 100644 index a3f0ac36a40f1..0000000000000 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote.routingtable; - -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.IOException; - -public class IndexRoutingTableHeaderTests extends OpenSearchTestCase { - - public void testIndexRoutingTableHeader() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - IndexRoutingTableHeader header = new IndexRoutingTableHeader(indexName); - try (BytesStreamOutput out = new BytesStreamOutput()) { - header.writeTo(out); - - BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); - IndexRoutingTableHeader headerRead = new IndexRoutingTableHeader(in); - assertEquals(indexName, headerRead.getIndexName()); - - } - } - -} diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java index 72066d8afb45b..29d4ffa978851 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java @@ -13,16 +13,136 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_FILE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RemoteIndexRoutingTableTests extends OpenSearchTestCase { - public void testRoutingTableInput() { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final String INDEX_ROUTING_TABLE_TYPE = "test-index-routing-table"; + private static final long STATE_VERSION = 3L; + private static final long STATE_TERM = 2L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedWriteableRegistry namedWriteableRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedWriteableRegistry = writableRegistry(); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); + }); + } + + public void testFullBlobName() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + }); + } + + public void testBlobFileName() { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); int numberOfShards = randomIntBetween(1, 10); int numberOfReplicas = randomIntBetween(1, 10); @@ -37,51 +157,164 @@ public void testRoutingTableInput() { RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); - initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { - RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); - try (BytesStreamOutput streamOutput = new BytesStreamOutput();) { - indexRouting.writeTo(streamOutput); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - streamOutput.bytes().streamInput(), - metadata.index(indexName).getIndex() - ); - IndexRoutingTable indexRoutingTable = remoteIndexRoutingTable.getIndexRoutingTable(); - assertEquals(numberOfShards, indexRoutingTable.getShards().size()); - assertEquals(metadata.index(indexName).getIndex(), indexRoutingTable.getIndex()); - assertEquals( - numberOfShards * (1 + numberOfReplicas), - indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size() - ); + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + }); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/routingTable"; + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(uploadedFile, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "routingTable" })); + } + + public void testBlobPathParameters() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(indexRoutingTable.getIndex().getUUID()))); + String expectedPrefix = INDEX_ROUTING_FILE; + assertThat(params.getFilePrefix(), is(expectedPrefix)); + }); + } + + public void testGenerateBlobFileName() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertEquals(nameTokens[0], INDEX_ROUTING_FILE); + assertEquals(nameTokens[1], RemoteStoreUtils.invertLong(STATE_TERM)); + assertEquals(nameTokens[2], RemoteStoreUtils.invertLong(STATE_VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); + }); + } + + public void testGetUploadedMetadata() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + String expectedPrefix = INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(); + assertThat(uploadedMetadata.getComponent(), is(expectedPrefix)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); } catch (IOException e) { throw new RuntimeException(e); } }); } - public void testRoutingTableInputStreamWithInvalidIndex() { + public void testSerDe() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); Metadata metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) - .put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) .build(); - RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); - AtomicInteger assertionError = new AtomicInteger(); - initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { - RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); - try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { - indexRouting.writeTo(streamOutput); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - streamOutput.bytes().streamInput(), - metadata.index("invalid-index").getIndex() - ); - } catch (AssertionError e) { - assertionError.getAndIncrement(); + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + IndexRoutingTable readIndexRoutingTable = remoteObjectForUpload.deserialize(inputStream); + assertEquals(readIndexRoutingTable, indexRoutingTable); } catch (IOException e) { throw new RuntimeException(e); } }); - - assertEquals(1, assertionError.get()); } - }