diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java index c39cec96aa476..c9c3e434517c2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java @@ -13,6 +13,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.util.FileSystemUtils; import org.opensearch.index.remote.RemoteIndexPath; +import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.test.OpenSearchIntegTestCase; @@ -87,21 +88,20 @@ private void validateRemoteIndexPathFile(boolean exists) { .prepareGetSettings(INDEX_NAME) .get() .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); - + String fileName = RemoteIndexPathUploader.generateFileName(indexUUID, 2L, RemoteIndexPath.DEFAULT_VERSION); assertEquals(exists, FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR))); assertEquals( exists, FileSystemUtils.exists( translogRepoPath.resolve(RemoteIndexPath.DIR) - .resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID)) + .resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, fileName)) ) ); assertEquals(exists, FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR))); assertEquals( exists, FileSystemUtils.exists( - segmentRepoPath.resolve(RemoteIndexPath.DIR) - .resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID)) + segmentRepoPath.resolve(RemoteIndexPath.DIR).resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, fileName)) ) ); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 80b78cfe154f1..61f4624b123ca 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -623,7 +623,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { ); public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations"; - static final String KEY_VERSION = "version"; + public static final String KEY_VERSION = "version"; static final String KEY_MAPPING_VERSION = "mapping_version"; static final String KEY_SETTINGS_VERSION = "settings_version"; static final String KEY_ALIASES_VERSION = "aliases_version"; diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java index f9158c9260747..7d5fe8140e1e1 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java +++ b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java @@ -13,6 +13,7 @@ import org.opensearch.threadpool.ThreadPool; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutorService; @@ -41,9 +42,17 @@ public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName) * @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload). * @param actionListener listener to be invoked on success or failure. */ - public final void onNewIndexUpload(List indexMetadataList, ActionListener actionListener) { - executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener)); + public final void onUpload( + List indexMetadataList, + Map prevIndexMetadataByName, + ActionListener actionListener + ) { + executorService.execute(() -> doOnUpload(indexMetadataList, prevIndexMetadataByName, actionListener)); } - protected abstract void doOnNewIndexUpload(List indexMetadataList, ActionListener actionListener); + protected abstract void doOnUpload( + List indexMetadataList, + Map prevIndexMetadataByName, + ActionListener actionListener + ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d2f927c827e5b..93edc7578d1e5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -242,7 +242,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri final List allUploadedIndexMetadata = writeIndexMetadataParallel( clusterState, toUpload, - ClusterState.UNKNOWN_UUID.equals(previousClusterUUID) ? toUpload : Collections.emptyList() + Collections.emptyMap() ); final ClusterMetadataManifest manifest = uploadManifest( clusterState, @@ -307,9 +307,9 @@ public ClusterMetadataManifest writeIncrementalMetadata( } // Write Index Metadata - final Map previousStateIndexMetadataVersionByName = new HashMap<>(); + final Map previousStateIndexMetadataByName = new HashMap<>(); for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { - previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); + previousStateIndexMetadataByName.put(indexMetadata.getIndex().getName(), indexMetadata); } int numIndicesUpdated = 0; @@ -319,9 +319,12 @@ public ClusterMetadataManifest writeIncrementalMetadata( .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); List toUpload = new ArrayList<>(); - List newIndexMetadataList = new ArrayList<>(); + // We prepare a map that contains the previous index metadata for the indexes for which version has changed. + Map prevIndexMetadataByName = new HashMap<>(); for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { - final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName()); + String indexName = indexMetadata.getIndex().getName(); + final IndexMetadata prevIndexMetadata = previousStateIndexMetadataByName.get(indexName); + Long previousVersion = prevIndexMetadata != null ? prevIndexMetadata.getVersion() : null; if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { logger.debug( "updating metadata for [{}], changing version from [{}] to [{}]", @@ -331,22 +334,19 @@ public ClusterMetadataManifest writeIncrementalMetadata( ); numIndicesUpdated++; toUpload.add(indexMetadata); + prevIndexMetadataByName.put(indexName, prevIndexMetadata); } else { numIndicesUnchanged++; } - previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); - // Adding the indexMetadata to newIndexMetadataList if there is no previous version present for the index. - if (previousVersion == null) { - newIndexMetadataList.add(indexMetadata); - } + previousStateIndexMetadataByName.remove(indexMetadata.getIndex().getName()); } - List uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList); + List uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, prevIndexMetadataByName); uploadedIndexMetadataList.forEach( uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata) ); - for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { + for (String removedIndexName : previousStateIndexMetadataByName.keySet()) { allUploadedIndexMetadata.remove(removedIndexName); } final ClusterMetadataManifest manifest = uploadManifest( @@ -452,7 +452,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException private List writeIndexMetadataParallel( ClusterState clusterState, List toUpload, - List newIndexMetadataList + Map prevIndexMetadataByName ) throws IOException { assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null"; int latchCount = toUpload.size() + indexMetadataUploadListeners.size(); @@ -482,7 +482,7 @@ private List writeIndexMetadataParallel( writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener); } - invokeIndexMetadataUploadListeners(newIndexMetadataList, latch, exceptionList); + invokeIndexMetadataUploadListeners(toUpload, prevIndexMetadataByName, latch, exceptionList); try { if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { @@ -527,15 +527,17 @@ private List writeIndexMetadataParallel( * Invokes the index metadata upload listener but does not wait for the execution to complete. */ private void invokeIndexMetadataUploadListeners( - List newIndexMetadataList, + List udpatedIndexMetadataList, + Map prevIndexMetadataByName, CountDownLatch latch, List exceptionList ) { for (IndexMetadataUploadListener listener : indexMetadataUploadListeners) { String listenerName = listener.getClass().getSimpleName(); - listener.onNewIndexUpload( - newIndexMetadataList, - getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName) + listener.onUpload( + udpatedIndexMetadataList, + prevIndexMetadataByName, + getIndexMetadataUploadActionListener(udpatedIndexMetadataList, prevIndexMetadataByName, latch, exceptionList, listenerName) ); } @@ -543,6 +545,7 @@ private void invokeIndexMetadataUploadListeners( private ActionListener getIndexMetadataUploadActionListener( List newIndexMetadataList, + Map prevIndexMetadataByName, CountDownLatch latch, List exceptionList, String listenerName @@ -552,18 +555,21 @@ private ActionListener getIndexMetadataUploadActionListener( ActionListener.wrap( ignored -> logger.trace( new ParameterizedMessage( - "{} : Invoked listener={} successfully tookTimeNs={}", + "listener={} : Invoked successfully with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}", listenerName, newIndexMetadataList, + prevIndexMetadataByName.values(), (System.nanoTime() - startTime) ) ), ex -> { logger.error( new ParameterizedMessage( - "{} : Exception during invocation of listener={} tookTimeNs={}", + "listener={} : Exception during invocation with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}", listenerName, newIndexMetadataList, + prevIndexMetadataByName, + prevIndexMetadataByName.values(), (System.nanoTime() - startTime) ), ex diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 388de65ca58a1..6a2d22e459536 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -48,9 +48,8 @@ import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.Index; -import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; -import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.ingest.IngestService; @@ -62,8 +61,6 @@ import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -990,7 +987,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti */ widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0); assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings()); - remoteStorePathStrategy = determineRemoteStorePathStrategy(); + remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata, true); setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING)); setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING)); @@ -1911,18 +1908,6 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability; } - private RemoteStorePathStrategy determineRemoteStorePathStrategy() { - Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - assert remoteCustomData == null || remoteCustomData.containsKey(PathType.NAME); - if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) { - PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME)); - String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME); - PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null; - return new RemoteStorePathStrategy(pathType, hashAlgorithm); - } - return new RemoteStorePathStrategy(PathType.FIXED); - } - public RemoteStorePathStrategy getRemoteStorePathStrategy() { return remoteStorePathStrategy; } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java index 68cf6923bcf45..89b642b79df86 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java @@ -52,7 +52,7 @@ public class RemoteIndexPath implements ToXContentFragment { combinedPath.putAll(SEGMENT_PATH); COMBINED_PATH = Collections.unmodifiableMap(combinedPath); } - private static final String DEFAULT_VERSION = "1"; + public static final String DEFAULT_VERSION = "1"; public static final String DIR = "remote-index-path"; public static final String FILE_NAME_FORMAT = "remote_path_%s"; static final String KEY_VERSION = "version"; @@ -60,6 +60,8 @@ public class RemoteIndexPath implements ToXContentFragment { static final String KEY_SHARD_COUNT = "shard_count"; static final String KEY_PATH_CREATION_MAP = "path_creation_map"; static final String KEY_PATHS = "paths"; + + private final String version; private final String indexUUID; private final int shardCount; private final Iterable basePath; @@ -109,6 +111,7 @@ public RemoteIndexPath( .getFormattedMessage() ); } + this.version = DEFAULT_VERSION; this.indexUUID = indexUUID; this.shardCount = shardCount; this.basePath = basePath; @@ -119,7 +122,7 @@ public RemoteIndexPath( @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(KEY_VERSION, DEFAULT_VERSION); + builder.field(KEY_VERSION, version); builder.field(KEY_INDEX_UUID, indexUUID); builder.field(KEY_SHARD_COUNT, shardCount); builder.field(PathType.NAME, pathType.name()); @@ -156,4 +159,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static RemoteIndexPath fromXContent(XContentParser ignored) { throw new UnsupportedOperationException("RemoteIndexPath.fromXContent() is not supported"); } + + String getVersion() { + return version; + } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index 1ac7e41014d23..5cdd165f547c0 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -23,6 +23,7 @@ import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.IndexMetadataUploadListener; import org.opensearch.gateway.remote.RemoteClusterStateService.RemoteStateTransferException; +import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; @@ -47,6 +48,7 @@ import static org.opensearch.index.remote.RemoteIndexPath.COMBINED_PATH; import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH; import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH; +import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategy; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; @@ -59,6 +61,7 @@ @ExperimentalApi public class RemoteIndexPathUploader extends IndexMetadataUploadListener { + private static final String DELIMITER = "#"; public static final ConfigBlobStoreFormat REMOTE_INDEX_PATH_FORMAT = new ConfigBlobStoreFormat<>( RemoteIndexPath.FILE_NAME_FORMAT ); @@ -99,7 +102,11 @@ public RemoteIndexPathUploader( } @Override - protected void doOnNewIndexUpload(List indexMetadataList, ActionListener actionListener) { + protected void doOnUpload( + List indexMetadataList, + Map prevIndexMetadataByName, + ActionListener actionListener + ) { if (isRemoteDataAttributePresent == false) { logger.trace("Skipping beforeNewIndexUpload as there are no remote indexes"); actionListener.onResponse(null); @@ -108,7 +115,9 @@ protected void doOnNewIndexUpload(List indexMetadataList, ActionL long startTime = System.nanoTime(); boolean success = false; - List eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList()); + List eligibleList = indexMetadataList.stream() + .filter(idxMd -> requiresPathUpload(idxMd, prevIndexMetadataByName.get(idxMd.getIndex().getName()))) + .collect(Collectors.toList()); String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(",")); int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2); CountDownLatch latch = new CountDownLatch(latchCount); @@ -182,7 +191,7 @@ private void writePathToRemoteStore( Map> pathCreationMap ) { Map remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.valueOf(remoteCustomData.get(RemoteStoreEnums.PathType.NAME)); + PathType pathType = PathType.valueOf(remoteCustomData.get(PathType.NAME)); RemoteStoreEnums.PathHashAlgorithm hashAlgorithm = RemoteStoreEnums.PathHashAlgorithm.valueOf( remoteCustomData.get(RemoteStoreEnums.PathHashAlgorithm.NAME) ); @@ -192,17 +201,27 @@ private void writePathToRemoteStore( BlobContainer blobContainer = repository.blobStore().blobContainer(basePath.add(RemoteIndexPath.DIR)); ActionListener actionListener = getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap); try { + RemoteIndexPath remoteIndexPath = new RemoteIndexPath( + indexUUID, + shardCount, + basePath, + pathType, + hashAlgorithm, + pathCreationMap + ); + String fileName = generateFileName(indexUUID, idxMD.getVersion(), remoteIndexPath.getVersion()); REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( new RemoteIndexPath(indexUUID, shardCount, basePath, pathType, hashAlgorithm, pathCreationMap), blobContainer, - indexUUID, + fileName, actionListener ); } catch (IOException ioException) { RemoteStateTransferException ex = new RemoteStateTransferException( - String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(idxMD.getIndex().getName())) + String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(idxMD.getIndex().getName())), + ioException ); - actionListener.onFailure(ioException); + actionListener.onFailure(ex); } } @@ -261,21 +280,28 @@ private LatchedActionListener getUploadPathLatchedActionListener( * This method checks if the index metadata has attributes that calls for uploading the index path for remote store * uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so. */ - private boolean requiresPathUpload(IndexMetadata indexMetadata) { - // A cluster will have remote custom metadata only if the cluster is remote store enabled from data side. - Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) { - return false; - } - String pathTypeStr = remoteCustomData.get(RemoteStoreEnums.PathType.NAME); - if (Objects.isNull(pathTypeStr)) { - return false; - } - // We need to upload the path only if the path type for an index is hashed_prefix - return RemoteStoreEnums.PathType.HASHED_PREFIX == RemoteStoreEnums.PathType.parseString(pathTypeStr); + private boolean requiresPathUpload(IndexMetadata indexMetadata, IndexMetadata prevIndexMetadata) { + PathType pathType = determineRemoteStorePathStrategy(indexMetadata, false).getType(); + PathType prevPathType = Objects.nonNull(prevIndexMetadata) + ? determineRemoteStorePathStrategy(prevIndexMetadata, false).getType() + : null; + // If previous metadata is null or previous path type is not hashed_prefix, and along with new path type being + // hashed_prefix, then this can mean any of the following - + // 1. This is creation of remote index with hashed_prefix + // 2. We are enabling cluster state for the very first time with multiple indexes having hashed_prefix path type. + // 3. A docrep index is being migrated to being remote store index. + return pathType == PathType.HASHED_PREFIX && (Objects.isNull(prevPathType) || prevPathType != PathType.HASHED_PREFIX); } private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) { this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; } + + /** + * Creates a file name by combining index uuid, index metadata version and file version. # has been chosen as the + * delimiter since it does not collide with any possible letters in file name. + */ + public static String generateFileName(String indexUUID, long indexMetadataVersion, String fileVersion) { + return String.join(DELIMITER, indexUUID, Long.toString(indexMetadataVersion), fileVersion); + } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 4d1d98334c3c4..44da348974086 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -8,6 +8,7 @@ package org.opensearch.index.remote; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.collect.Tuple; import java.nio.ByteBuffer; @@ -17,6 +18,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.function.Function; /** @@ -146,4 +148,25 @@ static String longToCompositeBase64AndBinaryEncoding(long value, int len) { assert base64DecimalValue >= 0 && base64DecimalValue < 64; return URL_BASE64_CHARSET[base64DecimalValue] + binaryPart; } + + /** + * Determines the remote store path strategy by reading the custom data map in IndexMetadata class. + */ + public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMetadata indexMetadata, boolean assertRemoteCustomData) { + Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); + if (assertRemoteCustomData) { + assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.PathType.NAME); + } + if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.PathType.NAME)) { + RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.parseString( + remoteCustomData.get(RemoteStoreEnums.PathType.NAME) + ); + String hashAlgoStr = remoteCustomData.get(RemoteStoreEnums.PathHashAlgorithm.NAME); + RemoteStoreEnums.PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) + ? RemoteStoreEnums.PathHashAlgorithm.parseString(hashAlgoStr) + : null; + return new RemoteStorePathStrategy(pathType, hashAlgorithm); + } + return new RemoteStorePathStrategy(RemoteStoreEnums.PathType.FIXED); + } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java index 2e4dd15ccb581..1d6e018b9176c 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java @@ -23,6 +23,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemoteClusterStateService.RemoteStateTransferException; +import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; @@ -36,11 +37,11 @@ import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import org.mockito.Mockito; @@ -101,8 +102,8 @@ public void setup() { Map remoteCustomData = Map.of( PathType.NAME, HASHED_PREFIX.name(), - RemoteStoreEnums.PathHashAlgorithm.NAME, - RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.name() + PathHashAlgorithm.NAME, + PathHashAlgorithm.FNV_1A_BASE64.name() ); Settings idxSettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) @@ -136,7 +137,7 @@ public void testInterceptWithNoRemoteDataAttributes() { res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(indexMetadataList, times(0)).stream(); @@ -154,7 +155,7 @@ public void testInterceptWithEmptyIndexMetadataList() { res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.doOnNewIndexUpload(Collections.emptyList(), actionListener); + remoteIndexPathUploader.doOnUpload(Collections.emptyList(), Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); } @@ -173,34 +174,43 @@ public void testInterceptWithEmptyEligibleIndexMetadataList() { ); // Case 1 - Null remoteCustomData - List indexMetadataList = new ArrayList<>(); - IndexMetadata indexMetadata = mock(IndexMetadata.class); - indexMetadataList.add(indexMetadata); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + List indexMetadataList = List.of(createIndexMetadata(null)); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); // Case 2 - Empty remoteCustomData - when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(new HashMap<>()); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + indexMetadataList = List.of(createIndexMetadata(new HashMap<>())); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(2, successCount.get()); assertEquals(0, failureCount.get()); // Case 3 - RemoteStoreEnums.PathType.NAME not in remoteCustomData map - Map remoteCustomData = Map.of("test", "test"); - when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(remoteCustomData); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + indexMetadataList = List.of(createIndexMetadata(Map.of("test", "test"))); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(3, successCount.get()); assertEquals(0, failureCount.get()); // Case 4 - RemoteStoreEnums.PathType.NAME is not HASHED_PREFIX - remoteCustomData = Map.of(PathType.NAME, randomFrom(FIXED, HASHED_INFIX).name()); - when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(remoteCustomData); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + PathType pathType = randomFrom(FIXED, HASHED_INFIX); + PathHashAlgorithm pathHashAlgorithm = pathType == FIXED ? null : randomFrom(PathHashAlgorithm.values()); + indexMetadataList = List.of(createIndexMetadata(Map.of(PathType.NAME, randomFrom(FIXED, HASHED_INFIX).name()))); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(4, successCount.get()); assertEquals(0, failureCount.get()); } + private IndexMetadata createIndexMetadata(Map remoteCustomData) { + IndexMetadata.Builder builder = IndexMetadata.builder("test") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0); + if (Objects.nonNull(remoteCustomData)) { + builder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData); + } + return builder.build(); + } + public void testInterceptWithSameRepo() throws IOException { RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( threadPool, @@ -213,7 +223,7 @@ public void testInterceptWithSameRepo() throws IOException { res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(blobContainer, times(1)).writeBlob(anyString(), any(InputStream.class), anyLong(), anyBoolean()); @@ -236,7 +246,7 @@ public void testInterceptWithDifferentRepo() throws IOException { res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(blobContainer, times(2)).writeBlob(anyString(), any(InputStream.class), anyLong(), anyBoolean()); @@ -263,7 +273,7 @@ public void testInterceptWithLatchAwaitTimeout() throws IOException { failureCount.incrementAndGet(); exceptionSetOnce.set(ex); }); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(0, successCount.get()); assertEquals(1, failureCount.get()); assertTrue(exceptionSetOnce.get() instanceof RemoteStateTransferException); @@ -295,7 +305,7 @@ public void testInterceptWithInterruptedExceptionDuringLatchAwait() throws Excep }); Thread thread = new Thread(() -> { try { - remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onUpload(indexMetadataList, Collections.emptyMap(), actionListener); } catch (Exception e) { assertTrue(e instanceof InterruptedException); assertEquals("sleep interrupted", e.getMessage());