From efa150a2b0d53e578abab7f037ceb0a3594a1223 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 8 Jul 2024 10:30:22 +0530 Subject: [PATCH 1/6] Refactor remote-routing-table service Signed-off-by: Arpit Bandejiya --- .../InternalRemoteRoutingTableService.java | 219 ++++-------------- .../remote/NoopRemoteRoutingTableService.java | 8 +- .../remote/RemoteRoutingTableService.java | 8 +- .../RemoteRoutingTableServiceFactory.java | 20 +- .../common/settings/ClusterSettings.java | 6 +- .../remote/RemoteClusterStateService.java | 40 ++-- .../model/RemoteClusterStateBlobStore.java | 17 +- .../model/RemoteRoutingTableBlobStore.java | 107 +++++++++ .../routingtable/IndexRoutingTableHeader.java | 81 ------- .../routingtable/RemoteIndexRoutingTable.java | 140 ++++++----- 10 files changed, 289 insertions(+), 357 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java delete mode 100644 server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index cc1b0713393f3..3a1ab73b398e3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -11,35 +11,26 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.IndexInput; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; -import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.stream.write.WritePriority; -import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; -import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.Compressor; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteStateTransferException; +import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; -import org.opensearch.index.remote.RemoteStoreEnums; -import org.opensearch.index.remote.RemoteStorePathStrategy; -import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; @@ -52,12 +43,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; /** @@ -67,30 +56,6 @@ */ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService { - /** - * This setting is used to set the remote routing table store blob store path type strategy. - */ - public static final Setting REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>( - "cluster.remote_store.routing_table.path_type", - RemoteStoreEnums.PathType.HASHED_PREFIX.toString(), - RemoteStoreEnums.PathType::parseString, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - - /** - * This setting is used to set the remote routing table store blob store path hash algorithm strategy. - * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING} - * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}. - */ - public static final Setting REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>( - "cluster.remote_store.routing_table.path_hash_algo", - RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(), - RemoteStoreEnums.PathHashAlgorithm::parseString, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; @@ -98,33 +63,34 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; + private final Compressor compressor; + private final RemoteWritableEntityStore remoteWritableEntityStore; private BlobStoreRepository blobStoreRepository; - private RemoteStoreEnums.PathType pathType; - private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo; private ThreadPool threadPool; public InternalRemoteRoutingTableService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, - ThreadPool threadpool + ThreadPool threadpool, + Compressor compressor, + BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, + String clusterName ) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; - this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING); - this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING); - clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting); - clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); this.threadPool = threadpool; - } - - private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) { - this.pathType = pathType; - } - - private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) { - this.pathHashAlgo = pathHashAlgo; + this.compressor = compressor; + this.remoteWritableEntityStore = new RemoteRoutingTableBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ, + clusterSettings + ); } public List getIndicesRouting(RoutingTable routingTable) { @@ -152,42 +118,35 @@ public DiffableUtils.MapDiff getIndexRoutingAsyncAction( + @Override + public CheckedRunnable getAsyncIndexRoutingWriteAction( ClusterState clusterState, + String clusterUUID, IndexRoutingTable indexRouting, - LatchedActionListener latchedActionListener, - BlobPath clusterBasePath + LatchedActionListener latchedActionListener ) { - BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN); - BlobPath path = pathType.path( - RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(), - pathHashAlgo + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( + indexRouting, + clusterUUID, + compressor, + clusterState.term(), + clusterState.version() ); - final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path); - - final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version()); ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse( - new ClusterMetadataManifest.UploadedIndexMetadata( - indexRouting.getIndex().getName(), - indexRouting.getIndex().getUUID(), - path.buildAsString() + fileName, - INDEX_ROUTING_METADATA_PREFIX - ) - ), + resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()), ex -> latchedActionListener.onFailure( new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex) ) ); - return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener); + return () -> remoteWritableEntityStore.writeAsync(remoteIndexRoutingTable, completionListener); } /** @@ -214,111 +173,22 @@ public List getAllUploadedIndices return new ArrayList<>(allUploadedIndicesRouting.values()); } - private void uploadIndex( - IndexRoutingTable indexRouting, - String fileName, - BlobContainer blobContainer, - ActionListener completionListener - ) { - RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting); - BytesReference bytesInput = null; - try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { - indexRoutingInput.writeTo(streamOutput); - bytesInput = streamOutput.bytes(); - } catch (IOException e) { - logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e); - completionListener.onFailure(e); - return; - } - - if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { - try { - blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true); - completionListener.onResponse(null); - } catch (IOException e) { - logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e); - completionListener.onFailure(e); - } - return; - } - - try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) { - try ( - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - fileName, - fileName, - input.length(), - true, - WritePriority.URGENT, - (size, position) -> new OffsetRangeIndexInputStream(input, size, position), - null, - false - ) - ) { - ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload( - remoteTransferContainer.createWriteContext(), - completionListener - ); - } catch (IOException e) { - logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e); - completionListener.onFailure(e); - } - } catch (IOException e) { - logger.error( - "Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]", - indexRouting, - e - ); - completionListener.onFailure(e); - } - } - @Override public CheckedRunnable getAsyncIndexRoutingReadAction( + String clusterUUID, String uploadedFilename, Index index, LatchedActionListener latchedActionListener ) { - int idx = uploadedFilename.lastIndexOf("/"); - String blobFileName = uploadedFilename.substring(idx + 1); - BlobContainer blobContainer = blobStoreRepository.blobStore() - .blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx))); - return () -> readAsync( - blobContainer, - blobFileName, - index, - threadPool.executor(ThreadPool.Names.REMOTE_STATE_READ), - ActionListener.wrap( - response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), - latchedActionListener::onFailure - ) + ActionListener actionListener = ActionListener.wrap( + latchedActionListener::onResponse, + latchedActionListener::onFailure ); - } - private void readAsync( - BlobContainer blobContainer, - String name, - Index index, - ExecutorService executorService, - ActionListener listener - ) { - executorService.execute(() -> { - try { - listener.onResponse(read(blobContainer, name, index)); - } catch (Exception e) { - listener.onFailure(e); - } - }); - } + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, index, clusterUUID, compressor); - private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) { - try { - return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index); - } catch (IOException | AssertionError e) { - logger.error(() -> new ParameterizedMessage("RoutingTable read failed for path {}", path), e); - throw new RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e); - } + return () -> remoteWritableEntityStore.readAsync(remoteIndexRoutingTable, actionListener); } @Override @@ -335,16 +205,6 @@ public List getUpdatedIndexRoutin }).collect(Collectors.toList()); } - private String getIndexRoutingFileName(long term, long version) { - return String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(term), - RemoteStoreUtils.invertLong(version), - RemoteStoreUtils.invertLong(System.currentTimeMillis()) - ); - } - @Override protected void doClose() throws IOException { if (blobStoreRepository != null) { @@ -377,5 +237,4 @@ public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOExcep throw e; } } - } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index 6236d107d0220..fde4f381b3e8a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -14,7 +14,6 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -42,11 +41,11 @@ public DiffableUtils.MapDiff getIndexRoutingAsyncAction( + public CheckedRunnable getAsyncIndexRoutingWriteAction( ClusterState clusterState, + String clusterUUID, IndexRoutingTable indexRouting, - LatchedActionListener latchedActionListener, - BlobPath clusterBasePath + LatchedActionListener latchedActionListener ) { // noop return () -> {}; @@ -64,6 +63,7 @@ public List getAllUploadedIndices @Override public CheckedRunnable getAsyncIndexRoutingReadAction( + String clusterUUID, String uploadedFilename, Index index, LatchedActionListener latchedActionListener diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index d455dfb58eabc..823487132c132 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -14,7 +14,6 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -47,6 +46,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { List getIndicesRouting(RoutingTable routingTable); CheckedRunnable getAsyncIndexRoutingReadAction( + String clusterUUID, String uploadedFilename, Index index, LatchedActionListener latchedActionListener @@ -62,11 +62,11 @@ DiffableUtils.MapDiff> RoutingTable after ); - CheckedRunnable getIndexRoutingAsyncAction( + CheckedRunnable getAsyncIndexRoutingWriteAction( ClusterState clusterState, + String clusterUUID, IndexRoutingTable indexRouting, - LatchedActionListener latchedActionListener, - BlobPath clusterBasePath + LatchedActionListener latchedActionListener ); List getAllUploadedIndicesRouting( diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index 82837191a30b7..cbdebcb3dea43 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -10,7 +10,10 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; import java.util.function.Supplier; @@ -34,10 +37,23 @@ public static RemoteRoutingTableService getService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, - ThreadPool threadPool + ThreadPool threadPool, + Compressor compressor, + BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, + String clusterName ) { if (isRemoteRoutingTableEnabled(settings)) { - return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool); + return new InternalRemoteRoutingTableService( + repositoriesService, + settings, + clusterSettings, + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + clusterName + ); } return new NoopRemoteRoutingTableService(); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5dcf23ae52294..8f317129581b6 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -77,7 +77,6 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; -import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -107,6 +106,7 @@ import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; @@ -730,8 +730,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, - InternalRemoteRoutingTableService.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, - InternalRemoteRoutingTableService.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, + RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, + RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, // Admission Control Settings AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 3e63f9114ea16..f6d75805e9483 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -98,7 +98,6 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.UploadedMetadataResults; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.clusterUUIDContainer; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getClusterMetadataBasePath; import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; @@ -146,7 +145,7 @@ public class RemoteClusterStateService implements Closeable { private final List indexMetadataUploadListeners; private BlobStoreRepository blobStoreRepository; private BlobStoreTransferService blobStoreTransferService; - private final RemoteRoutingTableService remoteRoutingTableService; + private RemoteRoutingTableService remoteRoutingTableService; private volatile TimeValue slowWriteLoggingThreshold; private final RemotePersistenceStats remoteStateStats; @@ -156,6 +155,7 @@ public class RemoteClusterStateService implements Closeable { private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager; private RemoteManifestManager remoteManifestManager; private ClusterSettings clusterSettings; + private final ClusterService clusterService; private final NamedWriteableRegistry namedWriteableRegistry; private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " @@ -197,13 +197,7 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( - repositoriesService, - settings, - clusterSettings, - threadPool - ); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); + this.clusterService = clusterService; this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); @@ -664,15 +658,11 @@ UploadedMetadataResults writeMetadataInParallel( indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), - remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.getAsyncIndexRoutingWriteAction( clusterState, + clusterState.metadata().clusterUUID(), indexRoutingTable, - listener, - getClusterMetadataBasePath( - blobStoreRepository, - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ) + listener ) ); }); @@ -897,10 +887,20 @@ public void start() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; - this.remoteRoutingTableService.start(); - blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); + blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); + this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( + repositoriesService, + settings, + clusterSettings, + threadpool, + blobStoreRepository.getCompressor(), + blobStoreTransferService, + blobStoreRepository, + ClusterName.CLUSTER_NAME_SETTING.get(settings).value() + ); + remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, clusterName, @@ -931,7 +931,10 @@ public void start() { namedWriteableRegistry, threadpool ); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); + remoteClusterStateCleanupManager.start(); + remoteRoutingTableService.start(); } private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { @@ -1034,6 +1037,7 @@ ClusterState readClusterStateInParallel( for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) { asyncMetadataReadActions.add( remoteRoutingTableService.getAsyncIndexRoutingReadAction( + clusterUUID, indexRouting.getUploadedFilename(), new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), routingTableLatchedActionListener diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java index 1dd23443f1252..27a9adf35f8a3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java @@ -31,7 +31,7 @@ */ public class RemoteClusterStateBlobStore> implements RemoteWritableEntityStore { - private final BlobStoreTransferService transferService; + protected final BlobStoreTransferService transferService; private final BlobStoreRepository blobStoreRepository; private final String clusterName; private final ExecutorService executorService; @@ -88,9 +88,16 @@ public void readAsync(final U entity, final ActionListener listener) { }); } - private BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { - BlobPath blobPath = blobStoreRepository.basePath() - .add(RemoteClusterStateUtils.encodeString(clusterName)) + public String getClusterName() { + return clusterName; + } + + public BlobPath getBasePath() { + return blobStoreRepository.basePath(); + } + + public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { + BlobPath blobPath = getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName())) .add("cluster-state") .add(obj.clusterUUID()); for (String token : obj.getBlobPathParameters().getPathTokens()) { @@ -99,7 +106,7 @@ private BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity return blobPath; } - private BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity obj) { + public BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity obj) { String[] pathTokens = obj.getBlobPathTokens(); BlobPath blobPath = new BlobPath(); if (pathTokens == null || pathTokens.length < 1) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java new file mode 100644 index 0000000000000..17354d16c1b0f --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.RemoteWriteableEntity; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; + +/** + * Extends the RemoteClusterStateBlobStore to support {@link RemoteIndexRoutingTable} + * + * @param which can be uploaded to / downloaded from blob store + * @param The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for IndexRoutingTable entity. + */ +public class RemoteRoutingTableBlobStore> extends + RemoteClusterStateBlobStore { + + /** + * This setting is used to set the remote routing table store blob store path type strategy. + */ + public static final Setting REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>( + "cluster.remote_store.routing_table.path_type", + RemoteStoreEnums.PathType.HASHED_PREFIX.toString(), + RemoteStoreEnums.PathType::parseString, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * This setting is used to set the remote routing table store blob store path hash algorithm strategy. + * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING} + * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}. + */ + public static final Setting REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>( + "cluster.remote_store.routing_table.path_hash_algo", + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(), + RemoteStoreEnums.PathHashAlgorithm::parseString, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; + private RemoteStoreEnums.PathType pathType; + private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo; + + public RemoteRoutingTableBlobStore( + BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, + String clusterName, + ThreadPool threadPool, + String executor, + ClusterSettings clusterSettings + ) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor); + this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING); + this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING); + clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting); + clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); + } + + @Override + public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { + assert obj.getBlobPathParameters().getPathTokens().size() == 1 : "Unexpected tokens in RemoteRoutingTableObject"; + BlobPath indexRoutingPath = getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName())) + .add("cluster-state") + .add(obj.clusterUUID()) + .add(INDEX_ROUTING_PATH_TOKEN); + BlobPath path = pathType.path( + RemoteStorePathStrategy.PathInput.builder() + .basePath(indexRoutingPath) + .indexUUID(String.join("", obj.getBlobPathParameters().getPathTokens())) + .build(), + pathHashAlgo + ); + return path; + } + + private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) { + this.pathType = pathType; + } + + private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) { + this.pathHashAlgo = pathHashAlgo; + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java deleted file mode 100644 index 5baea6adba0c7..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeader.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote.routingtable; - -import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.InputStreamDataInput; -import org.apache.lucene.store.OutputStreamDataOutput; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; - -import java.io.EOFException; -import java.io.IOException; - -/** - * The stored header information for the individual index routing table - */ -public class IndexRoutingTableHeader implements Writeable { - - public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec"; - public static final int INITIAL_VERSION = 1; - public static final int CURRENT_VERSION = INITIAL_VERSION; - private final String indexName; - - public IndexRoutingTableHeader(String indexName) { - this.indexName = indexName; - } - - /** - * Reads the contents on the stream into the corresponding {@link IndexRoutingTableHeader} - * - * @param in streamInput - * @throws IOException exception thrown on failing to read from stream. - */ - public IndexRoutingTableHeader(StreamInput in) throws IOException { - try { - readHeaderVersion(in); - indexName = in.readString(); - } catch (EOFException e) { - throw new IOException("index routing header truncated", e); - } - } - - private void readHeaderVersion(final StreamInput in) throws IOException { - try { - CodecUtil.checkHeader(new InputStreamDataInput(in), INDEX_ROUTING_HEADER_CODEC, INITIAL_VERSION, CURRENT_VERSION); - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { - throw new IOException("index routing table header corrupted", e); - } - } - - /** - * Write the IndexRoutingTable to given stream. - * - * @param out stream to write - * @throws IOException exception thrown on failing to write to stream. - */ - public void writeTo(StreamOutput out) throws IOException { - try { - CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION); - out.writeString(indexName); - out.flush(); - } catch (IOException e) { - throw new IOException("Failed to write IndexRoutingTable header", e); - } - } - - public String getIndexName() { - return indexName; - } - -} diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index 17c55190da07f..00e789ecb595b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -8,93 +8,113 @@ package org.opensearch.gateway.remote.routingtable; + import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; -import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; -import org.opensearch.core.common.io.stream.InputStreamStreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.IndexOutputOutputStream; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; + -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.List; + +import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore.INDEX_ROUTING_FILE_PREFIX; /** * Remote store object for IndexRoutingTable */ -public class RemoteIndexRoutingTable implements Writeable { +public class RemoteIndexRoutingTable extends AbstractRemoteWritableBlobEntity { - private final IndexRoutingTable indexRoutingTable; + public static final String INDEX_ROUTING_TABLE = "index_routing_table"; + private IndexRoutingTable indexRoutingTable; + private final Index index; + private long term; + private long version; + public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = new ChecksumWritableBlobStoreFormat<>( + "index-routing-table", + IndexRoutingTable::readFrom + ); - public RemoteIndexRoutingTable(IndexRoutingTable indexRoutingTable) { + public RemoteIndexRoutingTable( + IndexRoutingTable indexRoutingTable, + String clusterUUID, + Compressor compressor, + long term, + long version + ) { + super(clusterUUID, compressor, null); + this.index = indexRoutingTable.getIndex(); this.indexRoutingTable = indexRoutingTable; + this.term = term; + this.version = version; + this.blobFileName = generateBlobFileName(); } /** * Reads data from inputStream and creates RemoteIndexRoutingTable object with the {@link IndexRoutingTable} - * @param inputStream input stream with index routing data + * @param blobName name of the blob, which contains the index routing data * @param index index for the current routing data - * @throws IOException exception thrown on failing to read from stream. */ - public RemoteIndexRoutingTable(InputStream inputStream, Index index) throws IOException { - try { - try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new InputStreamStreamInput(inputStream), "assertion")) { - // Read the Table Header first and confirm the index - IndexRoutingTableHeader indexRoutingTableHeader = new IndexRoutingTableHeader(in); - assert indexRoutingTableHeader.getIndexName().equals(index.getName()); - - int numberOfShardRouting = in.readVInt(); - IndexRoutingTable.Builder indicesRoutingTable = IndexRoutingTable.builder(index); - for (int idx = 0; idx < numberOfShardRouting; idx++) { - IndexShardRoutingTable indexShardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); - indicesRoutingTable.addIndexShard(indexShardRoutingTable); - } - verifyCheckSum(in); - indexRoutingTable = indicesRoutingTable.build(); - } - } catch (EOFException e) { - throw new IOException("Indices Routing table is corrupted", e); - } + public RemoteIndexRoutingTable(String blobName, Index index, String clusterUUID, Compressor compressor) { + super(clusterUUID, compressor, null); + this.index = index; + this.term = -1; + this.version = -1; + this.blobName = blobName; } public IndexRoutingTable getIndexRoutingTable() { return indexRoutingTable; } - /** - * Writes {@link IndexRoutingTable} to the given stream - * @param streamOutput output stream to write - * @throws IOException exception thrown on failing to write to stream. - */ + public Index getIndex() { + return index; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), ""); + } + @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - try { - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(streamOutput); - IndexRoutingTableHeader indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName()); - indexRoutingTableHeader.writeTo(out); - out.writeVInt(indexRoutingTable.shards().size()); - for (IndexShardRoutingTable next : indexRoutingTable) { - IndexShardRoutingTable.Builder.writeTo(next, out); - } - out.writeLong(out.getChecksum()); - out.flush(); - } catch (IOException e) { - throw new IOException("Failed to write IndexRoutingTable to stream", e); - } + public String getType() { + return INDEX_ROUTING_TABLE; } - private void verifyCheckSum(BufferedChecksumStreamInput in) throws IOException { - long expectedChecksum = in.getChecksum(); - long readChecksum = in.readLong(); - if (readChecksum != expectedChecksum) { - throw new IOException( - "checksum verification failed - expected: 0x" - + Long.toHexString(expectedChecksum) - + ", got: 0x" - + Long.toHexString(readChecksum) - ); - } + @Override + public String generateBlobFileName() { + return String.join( + DELIMITER, + INDEX_ROUTING_FILE_PREFIX, + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + } + + @Override + public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { + return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_METADATA_PREFIX); + } + + @Override + public InputStream serialize() throws IOException { + return INDEX_ROUTING_TABLE_FORMAT.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput(); + } + + @Override + public IndexRoutingTable deserialize(InputStream in) throws IOException { + return INDEX_ROUTING_TABLE_FORMAT.deserialize(blobName, Streams.readFully(in)); } } From f75135d39e57e18e97b4f90c0d9d46f1e828a645 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 8 Jul 2024 17:38:30 +0530 Subject: [PATCH 2/6] Fix test files Signed-off-by: Arpit Bandejiya --- .../InternalRemoteRoutingTableService.java | 33 +- .../remote/NoopRemoteRoutingTableService.java | 6 +- .../remote/RemoteRoutingTableService.java | 10 +- .../common/remote/BlobPathParameters.java | 4 + .../remote/RemoteClusterStateService.java | 12 +- .../model/RemoteRoutingTableBlobStore.java | 9 +- .../routingtable/RemoteIndexRoutingTable.java | 47 ++- ...RemoteRoutingTableServiceFactoryTests.java | 23 +- .../RemoteRoutingTableServiceTests.java | 292 +++++------------ .../remote/ClusterMetadataManifestTests.java | 4 +- .../RemoteClusterStateServiceTests.java | 11 +- .../IndexRoutingTableHeaderTests.java | 32 -- .../RemoteIndexRoutingTableTests.java | 308 +++++++++++++++--- 13 files changed, 442 insertions(+), 349 deletions(-) delete mode 100644 server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 3a1ab73b398e3..21e6cdb74c75b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; @@ -25,7 +24,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.compress.Compressor; -import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteStateTransferException; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; @@ -56,17 +54,13 @@ */ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService { - public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; - public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; - public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; - private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; private final Compressor compressor; - private final RemoteWritableEntityStore remoteWritableEntityStore; + private final RemoteWritableEntityStore remoteIndexRoutingTableStore; private BlobStoreRepository blobStoreRepository; - private ThreadPool threadPool; + private final ThreadPool threadPool; public InternalRemoteRoutingTableService( Supplier repositoriesService, @@ -83,7 +77,7 @@ public InternalRemoteRoutingTableService( this.settings = settings; this.threadPool = threadpool; this.compressor = compressor; - this.remoteWritableEntityStore = new RemoteRoutingTableBlobStore<>( + this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( blobStoreTransferService, blobStoreRepository, clusterName, @@ -117,7 +111,8 @@ public DiffableUtils.MapDiff getAsyncIndexRoutingWriteAction( - ClusterState clusterState, String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ) { - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - indexRouting, - clusterUUID, - compressor, - clusterState.term(), - clusterState.version() - ); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(indexRouting, clusterUUID, compressor, term, version); ActionListener completionListener = ActionListener.wrap( resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()), @@ -146,7 +136,7 @@ public CheckedRunnable getAsyncIndexRoutingWriteAction( ) ); - return () -> remoteWritableEntityStore.writeAsync(remoteIndexRoutingTable, completionListener); + return () -> remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener); } /** @@ -177,7 +167,6 @@ public List getAllUploadedIndices public CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ) { @@ -186,9 +175,9 @@ public CheckedRunnable getAsyncIndexRoutingReadAction( latchedActionListener::onFailure ); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, index, clusterUUID, compressor); + RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); - return () -> remoteWritableEntityStore.readAsync(remoteIndexRoutingTable, actionListener); + return () -> remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index fde4f381b3e8a..4636e492df28f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -9,13 +9,11 @@ package org.opensearch.cluster.routing.remote; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; @@ -42,8 +40,9 @@ public DiffableUtils.MapDiff getAsyncIndexRoutingWriteAction( - ClusterState clusterState, String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ) { @@ -65,7 +64,6 @@ public List getAllUploadedIndices public CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ) { // noop diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 823487132c132..828c65353cdd9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -9,7 +9,6 @@ package org.opensearch.cluster.routing.remote; import org.opensearch.action.LatchedActionListener; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; @@ -17,7 +16,6 @@ import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; @@ -30,8 +28,8 @@ * @opensearch.internal */ public interface RemoteRoutingTableService extends LifecycleComponent { - public static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = - new DiffableUtils.NonDiffableValueSerializer() { + DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = + new DiffableUtils.NonDiffableValueSerializer<>() { @Override public void write(IndexRoutingTable value, StreamOutput out) throws IOException { value.writeTo(out); @@ -48,7 +46,6 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { CheckedRunnable getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, - Index index, LatchedActionListener latchedActionListener ); @@ -63,8 +60,9 @@ DiffableUtils.MapDiff> ); CheckedRunnable getAsyncIndexRoutingWriteAction( - ClusterState clusterState, String clusterUUID, + long term, + long version, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener ); diff --git a/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java b/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java index 58c73a804b66a..7d15e2875886f 100644 --- a/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java +++ b/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java @@ -24,6 +24,10 @@ public BlobPathParameters(final List pathTokens, final String filePrefix this.filePrefix = filePrefix; } + public BlobPathParameters(final List pathTokens) { + this(pathTokens, null); + } + public List getPathTokens() { return pathTokens; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index f6d75805e9483..a64aabc4f8306 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -26,7 +26,6 @@ import org.opensearch.cluster.node.DiscoveryNodes.Builder; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; @@ -43,7 +42,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; @@ -106,6 +104,8 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -657,10 +657,11 @@ UploadedMetadataResults writeMetadataInParallel( }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), + String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()), remoteRoutingTableService.getAsyncIndexRoutingWriteAction( - clusterState, clusterState.metadata().clusterUUID(), + clusterState.term(), + clusterState.version(), indexRoutingTable, listener ) @@ -713,7 +714,7 @@ UploadedMetadataResults writeMetadataInParallel( UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) - && uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) { + && uploadedMetadata.getComponent().contains(INDEX_ROUTING_TABLE_PREFIX)) { response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); } else if (name.startsWith(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- @@ -1039,7 +1040,6 @@ ClusterState readClusterStateInParallel( remoteRoutingTableService.getAsyncIndexRoutingReadAction( clusterUUID, indexRouting.getUploadedFilename(), - new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), routingTableLatchedActionListener ) ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java index 17354d16c1b0f..ecadb3159bb9f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java @@ -21,11 +21,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; /** * Extends the RemoteClusterStateBlobStore to support {@link RemoteIndexRoutingTable} @@ -60,7 +56,6 @@ public class RemoteRoutingTableBlobStore { - public static final String INDEX_ROUTING_TABLE = "index_routing_table"; + public static final String INDEX_ROUTING_TABLE = "index-routing"; + public static final String INDEX_ROUTING_TABLE_PREFIX = "index-routing--"; + public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; + public static final String INDEX_ROUTING_FILE = "index_routing"; private IndexRoutingTable indexRoutingTable; private final Index index; private long term; private long version; - public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = new ChecksumWritableBlobStoreFormat<>( - "index-routing-table", - IndexRoutingTable::readFrom - ); + public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = + new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom); public RemoteIndexRoutingTable( IndexRoutingTable indexRoutingTable, @@ -58,17 +52,17 @@ public RemoteIndexRoutingTable( this.indexRoutingTable = indexRoutingTable; this.term = term; this.version = version; - this.blobFileName = generateBlobFileName(); } /** * Reads data from inputStream and creates RemoteIndexRoutingTable object with the {@link IndexRoutingTable} * @param blobName name of the blob, which contains the index routing data - * @param index index for the current routing data + * @param clusterUUID UUID of the cluster + * @param compressor Compressor object */ - public RemoteIndexRoutingTable(String blobName, Index index, String clusterUUID, Compressor compressor) { + public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) { super(clusterUUID, compressor, null); - this.index = index; + this.index = null; this.term = -1; this.version = -1; this.blobName = blobName; @@ -84,7 +78,7 @@ public Index getIndex() { @Override public BlobPathParameters getBlobPathParameters() { - return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), ""); + return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID())); } @Override @@ -94,17 +88,22 @@ public String getType() { @Override public String generateBlobFileName() { - return String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(term), - RemoteStoreUtils.invertLong(version), - RemoteStoreUtils.invertLong(System.currentTimeMillis()) - ); + if (blobFileName == null) { + blobFileName = String.join( + DELIMITER, + INDEX_ROUTING_FILE, + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + } + return blobFileName; } @Override public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { + assert blobName != null; + assert index != null; return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_METADATA_PREFIX); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java index 39294ee8da41e..5fd8f087e2073 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -11,7 +11,11 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -22,6 +26,7 @@ import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.mockito.Mockito.mock; public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase { @@ -36,11 +41,18 @@ public void teardown() throws Exception { public void testGetServiceWhenRemoteRoutingDisabled() { Settings settings = Settings.builder().build(); + BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); + Compressor compressor = new NoneCompressor(); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); assertTrue(service instanceof NoopRemoteRoutingTableService); } @@ -50,13 +62,20 @@ public void testGetServiceWhenRemoteRoutingEnabled() { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); + BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); + Compressor compressor = new NoneCompressor(); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); assertTrue(service instanceof InternalRemoteRoutingTableService); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 839ebe1ff8301..e287fb9799d1d 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -19,27 +19,24 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.compress.DeflateCompressor; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.common.util.TestCapturingListener; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; -import org.opensearch.gateway.remote.RemoteStateTransferException; -import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; @@ -51,33 +48,34 @@ import org.junit.Before; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX; -import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -92,6 +90,8 @@ public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private BlobPath basePath; private ClusterSettings clusterSettings; private ClusterService clusterService; + private Compressor compressor; + private BlobStoreTransferService blobStoreTransferService; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -105,6 +105,7 @@ public void setup() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterService = mock(ClusterService.class); + blobStoreTransferService = mock(BlobStoreTransferService.class); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); blobStoreRepository = mock(BlobStoreRepository.class); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); @@ -114,14 +115,18 @@ public void setup() { when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); - + compressor = new NoneCompressor(); basePath = BlobPath.cleanPath().add("base-path"); - + when(blobStoreRepository.basePath()).thenReturn(basePath); remoteRoutingTableService = new InternalRemoteRoutingTableService( repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ); } @@ -141,7 +146,11 @@ public void testFailInitializationWhenRemoteRoutingDisabled() { repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + compressor, + blobStoreTransferService, + blobStoreRepository, + "test-cluster" ) ); } @@ -347,136 +356,13 @@ public void testGetIndicesRoutingMapDiffIndexDeleted() { assertEquals(indexName, diff.getDeletes().get(0)); } - public void testGetIndexRoutingAsyncAction() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onResponse(any(ClusterMetadataManifest.UploadedMetadata.class)); - } - - public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); - } - - public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); - ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); - ConcurrentHashMap capturedWriteContext = new ConcurrentHashMap<>(); - - doAnswer((i) -> { - actionListenerArgumentCaptor.getValue().onResponse(null); - WriteContext writeContext = writeContextArgumentCaptor.getValue(); - capturedWriteContext.put(writeContext.getFileName().split(DELIMITER)[0], writeContextArgumentCaptor.getValue()); - return null; - }).when((AsyncMultiStreamBlobContainer) blobContainer) - .asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - - String expectedFilePrefix = String.join( - DELIMITER, - INDEX_ROUTING_FILE_PREFIX, - RemoteStoreUtils.invertLong(clusterState.term()), - RemoteStoreUtils.invertLong(clusterState.version()) - ); - assertEquals(1, actionListenerArgumentCaptor.getAllValues().size()); - assertEquals(1, writeContextArgumentCaptor.getAllValues().size()); - assertNotNull(capturedWriteContext.get("index_routing")); - assertEquals(capturedWriteContext.get("index_routing").getWritePriority(), WritePriority.URGENT); - assertTrue(capturedWriteContext.get("index_routing").getFileName().startsWith(expectedFilePrefix)); - } - - public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - ClusterState clusterState = createClusterState(indexName); - BlobPath expectedPath = getPath(); - - LatchedActionListener listener = mock(LatchedActionListener.class); - blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - - doThrow(new IOException("Testing failure")).when((AsyncMultiStreamBlobContainer) blobContainer) - .asyncBlobUpload(any(WriteContext.class), any(ActionListener.class)); - - remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - clusterState.routingTable().getIndicesRouting().get(indexName), - listener, - basePath - ); - assertNotNull(runnable); - runnable.run(); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); - } - public void testGetAllUploadedIndicesRouting() { final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().build(); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -491,7 +377,7 @@ public void testGetAllUploadedIndicesRoutingExistingIndexInManifest() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -509,7 +395,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -518,7 +404,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -534,13 +420,13 @@ public void testGetAllUploadedIndicesRoutingIndexDeleted() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -558,13 +444,13 @@ public void testGetAllUploadedIndicesRoutingNoChange() { "test-index", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -640,69 +526,69 @@ public void testIndicesRoutingDiffWhenIndexDeletedAndAdded() { ); } - public void testGetAsyncIndexMetadataReadAction() throws Exception { + public void testGetAsyncIndexRoutingReadAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index(indexName, "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BytesStreamOutput streamOutput = new BytesStreamOutput(); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - clusterState.routingTable().getIndicesRouting().get(indexName) + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + INDEX_ROUTING_TABLE_FORMAT.serialize( + clusterState.getRoutingTable().getIndicesRouting().get(indexName), + uploadedFileName, + compressor + ).streamInput() ); - remoteIndexRoutingTable.writeTo(streamOutput); - when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); - remoteRoutingTableService.start(); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction( + "cluster-uuid", + uploadedFileName, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); - assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); - assertBusy(() -> verify(listener, times(1)).onResponse(any(IndexRoutingTable.class))); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + IndexRoutingTable indexRoutingTable = listener.getResult(); + assertEquals(clusterState.getRoutingTable().getIndicesRouting().get(indexName), indexRoutingTable); } - public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws Exception { + public void testGetAsyncIndexRoutingWriteAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); - String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index("incorrect-index", "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BytesStreamOutput streamOutput = new BytesStreamOutput(); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - clusterState.routingTable().getIndicesRouting().get(indexName) - ); - remoteIndexRoutingTable.writeTo(streamOutput); - when(blobContainer.readBlob(anyString())).thenReturn(streamOutput.bytes().streamInput()); - remoteRoutingTableService.doStart(); - - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); - - assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); - assertBusy(() -> verify(listener, times(1)).onFailure(any(Exception.class))); - } - - public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Exception { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - Index index = new Index(indexName, "uuid-01"); - - LatchedActionListener listener = mock(LatchedActionListener.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); - remoteRoutingTableService.doStart(); - - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); - - assertBusy(() -> verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class))); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(WritePriority.URGENT), any(ActionListener.class)); + + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteRoutingTableService.getAsyncIndexRoutingWriteAction( + clusterState.metadata().clusterUUID(), + clusterState.term(), + clusterState.version(), + clusterState.getRoutingTable().indicesRouting().get(indexName), + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + + assertEquals(INDEX_ROUTING_TABLE_PREFIX + indexName, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(8, pathTokens.length); + assertEquals(pathTokens[1], "base-path"); + String[] fileNameTokens = pathTokens[7].split(DELIMITER); + + assertEquals(4, fileNameTokens.length); + assertEquals(fileNameTokens[0], INDEX_ROUTING_TABLE); + assertEquals(fileNameTokens[1], RemoteStoreUtils.invertLong(1L)); + assertEquals(fileNameTokens[2], RemoteStoreUtils.invertLong(2L)); + assertThat(RemoteStoreUtils.invertLong(fileNameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); } public void testGetUpdatedIndexRoutingTableMetadataWhenNoChange() { @@ -758,7 +644,7 @@ private ClusterState createClusterState(String indexName) { } private BlobPath getPath() { - BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_PATH_TOKEN); + BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_TABLE); return RemoteStoreEnums.PathType.HASHED_PREFIX.path( RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID("uuid").build(), RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64 diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 152a6dba6c032..34a8ab275e4da 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; -import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -44,6 +43,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; public class ClusterMetadataManifestTests extends OpenSearchTestCase { @@ -545,7 +545,7 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc "test-index", "test-uuid", "routing-path", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE_PREFIX ); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 6cd9cbbf13848..c4ddbbd2f1c7b 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -117,6 +117,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom1; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom2; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom3; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; @@ -126,7 +127,6 @@ import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA_FORMAT; -import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.readFrom; import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES; @@ -142,6 +142,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA_FORMAT; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadataTests.getTemplatesMetadata; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -253,6 +254,7 @@ public void setup() { List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)), namedWriteableRegistry ); + remoteClusterStateService.start(); } @After @@ -2586,6 +2588,7 @@ public void testRemoteRoutingTableInitializedWhenEnabled() { List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)), writableRegistry() ); + remoteClusterStateService.start(); assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService); } @@ -2603,7 +2606,7 @@ public void testWriteFullMetadataSuccessWithRoutingTable() throws IOException { "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) @@ -2654,7 +2657,7 @@ public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOEx "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() @@ -2710,7 +2713,7 @@ public void testWriteIncrementalMetadataSuccessWithRoutingTable() throws IOExcep "test-index", "index-uuid", "routing-filename", - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + INDEX_ROUTING_TABLE + CUSTOM_DELIMITER ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java deleted file mode 100644 index a3f0ac36a40f1..0000000000000 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableHeaderTests.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote.routingtable; - -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.IOException; - -public class IndexRoutingTableHeaderTests extends OpenSearchTestCase { - - public void testIndexRoutingTableHeader() throws IOException { - String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); - IndexRoutingTableHeader header = new IndexRoutingTableHeader(indexName); - try (BytesStreamOutput out = new BytesStreamOutput()) { - header.writeTo(out); - - BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes); - IndexRoutingTableHeader headerRead = new IndexRoutingTableHeader(in); - assertEquals(indexName, headerRead.getIndexName()); - - } - } - -} diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java index 72066d8afb45b..dee32c64e1327 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java @@ -13,16 +13,137 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RemoteIndexRoutingTableTests extends OpenSearchTestCase { - public void testRoutingTableInput() { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final String INDEX_ROUTING_TABLE_TYPE = "test-index-routing-table"; + private static final long STATE_VERSION = 3L; + private static final long STATE_TERM = 2L; + private String clusterUUID; + private BlobStoreTransferService blobStoreTransferService; + private BlobStoreRepository blobStoreRepository; + private String clusterName; + private ClusterSettings clusterSettings; + private Compressor compressor; + private NamedWriteableRegistry namedWriteableRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedWriteableRegistry = writableRegistry(); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); + }); + } + + public void testFullBlobName() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + }); + } + + public void testBlobFileName() { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); int numberOfShards = randomIntBetween(1, 10); int numberOfReplicas = randomIntBetween(1, 10); @@ -37,51 +158,164 @@ public void testRoutingTableInput() { RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); - initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { - RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); - try (BytesStreamOutput streamOutput = new BytesStreamOutput();) { - indexRouting.writeTo(streamOutput); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - streamOutput.bytes().streamInput(), - metadata.index(indexName).getIndex() - ); - IndexRoutingTable indexRoutingTable = remoteIndexRoutingTable.getIndexRoutingTable(); - assertEquals(numberOfShards, indexRoutingTable.getShards().size()); - assertEquals(metadata.index(indexName).getIndex(), indexRoutingTable.getIndex()); - assertEquals( - numberOfShards * (1 + numberOfReplicas), - indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size() - ); + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(TEST_BLOB_NAME, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + }); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/routingTable"; + RemoteIndexRoutingTable remoteObjectForDownload = new RemoteIndexRoutingTable(uploadedFile, clusterUUID, compressor); + assertThat(remoteObjectForDownload.getBlobPathTokens(), is(new String[] { "user", "local", "opensearch", "routingTable" })); + } + + public void testBlobPathParameters() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteObjectForUpload.getBlobFileName(), nullValue()); + + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(indexRoutingTable.getIndex().getUUID()))); + String expectedPrefix = ""; + assertThat(params.getFilePrefix(), is(expectedPrefix)); + }); + } + + public void testGenerateBlobFileName() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertEquals(nameTokens[0], INDEX_ROUTING_TABLE_PREFIX); + assertEquals(nameTokens[1], RemoteStoreUtils.invertLong(STATE_TERM)); + assertEquals(nameTokens[2], RemoteStoreUtils.invertLong(STATE_VERSION)); + assertThat(RemoteStoreUtils.invertLong(nameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); + }); + } + + public void testGetUploadedMetadata() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + String expectedPrefix = String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()); + assertThat(uploadedMetadata.getComponent(), is(expectedPrefix)); + assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); } catch (IOException e) { throw new RuntimeException(e); } }); } - public void testRoutingTableInputStreamWithInvalidIndex() { + public void testSerDe() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); Metadata metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) - .put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) .build(); - RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); - AtomicInteger assertionError = new AtomicInteger(); - initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> { - RemoteIndexRoutingTable indexRouting = new RemoteIndexRoutingTable(indexShardRoutingTables); - try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { - indexRouting.writeTo(streamOutput); - RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( - streamOutput.bytes().streamInput(), - metadata.index("invalid-index").getIndex() - ); - } catch (AssertionError e) { - assertionError.getAndIncrement(); + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + IndexRoutingTable readIndexRoutingTable = remoteObjectForUpload.deserialize(inputStream); + assertEquals(readIndexRoutingTable, indexRoutingTable); } catch (IOException e) { throw new RuntimeException(e); } }); - - assertEquals(1, assertionError.get()); } - } From 8105d30625f648f115ee42e397eb797d271cc00a Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Tue, 9 Jul 2024 14:50:56 +0530 Subject: [PATCH 3/6] Address comments Signed-off-by: Arpit Bandejiya --- .../routing/remote/RemoteRoutingTableService.java | 2 +- .../remote/AbstractRemoteWritableBlobEntity.java | 4 ++++ .../gateway/remote/RemoteClusterStateService.java | 4 ++-- .../remote/model/RemoteClusterStateBlobStore.java | 12 ++++++++---- .../remote/model/RemoteRoutingTableBlobStore.java | 7 ++----- .../remote/routingtable/RemoteIndexRoutingTable.java | 12 ++++-------- .../remote/RemoteRoutingTableServiceTests.java | 6 ++++-- .../remote/RemoteClusterStateServiceTests.java | 8 ++++---- .../routingtable/RemoteIndexRoutingTableTests.java | 11 +++++------ 9 files changed, 34 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 828c65353cdd9..6e4108bc69d69 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -28,7 +28,7 @@ * @opensearch.internal */ public interface RemoteRoutingTableService extends LifecycleComponent { - DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = + public static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer<>() { @Override public void write(IndexRoutingTable value, StreamOutput out) throws IOException { diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java index 23fc9d3ad77cb..237c077cb673c 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java @@ -40,6 +40,10 @@ public AbstractRemoteWritableBlobEntity( this.namedXContentRegistry = namedXContentRegistry; } + public AbstractRemoteWritableBlobEntity(final String clusterUUID, final Compressor compressor) { + this(clusterUUID, compressor, null); + } + public abstract BlobPathParameters getBlobPathParameters(); public abstract String getType(); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index a64aabc4f8306..f756ed1949a13 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -104,8 +104,8 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -714,7 +714,7 @@ UploadedMetadataResults writeMetadataInParallel( UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) - && uploadedMetadata.getComponent().contains(INDEX_ROUTING_TABLE_PREFIX)) { + && uploadedMetadata.getComponent().contains(INDEX_ROUTING_METADATA_PREFIX)) { response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); } else if (name.startsWith(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java index 27a9adf35f8a3..7fdc7ee4196e2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java @@ -23,6 +23,8 @@ import java.io.InputStream; import java.util.concurrent.ExecutorService; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; + /** * Abstract class for a blob type storage * @@ -31,7 +33,7 @@ */ public class RemoteClusterStateBlobStore> implements RemoteWritableEntityStore { - protected final BlobStoreTransferService transferService; + private final BlobStoreTransferService transferService; private final BlobStoreRepository blobStoreRepository; private final String clusterName; private final ExecutorService executorService; @@ -96,10 +98,12 @@ public BlobPath getBasePath() { return blobStoreRepository.basePath(); } + public BlobPath getBlobPathPrefix(String clusterUUID) { + return getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName())).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); + } + public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { - BlobPath blobPath = getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName())) - .add("cluster-state") - .add(obj.clusterUUID()); + BlobPath blobPath = getBlobPathPrefix(obj.clusterUUID()); for (String token : obj.getBlobPathParameters().getPathTokens()) { blobPath = blobPath.add(token); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java index ecadb3159bb9f..c9653cbf80d9d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java @@ -13,7 +13,6 @@ import org.opensearch.common.remote.RemoteWriteableEntity; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; -import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; @@ -77,10 +76,8 @@ public RemoteRoutingTableBlobStore( @Override public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { assert obj.getBlobPathParameters().getPathTokens().size() == 1 : "Unexpected tokens in RemoteRoutingTableObject"; - BlobPath indexRoutingPath = getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName())) - .add("cluster-state") - .add(obj.clusterUUID()) - .add(INDEX_ROUTING_TABLE); + BlobPath indexRoutingPath = getBlobPathPrefix(obj.clusterUUID()).add(INDEX_ROUTING_TABLE); + BlobPath path = pathType.path( RemoteStorePathStrategy.PathInput.builder() .basePath(indexRoutingPath) diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index cc1b19fa45195..b656386c360fe 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -47,7 +47,7 @@ public RemoteIndexRoutingTable( long term, long version ) { - super(clusterUUID, compressor, null); + super(clusterUUID, compressor); this.index = indexRoutingTable.getIndex(); this.indexRoutingTable = indexRoutingTable; this.term = term; @@ -61,24 +61,20 @@ public RemoteIndexRoutingTable( * @param compressor Compressor object */ public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) { - super(clusterUUID, compressor, null); + super(clusterUUID, compressor); this.index = null; this.term = -1; this.version = -1; this.blobName = blobName; } - public IndexRoutingTable getIndexRoutingTable() { - return indexRoutingTable; - } - public Index getIndex() { return index; } @Override public BlobPathParameters getBlobPathParameters() { - return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID())); + return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), INDEX_ROUTING_FILE); } @Override @@ -91,7 +87,7 @@ public String generateBlobFileName() { if (blobFileName == null) { blobFileName = String.join( DELIMITER, - INDEX_ROUTING_FILE, + getBlobPathParameters().getFilePrefix(), RemoteStoreUtils.invertLong(term), RemoteStoreUtils.invertLong(version), RemoteStoreUtils.invertLong(System.currentTimeMillis()) diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index e287fb9799d1d..da057dfac4c5c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -63,6 +63,8 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_FILE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; @@ -577,7 +579,7 @@ public void testGetAsyncIndexRoutingWriteAction() throws Exception { assertNotNull(listener.getResult()); ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); - assertEquals(INDEX_ROUTING_TABLE_PREFIX + indexName, uploadedMetadata.getComponent()); + assertEquals(INDEX_ROUTING_METADATA_PREFIX + indexName, uploadedMetadata.getComponent()); String uploadedFileName = uploadedMetadata.getUploadedFilename(); String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); assertEquals(8, pathTokens.length); @@ -585,7 +587,7 @@ public void testGetAsyncIndexRoutingWriteAction() throws Exception { String[] fileNameTokens = pathTokens[7].split(DELIMITER); assertEquals(4, fileNameTokens.length); - assertEquals(fileNameTokens[0], INDEX_ROUTING_TABLE); + assertEquals(fileNameTokens[0], INDEX_ROUTING_FILE); assertEquals(fileNameTokens[1], RemoteStoreUtils.invertLong(1L)); assertEquals(fileNameTokens[2], RemoteStoreUtils.invertLong(2L)); assertThat(RemoteStoreUtils.invertLong(fileNameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index c4ddbbd2f1c7b..32106ec93a7df 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -142,9 +142,9 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA_FORMAT; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadataTests.getTemplatesMetadata; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -2606,7 +2606,7 @@ public void testWriteFullMetadataSuccessWithRoutingTable() throws IOException { "test-index", "index-uuid", "routing-filename", - INDEX_ROUTING_TABLE + CUSTOM_DELIMITER + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) @@ -2657,7 +2657,7 @@ public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOEx "test-index", "index-uuid", "routing-filename", - INDEX_ROUTING_TABLE + CUSTOM_DELIMITER + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() @@ -2713,7 +2713,7 @@ public void testWriteIncrementalMetadataSuccessWithRoutingTable() throws IOExcep "test-index", "index-uuid", "routing-filename", - INDEX_ROUTING_TABLE + CUSTOM_DELIMITER + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(List.of(uploadedIndexMetadata)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java index dee32c64e1327..29d4ffa978851 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableTests.java @@ -36,9 +36,8 @@ import java.io.InputStream; import java.util.List; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_FILE; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -206,7 +205,7 @@ public void testBlobPathParameters() { BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); assertThat(params.getPathTokens(), is(List.of(indexRoutingTable.getIndex().getUUID()))); - String expectedPrefix = ""; + String expectedPrefix = INDEX_ROUTING_FILE; assertThat(params.getFilePrefix(), is(expectedPrefix)); }); } @@ -237,7 +236,7 @@ public void testGenerateBlobFileName() { String blobFileName = remoteObjectForUpload.generateBlobFileName(); String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); - assertEquals(nameTokens[0], INDEX_ROUTING_TABLE_PREFIX); + assertEquals(nameTokens[0], INDEX_ROUTING_FILE); assertEquals(nameTokens[1], RemoteStoreUtils.invertLong(STATE_TERM)); assertEquals(nameTokens[2], RemoteStoreUtils.invertLong(STATE_VERSION)); assertThat(RemoteStoreUtils.invertLong(nameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); @@ -273,7 +272,7 @@ public void testGetUploadedMetadata() throws IOException { try (InputStream inputStream = remoteObjectForUpload.serialize()) { remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); ClusterMetadataManifest.UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); - String expectedPrefix = String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()); + String expectedPrefix = INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(); assertThat(uploadedMetadata.getComponent(), is(expectedPrefix)); assertThat(uploadedMetadata.getUploadedFilename(), is(remoteObjectForUpload.getFullBlobName())); } catch (IOException e) { From 1d35d5f8daec68958a7bbb05434135d2de50d11a Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 10 Jul 2024 18:27:06 +0530 Subject: [PATCH 4/6] Fix repo initialisation in remote routing table Signed-off-by: Arpit Bandejiya --- CHANGELOG.md | 1 + .../InternalRemoteRoutingTableService.java | 30 ++++++++++--------- .../remote/RemoteRoutingTableService.java | 2 +- .../RemoteRoutingTableServiceFactory.java | 17 +---------- .../common/remote/BlobPathParameters.java | 4 --- .../remote/RemoteClusterStateService.java | 25 +++++++--------- ...RemoteRoutingTableServiceFactoryTests.java | 17 ----------- .../RemoteRoutingTableServiceTests.java | 13 +++----- .../remote/ClusterMetadataManifestTests.java | 4 +-- .../RemoteClusterStateServiceTests.java | 2 -- 10 files changed, 35 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 813eecbaabfa3..6d87a71ab802d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439)) - Create SystemIndexRegistry with helper method matchesSystemIndex ([#14415](https://github.com/opensearch-project/OpenSearch/pull/14415)) - Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604)) +- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 21e6cdb74c75b..f3f245ee9f8f0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -57,34 +57,26 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; - private final Compressor compressor; - private final RemoteWritableEntityStore remoteIndexRoutingTableStore; + private Compressor compressor; + private RemoteWritableEntityStore remoteIndexRoutingTableStore; + private final ClusterSettings clusterSettings; private BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; + private final String clusterName; public InternalRemoteRoutingTableService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, ThreadPool threadpool, - Compressor compressor, - BlobStoreTransferService blobStoreTransferService, - BlobStoreRepository blobStoreRepository, String clusterName ) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; this.threadPool = threadpool; - this.compressor = compressor; - this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( - blobStoreTransferService, - blobStoreRepository, - clusterName, - threadpool, - ThreadPool.Names.REMOTE_STATE_READ, - clusterSettings - ); + this.clusterName = clusterName; + this.clusterSettings = clusterSettings; } public List getIndicesRouting(RoutingTable routingTable) { @@ -211,6 +203,16 @@ protected void doStart() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; + compressor = blobStoreRepository.getCompressor(); + + this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( + new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), + blobStoreRepository, + clusterName, + threadPool, + ThreadPool.Names.REMOTE_STATE_READ, + clusterSettings + ); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 6e4108bc69d69..d319123bc2cee 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -29,7 +29,7 @@ */ public interface RemoteRoutingTableService extends LifecycleComponent { public static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = - new DiffableUtils.NonDiffableValueSerializer<>() { + new DiffableUtils.NonDiffableValueSerializer() { @Override public void write(IndexRoutingTable value, StreamOutput out) throws IOException { value.writeTo(out); diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index cbdebcb3dea43..56dfa03215a64 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -10,10 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.core.compress.Compressor; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; import java.util.function.Supplier; @@ -38,22 +35,10 @@ public static RemoteRoutingTableService getService( Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, - Compressor compressor, - BlobStoreTransferService blobStoreTransferService, - BlobStoreRepository blobStoreRepository, String clusterName ) { if (isRemoteRoutingTableEnabled(settings)) { - return new InternalRemoteRoutingTableService( - repositoriesService, - settings, - clusterSettings, - threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, - clusterName - ); + return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName); } return new NoopRemoteRoutingTableService(); } diff --git a/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java b/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java index 7d15e2875886f..58c73a804b66a 100644 --- a/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java +++ b/server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java @@ -24,10 +24,6 @@ public BlobPathParameters(final List pathTokens, final String filePrefix this.filePrefix = filePrefix; } - public BlobPathParameters(final List pathTokens) { - this(pathTokens, null); - } - public List getPathTokens() { return pathTokens; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index f756ed1949a13..82ed6a06870a2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -105,7 +105,6 @@ import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -201,6 +200,14 @@ public RemoteClusterStateService( this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); + this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( + repositoriesService, + settings, + clusterSettings, + threadpool, + ClusterName.CLUSTER_NAME_SETTING.get(settings).value() + ); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); } /** @@ -657,7 +664,7 @@ UploadedMetadataResults writeMetadataInParallel( }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( - String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()), + INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), remoteRoutingTableService.getAsyncIndexRoutingWriteAction( clusterState.metadata().clusterUUID(), clusterState.term(), @@ -889,18 +896,7 @@ public void start() { assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); - blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); - this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( - repositoriesService, - settings, - clusterSettings, - threadpool, - blobStoreRepository.getCompressor(), - blobStoreTransferService, - blobStoreRepository, - ClusterName.CLUSTER_NAME_SETTING.get(settings).value() - ); remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, @@ -932,10 +928,9 @@ public void start() { namedWriteableRegistry, threadpool ); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); - remoteClusterStateCleanupManager.start(); remoteRoutingTableService.start(); + remoteClusterStateCleanupManager.start(); } private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java index 5fd8f087e2073..86f4b9502d6ab 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -11,11 +11,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.compress.Compressor; -import org.opensearch.core.compress.NoneCompressor; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -26,7 +22,6 @@ import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.mockito.Mockito.mock; public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase { @@ -41,17 +36,11 @@ public void teardown() throws Exception { public void testGetServiceWhenRemoteRoutingDisabled() { Settings settings = Settings.builder().build(); - BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); - Compressor compressor = new NoneCompressor(); - BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, "test-cluster" ); assertTrue(service instanceof NoopRemoteRoutingTableService); @@ -62,9 +51,6 @@ public void testGetServiceWhenRemoteRoutingEnabled() { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); - BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); - Compressor compressor = new NoneCompressor(); - BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( @@ -72,9 +58,6 @@ public void testGetServiceWhenRemoteRoutingEnabled() { settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, "test-cluster" ); assertTrue(service instanceof InternalRemoteRoutingTableService); diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index da057dfac4c5c..8a263a1ba9180 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -113,8 +113,9 @@ public void setup() { when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); blobStore = mock(BlobStore.class); blobContainer = mock(BlobContainer.class); - when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); + when(repositoriesService.repository(anyString())).thenReturn(blobStoreRepository); when(blobStoreRepository.blobStore()).thenReturn(blobStore); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); compressor = new NoneCompressor(); @@ -125,12 +126,9 @@ public void setup() { settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, "test-cluster" ); - + remoteRoutingTableService.doStart(); } @After @@ -149,9 +147,6 @@ public void testFailInitializationWhenRemoteRoutingDisabled() { settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - compressor, - blobStoreTransferService, - blobStoreRepository, "test-cluster" ) ); @@ -532,7 +527,7 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); - when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + when(blobContainer.readBlob(indexName)).thenReturn( INDEX_ROUTING_TABLE_FORMAT.serialize( clusterState.getRoutingTable().getIndicesRouting().get(indexName), uploadedFileName, diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 34a8ab275e4da..256161af1a3e2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -43,7 +43,7 @@ import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; public class ClusterMetadataManifestTests extends OpenSearchTestCase { @@ -545,7 +545,7 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc "test-index", "test-uuid", "routing-path", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 32106ec93a7df..ebd3488d06007 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -254,7 +254,6 @@ public void setup() { List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)), namedWriteableRegistry ); - remoteClusterStateService.start(); } @After @@ -2588,7 +2587,6 @@ public void testRemoteRoutingTableInitializedWhenEnabled() { List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)), writableRegistry() ); - remoteClusterStateService.start(); assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService); } From 69ec4011664073e237c44de7397ede189d353419 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Fri, 12 Jul 2024 17:18:55 +0530 Subject: [PATCH 5/6] Address comments Signed-off-by: Arpit Bandejiya --- .../remote/RemoteClusterStateService.java | 2 - .../model/RemoteClusterStateBlobStore.java | 9 +- .../model/RemoteRoutingTableBlobStore.java | 9 ++ .../routingtable/RemoteIndexRoutingTable.java | 11 +- .../RemoteRoutingTableServiceTests.java | 17 ++- .../RemoteRoutingTableBlobStoreTests.java | 133 ++++++++++++++++++ 6 files changed, 159 insertions(+), 22 deletions(-) create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStoreTests.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 82ed6a06870a2..5d6bb4e492509 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -154,7 +154,6 @@ public class RemoteClusterStateService implements Closeable { private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager; private RemoteManifestManager remoteManifestManager; private ClusterSettings clusterSettings; - private final ClusterService clusterService; private final NamedWriteableRegistry namedWriteableRegistry; private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " @@ -196,7 +195,6 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.clusterService = clusterService; this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java index 7fdc7ee4196e2..cd8b8aa41ad65 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java @@ -94,12 +94,11 @@ public String getClusterName() { return clusterName; } - public BlobPath getBasePath() { - return blobStoreRepository.basePath(); - } - public BlobPath getBlobPathPrefix(String clusterUUID) { - return getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName())).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); + return blobStoreRepository.basePath() + .add(RemoteClusterStateUtils.encodeString(getClusterName())) + .add(CLUSTER_STATE_PATH_TOKEN) + .add(clusterUUID); } public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity obj) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java index c9653cbf80d9d..1e2edcc4ce3bc 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java @@ -96,4 +96,13 @@ private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashA this.pathHashAlgo = pathHashAlgo; } + // For testing only + public RemoteStoreEnums.PathType getPathTypeSetting() { + return pathType; + } + + // For testing only + public RemoteStoreEnums.PathHashAlgorithm getPathHashAlgoSetting() { + return pathHashAlgo; + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index b656386c360fe..40b5bafde2b13 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -30,13 +30,13 @@ public class RemoteIndexRoutingTable extends AbstractRemoteWritableBlobEntity { public static final String INDEX_ROUTING_TABLE = "index-routing"; - public static final String INDEX_ROUTING_TABLE_PREFIX = "index-routing--"; public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; public static final String INDEX_ROUTING_FILE = "index_routing"; private IndexRoutingTable indexRoutingTable; private final Index index; private long term; private long version; + private BlobPathParameters blobPathParameters; public static final ChecksumWritableBlobStoreFormat INDEX_ROUTING_TABLE_FORMAT = new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom); @@ -68,13 +68,12 @@ public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor c this.blobName = blobName; } - public Index getIndex() { - return index; - } - @Override public BlobPathParameters getBlobPathParameters() { - return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), INDEX_ROUTING_FILE); + if (blobPathParameters == null) { + blobPathParameters = new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), INDEX_ROUTING_FILE); + } + return blobPathParameters; } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 8a263a1ba9180..1c5cafc825aa5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -67,7 +67,6 @@ import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; -import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.ArgumentMatchers.anyIterable; @@ -359,7 +358,7 @@ public void testGetAllUploadedIndicesRouting() { "test-index", "index-uuid", "index-filename", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -374,7 +373,7 @@ public void testGetAllUploadedIndicesRoutingExistingIndexInManifest() { "test-index", "index-uuid", "index-filename", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -392,7 +391,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index", "index-uuid", "index-filename", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata)) @@ -401,7 +400,7 @@ public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { "test-index2", "index-uuid", "index-filename", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); List allIndiceRoutingMetadata = remoteRoutingTableService @@ -417,13 +416,13 @@ public void testGetAllUploadedIndicesRoutingIndexDeleted() { "test-index", "index-uuid", "index-filename", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) @@ -441,13 +440,13 @@ public void testGetAllUploadedIndicesRoutingNoChange() { "test-index", "index-uuid", "index-filename", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( "test-index2", "index-uuid", "index-filename", - INDEX_ROUTING_TABLE_PREFIX + INDEX_ROUTING_METADATA_PREFIX ); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStoreTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStoreTests.java new file mode 100644 index 0000000000000..ea0f3264cfe4f --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStoreTests.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; +import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64; +import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteRoutingTableBlobStoreTests extends OpenSearchTestCase { + + private RemoteRoutingTableBlobStore remoteIndexRoutingTableStore; + ClusterSettings clusterSettings; + ThreadPool threadPool; + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); + BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("base-path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + + threadPool = new TestThreadPool(getClass().getName()); + this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + "test-cluster", + threadPool, + ThreadPool.Names.REMOTE_STATE_READ, + clusterSettings + ); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + + } + + public void testRemoteRoutingTablePathTypeSetting() { + // Assert the default is HASHED_PREFIX + assertEquals(HASHED_PREFIX.toString(), remoteIndexRoutingTableStore.getPathTypeSetting().toString()); + + Settings newSettings = Settings.builder() + .put("cluster.remote_store.routing_table.path_type", RemoteStoreEnums.PathType.FIXED.toString()) + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(RemoteStoreEnums.PathType.FIXED.toString(), remoteIndexRoutingTableStore.getPathTypeSetting().toString()); + } + + public void testRemoteRoutingTableHashAlgoSetting() { + // Assert the default is FNV_1A_BASE64 + assertEquals(FNV_1A_BASE64.toString(), remoteIndexRoutingTableStore.getPathHashAlgoSetting().toString()); + + Settings newSettings = Settings.builder() + .put("cluster.remote_store.routing_table.path_hash_algo", RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString()) + .build(); + clusterSettings.applySettings(newSettings); + assertEquals( + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString(), + remoteIndexRoutingTableStore.getPathHashAlgoSetting().toString() + ); + } + + public void testGetBlobPathForUpload() { + + Index index = new Index("test-idx", "index-uuid"); + Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + + IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + IndexRoutingTable indexRoutingTable = new IndexRoutingTable.Builder(index).initializeAsNew(indexMetadata).build(); + + RemoteIndexRoutingTable remoteObjectForUpload = new RemoteIndexRoutingTable( + indexRoutingTable, + "cluster-uuid", + new DeflateCompressor(), + 2L, + 3L + ); + BlobPath blobPath = remoteIndexRoutingTableStore.getBlobPathForUpload(remoteObjectForUpload); + BlobPath expectedPath = HASHED_PREFIX.path( + RemoteStorePathStrategy.PathInput.builder() + .basePath( + new BlobPath().add("base-path") + .add(RemoteClusterStateUtils.encodeString("test-cluster")) + .add(CLUSTER_STATE_PATH_TOKEN) + .add("cluster-uuid") + .add(INDEX_ROUTING_TABLE) + ) + .indexUUID(index.getUUID()) + .build(), + FNV_1A_BASE64 + ); + assertEquals(expectedPath, blobPath); + } +} From 24131d81b8e61fe8bc35a54487e2a2b007c05be9 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 15 Jul 2024 22:19:29 +0530 Subject: [PATCH 6/6] Address comments for tests Signed-off-by: Arpit Bandejiya --- .../remote/RemoteClusterStateService.java | 4 ++-- .../model/RemoteRoutingTableBlobStore.java | 4 ++-- .../RemoteRoutingTableServiceTests.java | 23 ++++++++++++++++--- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 5d6bb4e492509..7e7a93e1d42ec 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1019,10 +1019,10 @@ ClusterState readClusterStateInParallel( LatchedActionListener routingTableLatchedActionListener = new LatchedActionListener<>( ActionListener.wrap(response -> { - logger.debug("Successfully read cluster state component from remote"); + logger.debug(() -> new ParameterizedMessage("Successfully read index-routing for index {}", response.getIndex().getName())); readIndexRoutingTableResults.add(response); }, ex -> { - logger.error("Failed to read cluster state from remote", ex); + logger.error(() -> new ParameterizedMessage("Failed to read index-routing from remote"), ex); exceptionList.add(ex); }), latch diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java index 1e2edcc4ce3bc..7c4a5bf2236a1 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteRoutingTableBlobStore.java @@ -97,12 +97,12 @@ private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashA } // For testing only - public RemoteStoreEnums.PathType getPathTypeSetting() { + protected RemoteStoreEnums.PathType getPathTypeSetting() { return pathType; } // For testing only - public RemoteStoreEnums.PathHashAlgorithm getPathHashAlgoSetting() { + protected RemoteStoreEnums.PathHashAlgorithm getPathHashAlgoSetting() { return pathHashAlgo; } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 1c5cafc825aa5..564c7f7aed304 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -33,6 +33,7 @@ import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; @@ -61,15 +62,17 @@ import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_FILE; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; +import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64; +import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; @@ -112,7 +115,7 @@ public void setup() { when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); blobStore = mock(BlobStore.class); blobContainer = mock(BlobContainer.class); - when(repositoriesService.repository(anyString())).thenReturn(blobStoreRepository); + when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); when(blobStoreRepository.blobStore()).thenReturn(blobStore); when(blobStore.blobContainer(any())).thenReturn(blobContainer); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); @@ -552,11 +555,25 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { public void testGetAsyncIndexRoutingWriteAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); + Iterable remotePath = HASHED_PREFIX.path( + RemoteStorePathStrategy.PathInput.builder() + .basePath( + new BlobPath().add("base-path") + .add(RemoteClusterStateUtils.encodeString(ClusterName.DEFAULT.toString())) + .add(CLUSTER_STATE_PATH_TOKEN) + .add(clusterState.metadata().clusterUUID()) + .add(INDEX_ROUTING_TABLE) + ) + .indexUUID(clusterState.getRoutingTable().indicesRouting().get(indexName).getIndex().getUUID()) + .build(), + FNV_1A_BASE64 + ); + doAnswer(invocationOnMock -> { invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); return null; }).when(blobStoreTransferService) - .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(WritePriority.URGENT), any(ActionListener.class)); + .uploadBlob(any(InputStream.class), eq(remotePath), anyString(), eq(WritePriority.URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1);