From 67906bdab07910e3e7966dfcc3a31c91f434c426 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 15 Apr 2024 15:11:38 +0530 Subject: [PATCH] Incorporate PR review feedback Signed-off-by: Ashish Singh --- ...reationPreIndexMetadataUploadListener.java | 46 ++++ .../remote/RemoteClusterStateService.java | 235 +++--------------- ...RemoteUploadPathIndexCreationListener.java | 234 +++++++++++++++++ .../main/java/org/opensearch/node/Node.java | 14 +- 4 files changed, 324 insertions(+), 205 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java b/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java new file mode 100644 index 0000000000000..8b8c0ca137079 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.metadata.IndexMetadata; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * Hook for running code that needs to be executed before the upload of index metadata during index creation or + * after enabling the remote cluster state for the first time. The listener is intended to be run in parallel and + * async with the index metadata upload. 
+ * + * @opensearch.internal + */ +public interface IndexCreationPreIndexMetadataUploadListener { + + /** + * This returns the additional count that needs to be added in the latch present in {@link RemoteClusterStateService} + * which is used to achieve parallelism and async nature of uploads for index metadata upload. The same latch is used + * for running pre index metadata upload listener. + * + * @param newIndexMetadataList list of index metadata of new indexes (or first time index metadata upload). + * @return latch count to be added by the caller. + */ + int latchCount(List newIndexMetadataList); + + /** + * This will run the pre index metadata upload listener using the {@code newIndexMetadataList}, {@code latch} and + * {@code exceptionList}. This method must execute the operation in parallel and async to ensure that the cluster state + * upload time remains the same. + * + * @param newIndexMetadataList list of index metadata of new indexes (or first time index metadata upload). + * @param latch this is used for counting down once the unit of work per index is done. + * @param exceptionList exception if any during run will be added here and used by the caller. 
+ */ + void run(List newIndexMetadataList, CountDownLatch latch, List exceptionList) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 4d885e014931c..4cdb0141ae397 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -26,16 +26,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteIndexPath; -import org.opensearch.index.remote.RemoteStoreEnums.DataCategory; -import org.opensearch.index.remote.RemoteStoreEnums.DataType; -import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; -import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; @@ -70,9 +65,6 @@ import java.util.stream.Collectors; import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; -import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH; -import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -175,14 +167,10 @@ public class RemoteClusterStateService implements 
Closeable { private final Settings settings; private final LongSupplier relativeTimeNanosSupplier; private final ThreadPool threadpool; + private final IndexCreationPreIndexMetadataUploadListener indexCreationListener; private BlobStoreRepository blobStoreRepository; private BlobStoreTransferService blobStoreTransferService; private volatile TimeValue slowWriteLoggingThreshold; - private BlobStoreRepository translogRepository; - private BlobStoreTransferService translogTransferService; - private BlobStoreRepository segmentRepository; - private BlobStoreTransferService segmentsTransferService; - private final boolean isRemoteDataAttributePresent; private volatile TimeValue indexMetadataUploadTimeout; private volatile TimeValue globalMetadataUploadTimeout; @@ -210,6 +198,18 @@ public RemoteClusterStateService( ClusterSettings clusterSettings, LongSupplier relativeTimeNanosSupplier, ThreadPool threadPool + ) { + this(nodeId, repositoriesService, settings, clusterSettings, relativeTimeNanosSupplier, threadPool, null); + } + + public RemoteClusterStateService( + String nodeId, + Supplier repositoriesService, + Settings settings, + ClusterSettings clusterSettings, + LongSupplier relativeTimeNanosSupplier, + ThreadPool threadPool, + IndexCreationPreIndexMetadataUploadListener indexCreationListener ) { assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; this.nodeId = nodeId; @@ -226,7 +226,7 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings); + this.indexCreationListener = indexCreationListener; } private BlobStoreTransferService getBlobStoreTransferService() { @@ -236,20 +236,6 
@@ private BlobStoreTransferService getBlobStoreTransferService() { return blobStoreTransferService; } - private BlobStoreTransferService getTranslogTransferService() { - if (translogTransferService == null) { - translogTransferService = new BlobStoreTransferService(translogRepository.blobStore(), threadpool); - } - return translogTransferService; - } - - private BlobStoreTransferService getSegmentsTransferService() { - if (segmentsTransferService == null) { - segmentsTransferService = new BlobStoreTransferService(segmentRepository.blobStore(), threadpool); - } - return segmentsTransferService; - } - /** * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be * invoked by the elected cluster manager when the remote cluster state is enabled. @@ -482,15 +468,15 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException private List writeIndexMetadataParallel( ClusterState clusterState, List toUpload, - List toUploadIndexPath + List newIndexMetadataList ) throws IOException { - boolean isTranslogSegmentRepoSame = isTranslogSegmentRepoSame(); - int latchCount = toUpload.size() + (isTranslogSegmentRepoSame ? 
toUploadIndexPath.size() : 2 * toUploadIndexPath.size()); + assert Objects.nonNull(indexCreationListener) : "indexCreationListener can not be null"; + int latchCount = toUpload.size() + indexCreationListener.latchCount(newIndexMetadataList); List exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount)); final CountDownLatch latch = new CountDownLatch(latchCount); List result = new ArrayList<>(toUpload.size()); uploadIndexMetadataAsync(clusterState, result, toUpload, latch, exceptionList); - uploadIndexPathAsync(toUploadIndexPath, latch, isTranslogSegmentRepoSame, exceptionList); + indexCreationListener.run(newIndexMetadataList, latch, exceptionList); try { if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { @@ -531,102 +517,6 @@ private List writeIndexMetadataParallel( return result; } - private void uploadIndexPathAsync( - List toUploadIndexPath, - CountDownLatch latch, - boolean isTranslogSegmentRepoSame, - List exceptionList - ) throws IOException { - for (IndexMetadata indexMetadata : toUploadIndexPath) { - writeIndexPathAsync(indexMetadata, latch, isTranslogSegmentRepoSame, exceptionList); - } - } - - private void writeIndexPathAsync( - IndexMetadata idxMD, - CountDownLatch latch, - boolean isTranslogSegmentRepoSame, - List exceptionList - ) throws IOException { - Map remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - PathType pathType = PathType.valueOf(remoteCustomData.get(PathType.NAME)); - PathHashAlgorithm hashAlgorithm = PathHashAlgorithm.valueOf(remoteCustomData.get(PathHashAlgorithm.NAME)); - String indexUUID = idxMD.getIndexUUID(); - int shardCount = idxMD.getNumberOfShards(); - BlobPath translogBasePath = translogRepository.basePath(); - BlobContainer translogBlobContainer = translogRepository.blobStore().blobContainer(translogBasePath.add(RemoteIndexPath.DIR)); - - if (isTranslogSegmentRepoSame) { - // If the repositories are same, then we need to upload a single 
file containing paths for both translog and segments. - Map> pathCreationMap = new HashMap<>(); - pathCreationMap.putAll(TRANSLOG_PATH); - pathCreationMap.putAll(SEGMENT_PATH); - REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( - new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, pathCreationMap), - translogBlobContainer, - indexUUID, - translogRepository.getCompressor(), - getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap), - FORMAT_PARAMS, - true, - XContentType.JSON - ); - } else { - // If the repositories are different, then we need to upload one file per segment and translog containing their individual - // paths. - REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( - new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, TRANSLOG_PATH), - translogBlobContainer, - indexUUID, - translogRepository.getCompressor(), - getUploadPathLatchedActionListener(idxMD, latch, exceptionList, TRANSLOG_PATH), - FORMAT_PARAMS, - true, - XContentType.JSON - ); - - BlobPath segmentBasePath = segmentRepository.basePath(); - BlobContainer segmentBlobContainer = segmentRepository.blobStore().blobContainer(segmentBasePath.add(RemoteIndexPath.DIR)); - REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( - new RemoteIndexPath(indexUUID, shardCount, segmentBasePath, pathType, hashAlgorithm, SEGMENT_PATH), - segmentBlobContainer, - indexUUID, - segmentRepository.getCompressor(), - getUploadPathLatchedActionListener(idxMD, latch, exceptionList, SEGMENT_PATH), - FORMAT_PARAMS, - true, - XContentType.JSON - ); - } - } - - private LatchedActionListener getUploadPathLatchedActionListener( - IndexMetadata indexMetadata, - CountDownLatch latch, - List exceptionList, - Map> pathCreationMap - ) { - return new LatchedActionListener<>( - ActionListener.wrap( - resp -> logger.trace( - new ParameterizedMessage("Index path uploaded for {} indexMetadata={}", pathCreationMap, indexMetadata) - ), - 
ex -> { - logger.error( - new ParameterizedMessage( - "Exception during Index path upload for {} indexMetadata={}", - pathCreationMap, - indexMetadata - ), - ex - ); - exceptionList.add(ex); - } - ), - latch - ); - } - private void uploadIndexMetadataAsync( ClusterState clusterState, List result, @@ -657,26 +547,16 @@ private void uploadIndexMetadataAsync( } } - private boolean isTranslogSegmentRepoSame() { - String translogRepoName = settings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY - ); - String segmentRepoName = settings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY - ); - return Objects.equals(translogRepoName, segmentRepoName); - } - private List writeIndexMetadataParallel( ClusterState clusterState, List toUpload, String previousClusterUUID ) throws IOException { - List toUploadIndexPath = Collections.emptyList(); + List newIndexMetadaList = Collections.emptyList(); if (ClusterState.UNKNOWN_UUID.equals(previousClusterUUID)) { - toUploadIndexPath = toUpload; + newIndexMetadaList = toUpload; } - return writeIndexMetadataParallel(clusterState, toUpload, toUploadIndexPath); + return writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadaList); } private List writeIndexMetadataParallel( @@ -684,35 +564,11 @@ private List writeIndexMetadataParallel( List toUpload, Map indexNamePreviousVersionMap ) throws IOException { - List toUploadIndexPath = Collections.emptyList(); - if (isRemoteDataAttributePresent) { - toUploadIndexPath = toUpload.stream() - /* If the previous state's index metadata version is null, then this is index creation */ - .filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName()))) - /* Checks the condition if the Index path needs to be uploaded or not */ - .filter(this::uploadIndexPathFile) - .collect(Collectors.toList()); - } - return 
writeIndexMetadataParallel(clusterState, toUpload, toUploadIndexPath); - } - - /** - * This method checks if the index metadata has attributes that calls for uploading the index path for remote store - * uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so. - */ - private boolean uploadIndexPathFile(IndexMetadata indexMetadata) { - assert isRemoteDataAttributePresent : "Remote data attributes is expected to be present"; - // A cluster will have remote custom metadata only if the cluster is remote store enabled from data side. - Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) { - return false; - } - String pathTypeStr = remoteCustomData.get(PathType.NAME); - if (Objects.isNull(pathTypeStr)) { - return false; - } - // We need to upload the path only if the path type for an index is hashed_prefix - return PathType.HASHED_PREFIX == PathType.parseString(pathTypeStr); + List newIndexMetadataList = toUpload.stream() + /* If the previous state's index metadata version is null, then this is index creation */ + .filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName()))) + .collect(Collectors.toList()); + return writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList); } /** @@ -783,32 +639,13 @@ public void close() throws IOException { public void start() { assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled"; - blobStoreRepository = (BlobStoreRepository) validateAndGetRepository( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, - "Cluster State" + final String remoteStoreRepo = settings.get( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY ); - - if (isRemoteDataAttributePresent 
== false) { - // If remote store data attributes are not present than we skip this. - return; - } - translogRepository = (BlobStoreRepository) validateAndGetRepository( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, - "Translog" - ); - segmentRepository = (BlobStoreRepository) validateAndGetRepository( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, - "Translog" - ); - - } - - private Repository validateAndGetRepository(String repoSetting, String repoName) { - final String repo = settings.get(repoSetting); - assert repo != null : "Remote " + repoName + " repository is not configured"; - final Repository repository = repositoriesService.get().repository(repo); + assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; - return repository; + blobStoreRepository = (BlobStoreRepository) repository; } private ClusterMetadataManifest uploadManifest( @@ -887,16 +724,6 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust ); } - private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) { - final Optional latestManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); - if (!latestManifest.isPresent()) { - final String previousClusterUUID = getLastKnownUUIDFromRemote(clusterName); - assert !clusterUUID.equals(previousClusterUUID) : "Last cluster UUID is same current cluster UUID"; - return previousClusterUUID; - } - return latestManifest.get().getPreviousClusterUUID(); - } - private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX return 
blobStoreRepository.blobStore() diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java b/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java new file mode 100644 index 0000000000000..29868b1e0ecc5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java @@ -0,0 +1,234 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.IndexCreationPreIndexMetadataUploadListener; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.node.Node; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static 
org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH; +import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; + +/** + * Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory} + * and {@link org.opensearch.index.remote.RemoteStoreEnums.DataType} for each shard of an index. + */ +public class RemoteUploadPathIndexCreationListener implements IndexCreationPreIndexMetadataUploadListener { + + public static final ChecksumBlobStoreFormat REMOTE_INDEX_PATH_FORMAT = new ChecksumBlobStoreFormat<>( + "remote-index-path", + RemoteIndexPath.FILE_NAME_FORMAT, + RemoteIndexPath::fromXContent + ); + + private static final Logger logger = LogManager.getLogger(RemoteUploadPathIndexCreationListener.class); + + private final Settings settings; + private final boolean isRemoteDataAttributePresent; + private final boolean isTranslogSegmentRepoSame; + private final Supplier repositoriesService; + + private BlobStoreRepository translogRepository; + private BlobStoreRepository segmentRepository; + + public RemoteUploadPathIndexCreationListener(Settings settings, Supplier repositoriesService) { + this.settings = settings; + this.repositoriesService = repositoriesService; + isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings); + // If the remote data attributes are not present, then there is no effect of translog and segment being same or different or null. 
+ isTranslogSegmentRepoSame = isTranslogSegmentRepoSame(); + } + + @Override + public int latchCount(List newIndexMetadataList) { + if (isRemoteDataAttributePresent == false) { + return 0; + } + int eligibleIndexCount = (int) newIndexMetadataList.stream().filter(this::uploadIndexPathFile).count(); + return isTranslogSegmentRepoSame ? eligibleIndexCount : 2 * eligibleIndexCount; + } + + @Override + public void run(List newIndexMetadataList, CountDownLatch latch, List exceptionList) throws IOException { + if (isRemoteDataAttributePresent == false) { + return; + } + List elibibleIndexMetadaList = newIndexMetadataList.stream() + .filter(this::uploadIndexPathFile) + .collect(Collectors.toList()); + if (isTranslogSegmentRepoSame) { + assert latchCount(newIndexMetadataList) == elibibleIndexMetadaList.size() + : "Latch count is not equal to elibibleIndexMetadaList's size for path upload"; + } else { + assert latchCount(newIndexMetadataList) == 2 * elibibleIndexMetadaList.size() + : "Latch count is not equal to (2 * elibibleIndexMetadaList's size) for path upload"; + } + for (IndexMetadata indexMetadata : elibibleIndexMetadaList) { + writeIndexPathAsync(indexMetadata, latch, exceptionList); + } + } + + private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List exceptionList) throws IOException { + Map remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); + RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.valueOf(remoteCustomData.get(RemoteStoreEnums.PathType.NAME)); + RemoteStoreEnums.PathHashAlgorithm hashAlgorithm = RemoteStoreEnums.PathHashAlgorithm.valueOf( + remoteCustomData.get(RemoteStoreEnums.PathHashAlgorithm.NAME) + ); + String indexUUID = idxMD.getIndexUUID(); + int shardCount = idxMD.getNumberOfShards(); + BlobPath translogBasePath = translogRepository.basePath(); + BlobContainer translogBlobContainer = translogRepository.blobStore().blobContainer(translogBasePath.add(RemoteIndexPath.DIR)); + + if 
(isTranslogSegmentRepoSame) { + // If the repositories are same, then we need to upload a single file containing paths for both translog and segments. + Map> pathCreationMap = new HashMap<>(); + pathCreationMap.putAll(TRANSLOG_PATH); + pathCreationMap.putAll(SEGMENT_PATH); + REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( + new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, pathCreationMap), + translogBlobContainer, + indexUUID, + translogRepository.getCompressor(), + getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap), + RemoteClusterStateService.FORMAT_PARAMS, + true, + XContentType.JSON + ); + } else { + // If the repositories are different, then we need to upload one file per segment and translog containing their individual + // paths. + REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( + new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, TRANSLOG_PATH), + translogBlobContainer, + indexUUID, + translogRepository.getCompressor(), + getUploadPathLatchedActionListener(idxMD, latch, exceptionList, TRANSLOG_PATH), + RemoteClusterStateService.FORMAT_PARAMS, + true, + XContentType.JSON + ); + + BlobPath segmentBasePath = segmentRepository.basePath(); + BlobContainer segmentBlobContainer = segmentRepository.blobStore().blobContainer(segmentBasePath.add(RemoteIndexPath.DIR)); + REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( + new RemoteIndexPath(indexUUID, shardCount, segmentBasePath, pathType, hashAlgorithm, SEGMENT_PATH), + segmentBlobContainer, + indexUUID, + segmentRepository.getCompressor(), + getUploadPathLatchedActionListener(idxMD, latch, exceptionList, SEGMENT_PATH), + RemoteClusterStateService.FORMAT_PARAMS, + true, + XContentType.JSON + ); + } + } + + private Repository validateAndGetRepository(String repoSetting) { + final String repo = settings.get(repoSetting); + assert repo != null : "Remote " + repoSetting + " repository is not 
configured"; + final Repository repository = repositoriesService.get().repository(repo); + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; + return repository; + } + + public void start() { + assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled"; + if (isRemoteDataAttributePresent == false) { + // If remote store data attributes are not present then we skip this. + return; + } + translogRepository = (BlobStoreRepository) validateAndGetRepository( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + segmentRepository = (BlobStoreRepository) validateAndGetRepository( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + } + + private boolean isTranslogSegmentRepoSame() { + String translogRepoName = settings.get( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + String segmentRepoName = settings.get( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + return Objects.equals(translogRepoName, segmentRepoName); + } + + private LatchedActionListener getUploadPathLatchedActionListener( + IndexMetadata indexMetadata, + CountDownLatch latch, + List exceptionList, + Map> pathCreationMap + ) { + return new LatchedActionListener<>( + ActionListener.wrap( + resp -> logger.trace( + new ParameterizedMessage("Index path uploaded for {} indexMetadata={}", pathCreationMap, indexMetadata) + ), + ex -> { + logger.error( + new ParameterizedMessage( + "Exception during Index path upload for {} indexMetadata={}", + pathCreationMap, + indexMetadata + ), + ex + ); + exceptionList.add(ex); + } + ), + latch + ); + } + + /** + * This method checks if the index metadata has attributes that call for uploading the index path for remote 
store + * uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so. + */ + private boolean uploadIndexPathFile(IndexMetadata indexMetadata) { + // A cluster will have remote custom metadata only if the cluster is remote store enabled from data side. + Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); + if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) { + return false; + } + String pathTypeStr = remoteCustomData.get(RemoteStoreEnums.PathType.NAME); + if (Objects.isNull(pathTypeStr)) { + return false; + } + // We need to upload the path only if the path type for an index is hashed_prefix + return RemoteStoreEnums.PathType.HASHED_PREFIX == RemoteStoreEnums.PathType.parseString(pathTypeStr); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7fa2b6c8ff497..028c495523591 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -147,6 +147,7 @@ import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; +import org.opensearch.index.remote.RemoteUploadPathIndexCreationListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; @@ -726,17 +727,21 @@ protected Node( threadPool::relativeTimeInMillis ); final RemoteClusterStateService remoteClusterStateService; + final RemoteUploadPathIndexCreationListener indexCreationListener; if (isRemoteStoreClusterStateEnabled(settings)) { + indexCreationListener = new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceReference::get); remoteClusterStateService = new RemoteClusterStateService( 
nodeEnvironment.nodeId(), repositoriesServiceReference::get, settings, clusterService.getClusterSettings(), threadPool::preciseRelativeTimeInNanos, - threadPool + threadPool, + indexCreationListener ); } else { remoteClusterStateService = null; + indexCreationListener = null; } // collect engine factory providers from plugins @@ -1312,6 +1317,7 @@ protected Node( b.bind(SearchRequestSlowLog.class).toInstance(searchRequestSlowLog); b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); + b.bind(RemoteUploadPathIndexCreationListener.class).toProvider(() -> indexCreationListener); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); @@ -1461,6 +1467,12 @@ public Node start() throws NodeValidationException { if (remoteClusterStateService != null) { remoteClusterStateService.start(); } + final RemoteUploadPathIndexCreationListener indexCreationListener = injector.getInstance( + RemoteUploadPathIndexCreationListener.class + ); + if (indexCreationListener != null) { + indexCreationListener.start(); + } // Load (and maybe upgrade) the metadata stored on disk final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class); gatewayMetaState.start(